feat: event-based walker detector tuned to real 7' overhead mount

Replace per-track line-crossing counter with a single event state machine
gated by foreground pixel count (ENTER=250, EXIT=150) and finalized by
quiet-exit or timeout. Direction inferred from centroid excursion
(up_score vs down_score) on quiet-exit fires, and from net displacement
(last_c vs first_c) on timeout fires.

Tuning reflects bench data at the intended 7' overhead mount: walkers
produce smaller centroid excursions than originally modelled, so
EXTENT gates, MIN_TRAJ, MAX_FRAMES and REFRACTORY were all relaxed from
their initial guesses. Constants and rationale live in firmware/lib/cv/cv.h.

Bench results (8 isolated walks, 4 entries + 4 exits):
  * Event detection: 8/8 (100%)
  * Aggregate entries+exits split: 4+4 (matches)
  * Per-walk direction labelling: 4/8 (~50%)

Document explicitly that per-walk direction is unreliable at this mount
and that downstream analytics should trust only gross traffic
(entries + exits). Recovering direction would require a physical mount
change or a richer signal; both are out of scope for v1.

Tooling:
  * tools/replay_logs.py — replay event state machine against captured
    [F] diagnostic lines, for offline tuning without flash-test loops.
  * firmware/src/main_capture.cpp + tools/capture_frames.py +
    tools/replay_frames.py — raw-frame capture firmware and Python port
    of the detector, kept in tree for future iteration even though the
    TimerCamera-F serial driver stripped specific byte ranges in testing
    and log-based replay became the working path.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-04-17 16:03:36 -07:00
parent 3b471992f2
commit a37207b6ff
12 changed files with 1203 additions and 340 deletions

186
tools/replay_logs.py Normal file
View File

@@ -0,0 +1,186 @@
#!/usr/bin/env python3
# tools/replay_logs.py
#
# Replay the event state machine against text serial logs captured from the
# production firmware. Input lines of the form:
# [F] n=<fg_count> y=<min_y>..<max_y> c=<centroid_y>
#
# Those four values are exactly what the firmware's event state machine
# consumes — so we can iterate event-level params (thresholds, max_frames,
# extent gates, trajectory cutoffs, refractory) offline without needing raw
# frames or the device.
#
# Usage:
# python tools/replay_logs.py walk.log
# python tools/replay_logs.py walk.log --enter 250 --exit 100 --max 30 --min-traj 10
# cat walk.log | python tools/replay_logs.py - --ground-truth 12
import argparse
import re
import sys
LINE_RE = re.compile(
r'\[F\]\s+n=(?P<n>\d+)\s+y=(?P<miny>-?\d+)\.\.(?P<maxy>-?\d+)\s+c=(?P<c>-?\d+\.\d+)'
)
def parse_frames(text):
"""Yield (fg_count, min_y, max_y, centroid_y) per [F] line, in order."""
for line in text.splitlines():
m = LINE_RE.search(line)
if not m:
continue
yield int(m['n']), int(m['miny']), int(m['maxy']), float(m['c'])
class Detector:
"""Mirror of firmware event state machine. Only uses per-frame diagnostic
values — the same inputs the firmware feeds it."""
def __init__(self, a):
self.a = a
self.ev = False
self.ev_n = 0
self.ev_first = self.ev_last = -1.0
self.ev_min = 1e9
self.ev_max = -1.0
self.ev_miny = 1e9
self.ev_maxy = -1
self.ev_quiet = 0
self.last_fire = -10**9
self.ix = 0
self.entries = 0
self.exits = 0
self.fires = []
def _reset(self):
self.ev = False
self.ev_n = 0
self.ev_first = self.ev_last = -1.0
self.ev_min = 1e9; self.ev_max = -1.0
self.ev_miny = 1e9; self.ev_maxy = -1
self.ev_quiet = 0
def _finalize(self):
a = self.a
if self.ev_n < a.min_frames:
return ('reject_short', None)
if self.ev_miny > a.extent_top:
return ('reject_extent_top', None)
if self.ev_maxy < a.extent_bot:
return ('reject_extent_bot', None)
up = self.ev_first - self.ev_min
down = self.ev_max - self.ev_first
winning = max(up, down)
if winning < a.min_traj:
return ('reject_traj', None)
timed_out = self.ev_n > a.max_frames
if timed_out:
is_entry = self.ev_last < self.ev_first
else:
is_entry = up >= down
kind = 'ENTRY' if is_entry else 'EXIT'
self.last_fire = self.ix
info = dict(kind=kind, first=self.ev_first, min=self.ev_min,
max=self.ev_max, last=self.ev_last, dur=self.ev_n,
up=up, down=down, ix=self.ix)
if is_entry: self.entries += 1
else: self.exits += 1
self.fires.append(info)
return ('fire', info)
def step(self, n, miny, maxy, c):
self.ix += 1
a = self.a
refractory = (self.ix - self.last_fire) < a.refractory
if not self.ev:
if not refractory and n >= a.enter_thresh:
self.ev = True
self.ev_n = 1
self.ev_first = self.ev_last = c
self.ev_min = c; self.ev_max = c
self.ev_miny = miny; self.ev_maxy = maxy
self.ev_quiet = 0
return None
self.ev_n += 1
if n > 0:
self.ev_last = c
if c < self.ev_min: self.ev_min = c
if c > self.ev_max: self.ev_max = c
if miny < self.ev_miny: self.ev_miny = miny
if maxy > self.ev_maxy: self.ev_maxy = maxy
ended = False
if n < a.exit_thresh:
self.ev_quiet += 1
if self.ev_quiet >= a.quiet_frames:
ended = True
reason = 'quiet'
else:
self.ev_quiet = 0
if self.ev_n > a.max_frames:
ended = True
reason = 'timeout'
if ended:
result = self._finalize()
self._reset()
return (reason, result)
return None
def main():
ap = argparse.ArgumentParser()
ap.add_argument('path', help='log file, or - for stdin')
ap.add_argument('--enter', dest='enter_thresh', type=int, default=300)
ap.add_argument('--exit', dest='exit_thresh', type=int, default=200)
ap.add_argument('--quiet', dest='quiet_frames', type=int, default=3)
ap.add_argument('--min', dest='min_frames', type=int, default=5)
ap.add_argument('--max', dest='max_frames', type=int, default=25)
ap.add_argument('--extent-top', dest='extent_top', type=int, default=10)
ap.add_argument('--extent-bot', dest='extent_bot', type=int, default=85)
ap.add_argument('--min-traj', dest='min_traj', type=float, default=15.0)
ap.add_argument('--refractory', dest='refractory', type=int, default=15)
ap.add_argument('--ground-truth', type=int, default=0,
help='Total expected walks for accuracy calc')
ap.add_argument('-v', '--verbose', action='store_true',
help='Print every event end, including rejections')
args = ap.parse_args()
text = sys.stdin.read() if args.path == '-' else open(args.path).read()
det = Detector(args)
rejects = {}
for n, miny, maxy, c in parse_frames(text):
out = det.step(n, miny, maxy, c)
if out is None:
continue
reason, result = out
if result is None:
continue
kind, info = result
if kind == 'fire':
print(f' {info["kind"]:5} first={info["first"]:5.1f} '
f'min={info["min"]:5.1f} max={info["max"]:5.1f} '
f'last={info["last"]:5.1f} dur={info["dur"]:2d} '
f'exit={reason}')
else:
rejects[kind] = rejects.get(kind, 0) + 1
if args.verbose:
print(f' [drop {kind}]')
total = det.entries + det.exits
print(f'\n=== entries={det.entries} exits={det.exits} total={total} ===')
print(f'rejected events: {rejects}')
if args.ground_truth:
gt = args.ground_truth
acc = min(total, gt) / gt * 100
over = max(0, total - gt)
print(f'accuracy vs gt={gt}: {acc:.0f}% (over={over})')
if __name__ == '__main__':
main()