Files
DoorCounter/tools/replay_frames.py
Peter Woolery a37207b6ff feat: event-based walker detector tuned to real 7' overhead mount
Replace per-track line-crossing counter with a single event state machine
gated by foreground pixel count (ENTER=250, EXIT=150) and finalized by
quiet-exit or timeout. Direction inferred from centroid excursion
(up_score vs down_score) on quiet-exit fires, and from net displacement
(last_c vs first_c) on timeout fires.

Tuning reflects bench data at the intended 7' overhead mount: walkers
produce smaller centroid excursions than originally modelled, so
EXTENT gates, MIN_TRAJ, MAX_FRAMES and REFRACTORY were all relaxed from
their initial guesses. Constants and rationale live in firmware/lib/cv/cv.h.

Bench results (8 isolated walks, 4 entries + 4 exits):
  * Event detection: 8/8 (100%)
  * Aggregate entries+exits split: 4+4 (matches)
  * Per-walk direction labelling: 4/8 (~50%)

Document explicitly that per-walk direction is unreliable at this mount
and that downstream analytics should trust only gross traffic
(entries + exits). Recovering direction would require a physical mount
change or a richer signal; both are out of scope for v1.

Tooling:
  * tools/replay_logs.py — replay event state machine against captured
    [F] diagnostic lines, for offline tuning without flash-test loops.
  * firmware/src/main_capture.cpp + tools/capture_frames.py +
    tools/replay_frames.py — raw-frame capture firmware and Python port
    of the detector, kept in tree for future iteration even though the
    TimerCamera-F serial driver stripped specific byte ranges in testing
    and log-based replay became the working path.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-17 16:03:36 -07:00

212 lines
7.2 KiB
Python

#!/usr/bin/env python3
# tools/replay_frames.py
#
# Offline Python port of the event-based CV detector (firmware/lib/cv/cv.cpp).
# Reads a .bin file produced by capture_frames.py and prints events.
#
# Purpose: iterate algorithm changes in seconds instead of minutes. All
# constants match cv.h so baseline behavior matches firmware.
#
# Usage:
# python tools/replay_frames.py walk.bin
# python tools/replay_frames.py walk.bin --enter 250 --exit 150 --max 25
#
# Output: one line per frame with fg diagnostics, plus [ENTRY]/[EXIT] lines
# when the detector fires.
import argparse
import struct
import sys
import numpy as np
MAGIC = 0x314D5246 # 'FRM1'
W = H = 96
PIXELS = W * H
HEADER = 12
FRAME_LEN = HEADER + PIXELS
class Detector:
"""Mirror of firmware CV state machine. Single walker events, centroid
trajectory direction. Only per-frame fg_count + min/max y + centroid y
feed the decision — per-blob tracks are diagnostic in firmware, dropped
here."""
def __init__(self, args):
self.a = args
self.bg = None
self.ev_active = False
self.ev_frames = 0
self.ev_first_c = -1.0
self.ev_last_c = -1.0
self.ev_min_c = float(H)
self.ev_max_c = -1.0
self.ev_min_y = H
self.ev_max_y = -1
self.ev_quiet = 0
self.last_fire = 0
self.frame_ix = 0
self.entries = 0
self.exits = 0
def _reset_event(self):
self.ev_active = False
self.ev_frames = 0
self.ev_first_c = self.ev_last_c = -1.0
self.ev_min_c = float(H)
self.ev_max_c = -1.0
self.ev_min_y = H
self.ev_max_y = -1
self.ev_quiet = 0
def _finalize(self):
a = self.a
if self.ev_frames < a.min_frames: return None
if self.ev_min_y > a.extent_top: return None
if self.ev_max_y < a.extent_bot: return None
up = self.ev_first_c - self.ev_min_c
down = self.ev_max_c - self.ev_first_c
winning = max(up, down)
if winning < a.min_traj: return None
is_entry = up >= down
self.last_fire = self.frame_ix
info = dict(
kind='ENTRY' if is_entry else 'EXIT',
first=self.ev_first_c, min=self.ev_min_c,
max=self.ev_max_c, last=self.ev_last_c,
dur=self.ev_frames,
)
if is_entry: self.entries += 1
else: self.exits += 1
return info
def step(self, frame):
"""frame: uint8 array of shape (H, W). Returns list of fire dicts."""
self.frame_ix += 1
fires = []
if self.bg is None:
self.bg = frame.astype(np.int16)
return fires
bg = self.bg.astype(np.int16)
diff = np.abs(frame.astype(np.int16) - bg)
fg = (diff > self.a.diff_thresh).astype(np.uint8)
# Running-avg bg blend, frozen during active event.
if not self.ev_active:
self.bg = ((self.bg * 31 + frame.astype(np.int16)) >> 5)
fg_count = int(fg.sum())
if fg_count > 0:
row_counts = fg.sum(axis=1)
ys = np.where(row_counts > 0)[0]
min_y = int(ys.min())
max_y = int(ys.max())
centroid_y = float((row_counts * np.arange(H)).sum() / fg_count)
else:
min_y, max_y, centroid_y = -1, -1, -1.0
# Self-heal on catastrophic bg mismatch.
if fg_count > PIXELS // 2:
self.bg = frame.astype(np.int16)
if self.ev_active: self._reset_event()
return fires
a = self.a
in_refractory = (self.last_fire != 0 and
(self.frame_ix - self.last_fire) < a.refractory)
if not self.ev_active:
if not in_refractory and fg_count >= a.enter_thresh:
self.ev_active = True
self.ev_frames = 1
self.ev_first_c = centroid_y
self.ev_last_c = centroid_y
self.ev_min_c = centroid_y
self.ev_max_c = centroid_y
self.ev_min_y = min_y
self.ev_max_y = max_y
self.ev_quiet = 0
else:
self.ev_frames += 1
if fg_count > 0:
self.ev_last_c = centroid_y
if centroid_y < self.ev_min_c: self.ev_min_c = centroid_y
if centroid_y > self.ev_max_c: self.ev_max_c = centroid_y
if min_y < self.ev_min_y: self.ev_min_y = min_y
if max_y > self.ev_max_y: self.ev_max_y = max_y
ended = False
if fg_count < a.exit_thresh:
self.ev_quiet += 1
if self.ev_quiet >= a.quiet_frames:
ended = True
else:
self.ev_quiet = 0
if self.ev_frames > a.max_frames:
ended = True
if ended:
fire = self._finalize()
if fire: fires.append(fire)
self._reset_event()
self.bg = frame.astype(np.int16)
return fires, fg_count, min_y, max_y, centroid_y
def iter_frames(path):
with open(path, 'rb') as f:
data = f.read()
n = len(data) // FRAME_LEN
for i in range(n):
off = i * FRAME_LEN
magic, ix, ms = struct.unpack('<III', data[off:off + HEADER])
if magic != MAGIC:
raise RuntimeError(f'bad magic at frame {i}: 0x{magic:08x}')
frame = np.frombuffer(data, dtype=np.uint8,
count=PIXELS, offset=off + HEADER).reshape(H, W)
yield ix, ms, frame
def main():
ap = argparse.ArgumentParser()
ap.add_argument('path')
ap.add_argument('--diff-thresh', dest='diff_thresh', type=int, default=30)
ap.add_argument('--enter', dest='enter_thresh', type=int, default=300)
ap.add_argument('--exit', dest='exit_thresh', type=int, default=200)
ap.add_argument('--quiet', dest='quiet_frames', type=int, default=3)
ap.add_argument('--min', dest='min_frames', type=int, default=5)
ap.add_argument('--max', dest='max_frames', type=int, default=25)
ap.add_argument('--extent-top', dest='extent_top', type=int, default=10)
ap.add_argument('--extent-bot', dest='extent_bot', type=int, default=85)
ap.add_argument('--min-traj', dest='min_traj', type=float, default=15.0)
ap.add_argument('--refractory', dest='refractory', type=int, default=15)
ap.add_argument('--quiet-log', action='store_true',
help='Suppress per-frame fg lines')
args = ap.parse_args()
det = Detector(args)
total = 0
for ix, ms, frame in iter_frames(args.path):
total += 1
out = det.step(frame)
if out == []:
if not args.quiet_log:
print(f'[{ix:4d}] bg init')
continue
fires, fg, miny, maxy, cy = out
if not args.quiet_log and fg > 0:
print(f'[{ix:4d}] n={fg:4d} y={miny:2d}..{maxy:2d} c={cy:5.1f}')
for fire in fires:
print(f' >>> {fire["kind"]} first={fire["first"]:.1f} '
f'min={fire["min"]:.1f} max={fire["max"]:.1f} '
f'last={fire["last"]:.1f} dur={fire["dur"]}')
print(f'\n# {total} frames entries={det.entries} exits={det.exits}')
if __name__ == '__main__':
main()