Replace per-track line-crossing counter with a single event state machine
gated by foreground pixel count (ENTER=250, EXIT=150) and finalized by
quiet-exit or timeout. Direction inferred from centroid excursion
(up_score vs down_score) on quiet-exit fires, and from net displacement
(last_c vs first_c) on timeout fires.
Tuning reflects bench data at the intended 7' overhead mount: walkers
produce smaller centroid excursions than originally modelled, so
EXTENT gates, MIN_TRAJ, MAX_FRAMES and REFRACTORY were all relaxed from
their initial guesses. Constants and rationale live in firmware/lib/cv/cv.h.
Bench results (8 isolated walks, 4 entries + 4 exits):
* Event detection: 8/8 (100%)
* Aggregate entries+exits split: 4+4 (matches)
* Per-walk direction labelling: 4/8 (~50%)
Document explicitly that per-walk direction is unreliable at this mount
and that downstream analytics should trust only gross traffic
(entries + exits). Recovering direction would require a physical mount
change or a richer signal; both are out of scope for v1.
Tooling:
* tools/replay_logs.py — replay event state machine against captured
[F] diagnostic lines, for offline tuning without flash-test loops.
* firmware/src/main_capture.cpp + tools/capture_frames.py +
tools/replay_frames.py — raw-frame capture firmware and Python port
of the detector, kept in tree for future iteration even though the
TimerCamera-F serial driver stripped specific byte ranges in testing
and log-based replay became the working path.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
106 lines
3.0 KiB
Python
106 lines
3.0 KiB
Python
#!/usr/bin/env python3
|
|
# tools/capture_frames.py
|
|
#
|
|
# Read framed 96x96 grayscale frames from the capture-mode firmware over serial
|
|
# and write them to a .bin file for offline replay.
|
|
#
|
|
# Wire format per frame (little-endian):
|
|
#   magic     u32   0x314D5246  (the ASCII bytes 'FRM1' on the wire)
|
|
# frame_ix u32
|
|
# millis u32
|
|
# pixels 9216 bytes
|
|
#
|
|
# Output file is the raw concatenation of frames (same layout as the wire),
|
|
# so replay_frames.py can stream it with identical parsing.
|
|
#
|
|
# Usage: python tools/capture_frames.py --port /dev/ttyUSB0 --out walk.bin --duration 60
|
|
|
|
import argparse
|
|
import serial
|
|
import struct
|
|
import sys
|
|
import time
|
|
|
|
# Frame sync marker. Packed little-endian it is the four ASCII bytes 'FRM1',
# which survive the CH9102 stream.
MAGIC = 0x314D5246

# One 96x96 grayscale frame, one byte per pixel.
FRAME_PIXELS = 96 * 96

# Header is three u32 fields: magic, frame_ix, millis.
HEADER_LEN = 12

# Total size of one framed record (header + pixels), on the wire and on disk.
FRAME_LEN = HEADER_LEN + FRAME_PIXELS
|
|
|
|
|
|
def read_exact(ser, n):
    """Read exactly *n* bytes from *ser*, or give up when a read comes back empty.

    ``ser.read`` is called repeatedly, each time asking only for the bytes
    still missing, until *n* bytes have been collected.

    Args:
        ser: Object with a ``read(size)`` method returning ``bytes``.
        n: Number of bytes required.

    Returns:
        The *n* bytes as ``bytes``, or ``None`` if ``ser.read`` returned an
        empty result before the full count arrived. Bytes collected before
        that point are discarded.
    """
    buf = bytearray()
    while len(buf) < n:
        chunk = ser.read(n - len(buf))
        if not chunk:
            # Empty read: nothing arrived (presumably the port's read timeout
            # expired), so the caller gets None instead of a short buffer.
            return None
        buf.extend(chunk)
    return bytes(buf)
|
|
|
|
|
|
def find_magic(ser, magic=None):
    """Scan *ser* byte-by-byte until the 4-byte frame magic has been read.

    Args:
        ser: Object with a ``read(size)`` method returning ``bytes``.
        magic: 32-bit sync value to look for, matched in little-endian byte
            order. Defaults to the module-level ``MAGIC``.

    Returns:
        ``True`` as soon as the last four bytes read equal the magic (the
        stream is then positioned just past it), or ``False`` if a read
        returned no data first.
    """
    if magic is None:
        magic = MAGIC
    magic_bytes = struct.pack('<I', magic)
    width = len(magic_bytes)

    window = bytearray()
    while True:
        b = ser.read(1)
        if not b:
            return False
        window += b
        # Keep only the most recent `width` bytes as the sliding window.
        del window[:-width]
        if window == magic_bytes:
            return True
|
|
|
|
|
|
def main():
    """Capture framed frames from a serial port and append them to a file.

    Command-line options: ``--port`` and ``--out`` are required; ``--baud``
    defaults to 460800 and ``--duration`` (seconds) to 60.

    The port is opened with a 1-second read timeout. For up to two seconds
    the function reads text lines, echoing those that start with ``#`` to
    stderr, and stops early on a line containing ``capture-mode``. It then
    loops until the duration elapses: sync on the magic, read the rest of
    the frame (two u32 fields plus the pixels), and write magic + body to
    the output file. Progress goes to stderr every 25 frames, with a
    summary at the end.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument('--port', required=True)
    ap.add_argument('--baud', type=int, default=460800)
    ap.add_argument('--out', required=True)
    ap.add_argument('--duration', type=float, default=60.0,
                    help='Seconds to capture (default 60)')
    args = ap.parse_args()

    magic_bytes = struct.pack('<I', MAGIC)
    frames = 0
    last_ix = None
    dropped = 0

    # Context manager guarantees the port is released even if capture fails.
    with serial.Serial(args.port, args.baud, timeout=1.0) as ser:
        print(f'# listening on {args.port} @ {args.baud} for {args.duration}s...',
              file=sys.stderr)

        # Drain boot banner lines. monotonic() keeps the deadlines immune to
        # wall-clock adjustments during a capture.
        deadline_banner = time.monotonic() + 2.0
        while time.monotonic() < deadline_banner:
            line = ser.readline()
            if line.startswith(b'#'):
                print(line.decode(errors='replace').rstrip(), file=sys.stderr)
            if b'capture-mode' in line:
                break

        deadline = time.monotonic() + args.duration

        with open(args.out, 'wb') as f:
            while time.monotonic() < deadline:
                if not find_magic(ser):
                    # No sync before a read came back empty; re-check the
                    # deadline and keep listening.
                    continue
                # Remainder of the frame after the magic: frame_ix + millis
                # (8 bytes) followed by the pixel payload.
                body = read_exact(ser, 8 + FRAME_PIXELS)
                if body is None:
                    # Stream went quiet mid-frame; stop rather than write a
                    # truncated record.
                    break
                frame_ix, ms = struct.unpack('<II', body[:8])
                if last_ix is not None:
                    gap = frame_ix - last_ix - 1
                    # Only forward jumps are drops. A repeated or backwards
                    # index (e.g. device reset or counter wrap) must not
                    # push the tally negative.
                    if gap > 0:
                        dropped += gap
                last_ix = frame_ix
                f.write(magic_bytes)
                f.write(body)
                frames += 1
                if frames % 25 == 0:
                    print(f'# {frames} frames, last ix={frame_ix} ms={ms} '
                          f'dropped={dropped}', file=sys.stderr)

    print(f'# done: {frames} frames written to {args.out} '
          f'({dropped} dropped)', file=sys.stderr)
|
|
|
|
|
|
# Run the capture only when executed as a script, not when imported.
if __name__ == '__main__':
    main()
|