#!/usr/bin/env python3 # tools/replay_frames.py # # Offline Python port of the event-based CV detector (firmware/lib/cv/cv.cpp). # Reads a .bin file produced by capture_frames.py and prints events. # # Purpose: iterate algorithm changes in seconds instead of minutes. All # constants match cv.h so baseline behavior matches firmware. # # Usage: # python tools/replay_frames.py walk.bin # python tools/replay_frames.py walk.bin --enter 250 --exit 150 --max 25 # # Output: one line per frame with fg diagnostics, plus [ENTRY]/[EXIT] lines # when the detector fires. import argparse import struct import sys import numpy as np MAGIC = 0x314D5246 # 'FRM1' W = H = 96 PIXELS = W * H HEADER = 12 FRAME_LEN = HEADER + PIXELS class Detector: """Mirror of firmware CV state machine. Single walker events, centroid trajectory direction. Only per-frame fg_count + min/max y + centroid y feed the decision — per-blob tracks are diagnostic in firmware, dropped here.""" def __init__(self, args): self.a = args self.bg = None self.ev_active = False self.ev_frames = 0 self.ev_first_c = -1.0 self.ev_last_c = -1.0 self.ev_min_c = float(H) self.ev_max_c = -1.0 self.ev_min_y = H self.ev_max_y = -1 self.ev_quiet = 0 self.last_fire = 0 self.frame_ix = 0 self.entries = 0 self.exits = 0 def _reset_event(self): self.ev_active = False self.ev_frames = 0 self.ev_first_c = self.ev_last_c = -1.0 self.ev_min_c = float(H) self.ev_max_c = -1.0 self.ev_min_y = H self.ev_max_y = -1 self.ev_quiet = 0 def _finalize(self): a = self.a if self.ev_frames < a.min_frames: return None if self.ev_min_y > a.extent_top: return None if self.ev_max_y < a.extent_bot: return None up = self.ev_first_c - self.ev_min_c down = self.ev_max_c - self.ev_first_c winning = max(up, down) if winning < a.min_traj: return None is_entry = up >= down self.last_fire = self.frame_ix info = dict( kind='ENTRY' if is_entry else 'EXIT', first=self.ev_first_c, min=self.ev_min_c, max=self.ev_max_c, last=self.ev_last_c, dur=self.ev_frames, ) if is_entry: self.entries += 1 else: self.exits += 1 return info def step(self, frame): """frame: uint8 array of shape (H, W). Returns list of fire dicts.""" self.frame_ix += 1 fires = [] if self.bg is None: self.bg = frame.astype(np.int16) return fires bg = self.bg.astype(np.int16) diff = np.abs(frame.astype(np.int16) - bg) fg = (diff > self.a.diff_thresh).astype(np.uint8) # Running-avg bg blend, frozen during active event. if not self.ev_active: self.bg = ((self.bg * 31 + frame.astype(np.int16)) >> 5) fg_count = int(fg.sum()) if fg_count > 0: row_counts = fg.sum(axis=1) ys = np.where(row_counts > 0)[0] min_y = int(ys.min()) max_y = int(ys.max()) centroid_y = float((row_counts * np.arange(H)).sum() / fg_count) else: min_y, max_y, centroid_y = -1, -1, -1.0 # Self-heal on catastrophic bg mismatch. if fg_count > PIXELS // 2: self.bg = frame.astype(np.int16) if self.ev_active: self._reset_event() return fires a = self.a in_refractory = (self.last_fire != 0 and (self.frame_ix - self.last_fire) < a.refractory) if not self.ev_active: if not in_refractory and fg_count >= a.enter_thresh: self.ev_active = True self.ev_frames = 1 self.ev_first_c = centroid_y self.ev_last_c = centroid_y self.ev_min_c = centroid_y self.ev_max_c = centroid_y self.ev_min_y = min_y self.ev_max_y = max_y self.ev_quiet = 0 else: self.ev_frames += 1 if fg_count > 0: self.ev_last_c = centroid_y if centroid_y < self.ev_min_c: self.ev_min_c = centroid_y if centroid_y > self.ev_max_c: self.ev_max_c = centroid_y if min_y < self.ev_min_y: self.ev_min_y = min_y if max_y > self.ev_max_y: self.ev_max_y = max_y ended = False if fg_count < a.exit_thresh: self.ev_quiet += 1 if self.ev_quiet >= a.quiet_frames: ended = True else: self.ev_quiet = 0 if self.ev_frames > a.max_frames: ended = True if ended: fire = self._finalize() if fire: fires.append(fire) self._reset_event() self.bg = frame.astype(np.int16) return fires, fg_count, min_y, max_y, centroid_y def iter_frames(path): with open(path, 'rb') as f: data = f.read() n = len(data) // FRAME_LEN for i in range(n): off = i * FRAME_LEN magic, ix, ms = struct.unpack(' 0: print(f'[{ix:4d}] n={fg:4d} y={miny:2d}..{maxy:2d} c={cy:5.1f}') for fire in fires: print(f' >>> {fire["kind"]} first={fire["first"]:.1f} ' f'min={fire["min"]:.1f} max={fire["max"]:.1f} ' f'last={fire["last"]:.1f} dur={fire["dur"]}') print(f'\n# {total} frames entries={det.entries} exits={det.exits}') if __name__ == '__main__': main()