Replace per-track line-crossing counter with a single event state machine
gated by foreground pixel count (ENTER=250, EXIT=150) and finalized by
quiet-exit or timeout. Direction inferred from centroid excursion
(up_score vs down_score) on quiet-exit fires, and from net displacement
(last_c vs first_c) on timeout fires.
Tuning reflects bench data at the intended 7' overhead mount: walkers
produce smaller centroid excursions than originally modelled, so
EXTENT gates, MIN_TRAJ, MAX_FRAMES and REFRACTORY were all relaxed from
their initial guesses. Constants and rationale live in firmware/lib/cv/cv.h.
Bench results (8 isolated walks, 4 entries + 4 exits):
* Event detection: 8/8 (100%)
* Aggregate entries+exits split: 4+4 (matches)
* Per-walk direction labelling: 4/8 (~50%)
Document explicitly that per-walk direction is unreliable at this mount
and that downstream analytics should trust only gross traffic
(entries + exits). Recovering direction would require a physical mount
change or a richer signal; both are out of scope for v1.
Tooling:
* tools/replay_logs.py — replay event state machine against captured
[F] diagnostic lines, for offline tuning without flash-test loops.
* firmware/src/main_capture.cpp + tools/capture_frames.py +
tools/replay_frames.py — raw-frame capture firmware and Python port
of the detector, kept in tree for future iteration even though the
TimerCamera-F serial driver stripped specific byte ranges in testing
and log-based replay became the working path.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
212 lines
7.2 KiB
Python
212 lines
7.2 KiB
Python
#!/usr/bin/env python3
|
|
# tools/replay_frames.py
|
|
#
|
|
# Offline Python port of the event-based CV detector (firmware/lib/cv/cv.cpp).
|
|
# Reads a .bin file produced by capture_frames.py and prints events.
|
|
#
|
|
# Purpose: iterate algorithm changes in seconds instead of minutes. All
|
|
# constants match cv.h so baseline behavior matches firmware.
|
|
#
|
|
# Usage:
|
|
# python tools/replay_frames.py walk.bin
|
|
# python tools/replay_frames.py walk.bin --enter 250 --exit 150 --max 25
|
|
#
|
|
# Output: one line per frame with fg diagnostics, plus [ENTRY]/[EXIT] lines
|
|
# when the detector fires.
|
|
|
|
import argparse
|
|
import struct
|
|
import sys
|
|
|
|
import numpy as np
|
|
|
|
MAGIC = 0x314D5246 # 'FRM1'
|
|
W = H = 96
|
|
PIXELS = W * H
|
|
HEADER = 12
|
|
FRAME_LEN = HEADER + PIXELS
|
|
|
|
|
|
class Detector:
|
|
"""Mirror of firmware CV state machine. Single walker events, centroid
|
|
trajectory direction. Only per-frame fg_count + min/max y + centroid y
|
|
feed the decision — per-blob tracks are diagnostic in firmware, dropped
|
|
here."""
|
|
|
|
def __init__(self, args):
|
|
self.a = args
|
|
self.bg = None
|
|
self.ev_active = False
|
|
self.ev_frames = 0
|
|
self.ev_first_c = -1.0
|
|
self.ev_last_c = -1.0
|
|
self.ev_min_c = float(H)
|
|
self.ev_max_c = -1.0
|
|
self.ev_min_y = H
|
|
self.ev_max_y = -1
|
|
self.ev_quiet = 0
|
|
self.last_fire = 0
|
|
self.frame_ix = 0
|
|
self.entries = 0
|
|
self.exits = 0
|
|
|
|
def _reset_event(self):
|
|
self.ev_active = False
|
|
self.ev_frames = 0
|
|
self.ev_first_c = self.ev_last_c = -1.0
|
|
self.ev_min_c = float(H)
|
|
self.ev_max_c = -1.0
|
|
self.ev_min_y = H
|
|
self.ev_max_y = -1
|
|
self.ev_quiet = 0
|
|
|
|
def _finalize(self):
|
|
a = self.a
|
|
if self.ev_frames < a.min_frames: return None
|
|
if self.ev_min_y > a.extent_top: return None
|
|
if self.ev_max_y < a.extent_bot: return None
|
|
up = self.ev_first_c - self.ev_min_c
|
|
down = self.ev_max_c - self.ev_first_c
|
|
winning = max(up, down)
|
|
if winning < a.min_traj: return None
|
|
is_entry = up >= down
|
|
self.last_fire = self.frame_ix
|
|
info = dict(
|
|
kind='ENTRY' if is_entry else 'EXIT',
|
|
first=self.ev_first_c, min=self.ev_min_c,
|
|
max=self.ev_max_c, last=self.ev_last_c,
|
|
dur=self.ev_frames,
|
|
)
|
|
if is_entry: self.entries += 1
|
|
else: self.exits += 1
|
|
return info
|
|
|
|
def step(self, frame):
|
|
"""frame: uint8 array of shape (H, W). Returns list of fire dicts."""
|
|
self.frame_ix += 1
|
|
fires = []
|
|
|
|
if self.bg is None:
|
|
self.bg = frame.astype(np.int16)
|
|
return fires
|
|
|
|
bg = self.bg.astype(np.int16)
|
|
diff = np.abs(frame.astype(np.int16) - bg)
|
|
fg = (diff > self.a.diff_thresh).astype(np.uint8)
|
|
|
|
# Running-avg bg blend, frozen during active event.
|
|
if not self.ev_active:
|
|
self.bg = ((self.bg * 31 + frame.astype(np.int16)) >> 5)
|
|
|
|
fg_count = int(fg.sum())
|
|
if fg_count > 0:
|
|
row_counts = fg.sum(axis=1)
|
|
ys = np.where(row_counts > 0)[0]
|
|
min_y = int(ys.min())
|
|
max_y = int(ys.max())
|
|
centroid_y = float((row_counts * np.arange(H)).sum() / fg_count)
|
|
else:
|
|
min_y, max_y, centroid_y = -1, -1, -1.0
|
|
|
|
# Self-heal on catastrophic bg mismatch.
|
|
if fg_count > PIXELS // 2:
|
|
self.bg = frame.astype(np.int16)
|
|
if self.ev_active: self._reset_event()
|
|
return fires
|
|
|
|
a = self.a
|
|
in_refractory = (self.last_fire != 0 and
|
|
(self.frame_ix - self.last_fire) < a.refractory)
|
|
|
|
if not self.ev_active:
|
|
if not in_refractory and fg_count >= a.enter_thresh:
|
|
self.ev_active = True
|
|
self.ev_frames = 1
|
|
self.ev_first_c = centroid_y
|
|
self.ev_last_c = centroid_y
|
|
self.ev_min_c = centroid_y
|
|
self.ev_max_c = centroid_y
|
|
self.ev_min_y = min_y
|
|
self.ev_max_y = max_y
|
|
self.ev_quiet = 0
|
|
else:
|
|
self.ev_frames += 1
|
|
if fg_count > 0:
|
|
self.ev_last_c = centroid_y
|
|
if centroid_y < self.ev_min_c: self.ev_min_c = centroid_y
|
|
if centroid_y > self.ev_max_c: self.ev_max_c = centroid_y
|
|
if min_y < self.ev_min_y: self.ev_min_y = min_y
|
|
if max_y > self.ev_max_y: self.ev_max_y = max_y
|
|
|
|
ended = False
|
|
if fg_count < a.exit_thresh:
|
|
self.ev_quiet += 1
|
|
if self.ev_quiet >= a.quiet_frames:
|
|
ended = True
|
|
else:
|
|
self.ev_quiet = 0
|
|
if self.ev_frames > a.max_frames:
|
|
ended = True
|
|
|
|
if ended:
|
|
fire = self._finalize()
|
|
if fire: fires.append(fire)
|
|
self._reset_event()
|
|
self.bg = frame.astype(np.int16)
|
|
|
|
return fires, fg_count, min_y, max_y, centroid_y
|
|
|
|
|
|
def iter_frames(path):
|
|
with open(path, 'rb') as f:
|
|
data = f.read()
|
|
n = len(data) // FRAME_LEN
|
|
for i in range(n):
|
|
off = i * FRAME_LEN
|
|
magic, ix, ms = struct.unpack('<III', data[off:off + HEADER])
|
|
if magic != MAGIC:
|
|
raise RuntimeError(f'bad magic at frame {i}: 0x{magic:08x}')
|
|
frame = np.frombuffer(data, dtype=np.uint8,
|
|
count=PIXELS, offset=off + HEADER).reshape(H, W)
|
|
yield ix, ms, frame
|
|
|
|
|
|
def main():
|
|
ap = argparse.ArgumentParser()
|
|
ap.add_argument('path')
|
|
ap.add_argument('--diff-thresh', dest='diff_thresh', type=int, default=30)
|
|
ap.add_argument('--enter', dest='enter_thresh', type=int, default=300)
|
|
ap.add_argument('--exit', dest='exit_thresh', type=int, default=200)
|
|
ap.add_argument('--quiet', dest='quiet_frames', type=int, default=3)
|
|
ap.add_argument('--min', dest='min_frames', type=int, default=5)
|
|
ap.add_argument('--max', dest='max_frames', type=int, default=25)
|
|
ap.add_argument('--extent-top', dest='extent_top', type=int, default=10)
|
|
ap.add_argument('--extent-bot', dest='extent_bot', type=int, default=85)
|
|
ap.add_argument('--min-traj', dest='min_traj', type=float, default=15.0)
|
|
ap.add_argument('--refractory', dest='refractory', type=int, default=15)
|
|
ap.add_argument('--quiet-log', action='store_true',
|
|
help='Suppress per-frame fg lines')
|
|
args = ap.parse_args()
|
|
|
|
det = Detector(args)
|
|
total = 0
|
|
for ix, ms, frame in iter_frames(args.path):
|
|
total += 1
|
|
out = det.step(frame)
|
|
if out == []:
|
|
if not args.quiet_log:
|
|
print(f'[{ix:4d}] bg init')
|
|
continue
|
|
fires, fg, miny, maxy, cy = out
|
|
if not args.quiet_log and fg > 0:
|
|
print(f'[{ix:4d}] n={fg:4d} y={miny:2d}..{maxy:2d} c={cy:5.1f}')
|
|
for fire in fires:
|
|
print(f' >>> {fire["kind"]} first={fire["first"]:.1f} '
|
|
f'min={fire["min"]:.1f} max={fire["max"]:.1f} '
|
|
f'last={fire["last"]:.1f} dur={fire["dur"]}')
|
|
print(f'\n# {total} frames entries={det.entries} exits={det.exits}')
|
|
|
|
|
|
if __name__ == '__main__':
|
|
main()
|