diff --git a/tools/ota_push.py b/tools/ota_push.py new file mode 100755 index 0000000..c80bf06 --- /dev/null +++ b/tools/ota_push.py @@ -0,0 +1,103 @@ +#!/usr/bin/env python3 +""" +ota_push.py — Push firmware OTA to a TimerCamera-F device via Arduino OTA. + +Requires: Python 3.8+, no extra pip packages needed +Device must be connected to WiFi and reachable on the local network. + +Usage: + python ota_push.py \\ + --host dc-0042.local \\ + --firmware firmware/.pio/build/timercam/firmware.bin +""" +import argparse +import hashlib +import os +import socket +import sys +import time + + +OTA_PORT = 3232 + + +def compute_md5(path: str) -> str: + h = hashlib.md5() + with open(path, "rb") as f: + while chunk := f.read(8192): + h.update(chunk) + return h.hexdigest() + + +def resolve_host(host: str, timeout: float = 10.0) -> str: + """Resolve hostname to IP, retrying until timeout.""" + deadline = time.monotonic() + timeout + last_err = None + while time.monotonic() < deadline: + try: + return socket.gethostbyname(host) + except socket.gaierror as e: + last_err = e + time.sleep(0.5) + raise RuntimeError(f"Could not resolve {host!r} within {timeout:.0f}s: {last_err}") + + +def push_ota(host: str, firmware_path: str) -> None: + size = os.path.getsize(firmware_path) + md5 = compute_md5(firmware_path) + + print(f"Resolving {host} ...") + ip = resolve_host(host) + print(f" → {ip}") + + print(f"Connecting to {ip}:{OTA_PORT} ...") + with socket.create_connection((ip, OTA_PORT), timeout=15) as sock: + # Arduino OTA handshake: "0::\n" + header = f"0:{size}:{md5}\n".encode() + sock.sendall(header) + + sock.settimeout(10) + resp = sock.recv(64).decode(errors="replace").strip() + if resp != "OK": + raise RuntimeError(f"OTA handshake rejected: {resp!r}") + + print(f"Sending {size:,} bytes ...") + sent = 0 + with open(firmware_path, "rb") as f: + while chunk := f.read(4096): + sock.sendall(chunk) + sent += len(chunk) + pct = sent * 100 // size + print(f"\r {pct:3d}% [{sent:,}/{size:,}]", end="", flush=True) + print() + + sock.settimeout(30) + final = sock.recv(64).decode(errors="replace").strip() + if final != "OK": + raise RuntimeError(f"OTA write failed: {final!r}") + + print(f"✓ OTA complete — {host} is rebooting") + + +def main() -> None: + parser = argparse.ArgumentParser( + description="Push OTA firmware update to a door counter device") + parser.add_argument("--host", required=True, + help="mDNS hostname or IP, e.g. dc-0042.local") + parser.add_argument("--firmware", required=True, + help="Path to .bin firmware file") + args = parser.parse_args() + + if not os.path.exists(args.firmware): + print(f"Error: firmware file not found: {args.firmware}", file=sys.stderr) + sys.exit(1) + + try: + push_ota(args.host, args.firmware) + except (RuntimeError, OSError) as e: + print(f"\nError: {e}", file=sys.stderr) + sys.exit(1) + + +if __name__ == "__main__": + main()