#!/usr/bin/env python3
"""
ota_push.py — Push firmware OTA to a TimerCamera-F device via Arduino OTA.

Requires: Python 3.8+, no extra pip packages needed
Device must be connected to WiFi and reachable on the local network.

Usage:
  python ota_push.py \\
    --host dc-0042.local \\
    --firmware firmware/.pio/build/timercam/firmware.bin

Wire exchange implemented here (all over one TCP connection):
  1. send the header line "0:<size>:<md5>\\n"
  2. wait for the reply "OK"
  3. stream the raw firmware bytes
  4. wait for a final "OK"

NOTE(review): this is a plain-TCP exchange. The stock ArduinoOTA / espota.py
flow negotiates over UDP first, so confirm the device firmware implements the
exchange above.
"""

import argparse
import hashlib
import os
import socket
import sys
import time

# Default TCP port the device is expected to listen on.
OTA_PORT = 3232

# Pause between failed name-resolution attempts, in seconds.
_RESOLVE_RETRY_DELAY = 0.5


def compute_md5(path: str) -> str:
    """Return the hex MD5 digest of the file at *path*.

    The file is read in 8 KiB chunks so it is never held in memory whole.

    Raises:
        OSError: if the file cannot be opened or read.
    """
    digest = hashlib.md5()
    with open(path, "rb") as f:
        while chunk := f.read(8192):
            digest.update(chunk)
    return digest.hexdigest()


def resolve_host(host: str, timeout: float = 10.0) -> str:
    """Resolve *host* to an IPv4 address string, retrying until *timeout*.

    At least one resolution attempt is always made, even when *timeout* is
    zero or negative. Between failed attempts the function sleeps for up to
    0.5 s, but never past the deadline. A single resolver call may itself
    block, so *timeout* bounds the retry window rather than the total
    wall-clock time.

    Raises:
        RuntimeError: if the name still cannot be resolved once the deadline
            has passed; the last resolver error is chained as the cause.
    """
    deadline = time.monotonic() + timeout
    while True:
        try:
            return socket.gethostbyname(host)
        except socket.gaierror as err:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                raise RuntimeError(
                    f"Could not resolve {host!r} within {timeout:.0f}s: {err}"
                ) from err
            time.sleep(min(_RESOLVE_RETRY_DELAY, remaining))


def _expect_ok(sock: socket.socket, timeout: float, failure: str) -> None:
    """Wait up to *timeout* seconds for an "OK" reply on *sock*.

    Raises:
        RuntimeError: with *failure* and the received text if the reply,
            stripped of surrounding whitespace, is anything other than "OK".
        OSError: on socket errors, including a receive timeout.
    """
    sock.settimeout(timeout)
    reply = sock.recv(64).decode(errors="replace").strip()
    if reply != "OK":
        raise RuntimeError(f"{failure}: {reply!r}")


def push_ota(host: str, firmware_path: str, port: int = OTA_PORT) -> None:
    """Upload the firmware image at *firmware_path* to the device at *host*.

    Progress is printed to stdout while the image is streamed.

    Args:
        host: hostname (e.g. an mDNS ``.local`` name) or IP address.
        firmware_path: path to the firmware ``.bin`` file.
        port: TCP port of the device's OTA listener.

    Raises:
        RuntimeError: if the firmware file is empty, the host cannot be
            resolved, or the device answers anything other than "OK".
        OSError: on file or socket errors (including timeouts).
    """
    size = os.path.getsize(firmware_path)
    if size == 0:
        # An empty image can never be valid; fail before touching the network.
        raise RuntimeError(f"Firmware file is empty: {firmware_path}")
    md5 = compute_md5(firmware_path)

    print(f"Resolving {host} ...")
    ip = resolve_host(host)
    print(f" → {ip}")

    print(f"Connecting to {ip}:{port} ...")
    with socket.create_connection((ip, port), timeout=15) as sock:
        # Handshake header: "0:<size>:<md5>\n"
        header = f"0:{size}:{md5}\n".encode()
        sock.sendall(header)
        _expect_ok(sock, 10, "OTA handshake rejected")

        print(f"Sending {size:,} bytes ...")
        sent = 0
        with open(firmware_path, "rb") as f:
            while chunk := f.read(4096):
                sock.sendall(chunk)
                sent += len(chunk)
                pct = sent * 100 // size
                print(f"\r {pct:3d}% [{sent:,}/{size:,}]", end="", flush=True)
        print()

        # The final reply gets a longer timeout than the handshake.
        _expect_ok(sock, 30, "OTA write failed")

    print(f"✓ OTA complete — {host} is rebooting")


def main() -> None:
    """Parse command-line arguments and run the OTA push.

    Exits with status 1 (message on stderr) if the firmware file is missing
    or the upload fails.
    """
    parser = argparse.ArgumentParser(
        description="Push OTA firmware update to a door counter device")
    parser.add_argument("--host", required=True,
                        help="mDNS hostname or IP, e.g. dc-0042.local")
    parser.add_argument("--firmware", required=True,
                        help="Path to .bin firmware file")
    parser.add_argument("--port", type=int, default=OTA_PORT,
                        help=f"OTA TCP port (default: {OTA_PORT})")
    args = parser.parse_args()

    # isfile (not exists) so that a directory path is rejected up front.
    if not os.path.isfile(args.firmware):
        print(f"Error: firmware file not found: {args.firmware}", file=sys.stderr)
        sys.exit(1)

    try:
        push_ota(args.host, args.firmware, args.port)
    except (RuntimeError, OSError) as e:
        # Leading newline moves past any unfinished "\r" progress line.
        print(f"\nError: {e}", file=sys.stderr)
        sys.exit(1)


if __name__ == "__main__":
    main()