#!/usr/bin/env python3
# tools/replay_logs.py
#
# Replay the event state machine against text serial logs captured from the
# production firmware. Input lines of the form:
#   [F] n=<fg_count> y=<min_y>..<max_y> c=<centroid_y>
#
# Those four values are exactly what the firmware's event state machine
# consumes — so we can iterate event-level params (thresholds, max_frames,
# extent gates, trajectory cutoffs, refractory) offline without needing raw
# frames or the device.
#
# Usage:
#   python tools/replay_logs.py walk.log
#   python tools/replay_logs.py walk.log --enter 250 --exit 100 --max 30 --min-traj 10
#   cat walk.log | python tools/replay_logs.py - --ground-truth 12

import argparse
import re
import sys

# One diagnostic line per frame. The group names must match the lookups in
# parse_frames(); the centroid is required to carry a decimal point.
LINE_RE = re.compile(
    r'\[F\]\s+n=(?P<n>\d+)\s+y=(?P<miny>-?\d+)\.\.(?P<maxy>-?\d+)\s+c=(?P<c>-?\d+\.\d+)'
)


def parse_frames(text):
    """Yield (fg_count, min_y, max_y, centroid_y) per [F] line, in order.

    Lines that do not contain an [F] record are skipped silently, so the
    log may be interleaved with unrelated output.
    """
    for line in text.splitlines():
        m = LINE_RE.search(line)
        if not m:
            continue
        yield int(m['n']), int(m['miny']), int(m['maxy']), float(m['c'])


class Detector:
    """Mirror of firmware event state machine.

    Only uses per-frame diagnostic values — the same inputs the firmware
    feeds it. ``a`` is any object exposing the tuning attributes read below
    (enter_thresh, exit_thresh, quiet_frames, min_frames, max_frames,
    extent_top, extent_bot, min_traj, refractory), e.g. an argparse
    namespace.
    """

    def __init__(self, a):
        self.a = a
        # Frame index of the last fired event; starts far in the past so the
        # refractory window is not active at the beginning of a log.
        self.last_fire = -10**9
        self.ix = 0          # number of frames stepped so far
        self.entries = 0     # count of fired ENTRY events
        self.exits = 0       # count of fired EXIT events
        self.fires = []      # info dict of every fired event, in order
        self._reset()

    def _reset(self):
        """Clear the per-event accumulators and leave the 'in event' state."""
        self.ev = False
        self.ev_n = 0
        self.ev_first = self.ev_last = -1.0
        self.ev_min = 1e9
        self.ev_max = -1.0
        self.ev_miny = 1e9
        self.ev_maxy = -1
        self.ev_quiet = 0

    def _finalize(self):
        """Classify the event that just ended.

        Returns ``(verdict, info)`` where verdict is ``'fire'`` (info is a
        dict describing the event) or one of the ``'reject_*'`` strings
        (info is None). Firing also updates the counters, ``fires`` and
        ``last_fire``.
        """
        a = self.a
        if self.ev_n < a.min_frames:
            return ('reject_short', None)
        # Extent gates: the event must have reached y <= extent_top and
        # y >= extent_bot at some point during its lifetime.
        if self.ev_miny > a.extent_top:
            return ('reject_extent_top', None)
        if self.ev_maxy < a.extent_bot:
            return ('reject_extent_bot', None)

        # Centroid travel relative to where the event started.
        up = self.ev_first - self.ev_min
        down = self.ev_max - self.ev_first
        winning = max(up, down)
        if winning < a.min_traj:
            return ('reject_traj', None)

        timed_out = self.ev_n > a.max_frames
        if timed_out:
            # A timed-out event is classified by net displacement
            # (last vs. first centroid) rather than by peak excursion.
            is_entry = self.ev_last < self.ev_first
        else:
            is_entry = up >= down
        kind = 'ENTRY' if is_entry else 'EXIT'

        self.last_fire = self.ix
        info = dict(kind=kind, first=self.ev_first, min=self.ev_min,
                    max=self.ev_max, last=self.ev_last, dur=self.ev_n,
                    up=up, down=down, ix=self.ix)
        if is_entry:
            self.entries += 1
        else:
            self.exits += 1
        self.fires.append(info)
        return ('fire', info)

    def step(self, n, miny, maxy, c):
        """Feed one frame; return None or ``(reason, result)`` on event end.

        ``reason`` is ``'quiet'`` or ``'timeout'``; ``result`` is the tuple
        returned by ``_finalize``.
        """
        self.ix += 1
        a = self.a
        refractory = (self.ix - self.last_fire) < a.refractory

        if not self.ev:
            # Idle: open an event only outside the refractory window.
            if not refractory and n >= a.enter_thresh:
                self.ev = True
                self.ev_n = 1
                self.ev_first = self.ev_last = c
                self.ev_min = c
                self.ev_max = c
                self.ev_miny = miny
                self.ev_maxy = maxy
                self.ev_quiet = 0
            return None

        self.ev_n += 1
        # NOTE(review): centroid/extent tracking is skipped on empty frames
        # (n == 0) — confirm this matches the firmware.
        if n > 0:
            self.ev_last = c
            if c < self.ev_min:
                self.ev_min = c
            if c > self.ev_max:
                self.ev_max = c
            if miny < self.ev_miny:
                self.ev_miny = miny
            if maxy > self.ev_maxy:
                self.ev_maxy = maxy

        ended = False
        reason = None
        if n < a.exit_thresh:
            self.ev_quiet += 1
            if self.ev_quiet >= a.quiet_frames:
                ended = True
                reason = 'quiet'
        else:
            self.ev_quiet = 0
        # NOTE(review): the timeout is checked on every in-event frame, so it
        # overrides 'quiet' when both trigger together — confirm that the
        # firmware does not restrict this check to non-quiet frames.
        if self.ev_n > a.max_frames:
            ended = True
            reason = 'timeout'

        if ended:
            result = self._finalize()
            self._reset()
            return (reason, result)
        return None


def main():
    """Parse CLI options, replay the log, and print events plus a summary."""
    ap = argparse.ArgumentParser()
    ap.add_argument('path', help='log file, or - for stdin')
    ap.add_argument('--enter', dest='enter_thresh', type=int, default=300)
    ap.add_argument('--exit', dest='exit_thresh', type=int, default=200)
    ap.add_argument('--quiet', dest='quiet_frames', type=int, default=3)
    ap.add_argument('--min', dest='min_frames', type=int, default=5)
    ap.add_argument('--max', dest='max_frames', type=int, default=25)
    ap.add_argument('--extent-top', dest='extent_top', type=int, default=10)
    ap.add_argument('--extent-bot', dest='extent_bot', type=int, default=85)
    ap.add_argument('--min-traj', dest='min_traj', type=float, default=15.0)
    ap.add_argument('--refractory', dest='refractory', type=int, default=15)
    ap.add_argument('--ground-truth', type=int, default=0,
                    help='Total expected walks for accuracy calc')
    ap.add_argument('-v', '--verbose', action='store_true',
                    help='Print every event end, including rejections')
    args = ap.parse_args()

    if args.path == '-':
        text = sys.stdin.read()
    else:
        # Context manager so the log file handle is closed promptly.
        with open(args.path) as fh:
            text = fh.read()

    det = Detector(args)
    rejects = {}
    for n, miny, maxy, c in parse_frames(text):
        out = det.step(n, miny, maxy, c)
        if out is None:
            continue
        reason, result = out
        if result is None:
            continue
        kind, info = result
        if kind == 'fire':
            print(f' {info["kind"]:5} first={info["first"]:5.1f} '
                  f'min={info["min"]:5.1f} max={info["max"]:5.1f} '
                  f'last={info["last"]:5.1f} dur={info["dur"]:2d} '
                  f'exit={reason}')
        else:
            rejects[kind] = rejects.get(kind, 0) + 1
            if args.verbose:
                print(f' [drop {kind}]')

    total = det.entries + det.exits
    print(f'\n=== entries={det.entries} exits={det.exits} total={total} ===')
    print(f'rejected events: {rejects}')
    if args.ground_truth:
        gt = args.ground_truth
        # Over-counting is capped at 100% and reported separately as "over".
        acc = min(total, gt) / gt * 100
        over = max(0, total - gt)
        print(f'accuracy vs gt={gt}: {acc:.0f}% (over={over})')


if __name__ == '__main__':
    main()