feat: ota_push.py operator firmware update script

This commit is contained in:
2026-04-14 10:28:28 -07:00
parent 36f4becbe9
commit 883b72be77

103
tools/ota_push.py Executable file
View File

@@ -0,0 +1,103 @@
#!/usr/bin/env python3
"""
ota_push.py — Push firmware OTA to a TimerCamera-F device via Arduino OTA.
Requires: Python 3.8+, no extra pip packages needed
Device must be connected to WiFi and reachable on the local network.
Usage:
python ota_push.py \\
--host dc-0042.local \\
--firmware firmware/.pio/build/timercam/firmware.bin
"""
import argparse
import hashlib
import os
import socket
import sys
import time
OTA_PORT = 3232
def compute_md5(path: str) -> str:
h = hashlib.md5()
with open(path, "rb") as f:
while chunk := f.read(8192):
h.update(chunk)
return h.hexdigest()
def resolve_host(host: str, timeout: float = 10.0) -> str:
"""Resolve hostname to IP, retrying until timeout."""
deadline = time.monotonic() + timeout
last_err = None
while time.monotonic() < deadline:
try:
return socket.gethostbyname(host)
except socket.gaierror as e:
last_err = e
time.sleep(0.5)
raise RuntimeError(f"Could not resolve {host!r} within {timeout:.0f}s: {last_err}")
def push_ota(host: str, firmware_path: str) -> None:
size = os.path.getsize(firmware_path)
md5 = compute_md5(firmware_path)
print(f"Resolving {host} ...")
ip = resolve_host(host)
print(f"{ip}")
print(f"Connecting to {ip}:{OTA_PORT} ...")
with socket.create_connection((ip, OTA_PORT), timeout=15) as sock:
# Arduino OTA handshake: "0:<size>:<md5>\n"
header = f"0:{size}:{md5}\n".encode()
sock.sendall(header)
sock.settimeout(10)
resp = sock.recv(64).decode(errors="replace").strip()
if resp != "OK":
raise RuntimeError(f"OTA handshake rejected: {resp!r}")
print(f"Sending {size:,} bytes ...")
sent = 0
with open(firmware_path, "rb") as f:
while chunk := f.read(4096):
sock.sendall(chunk)
sent += len(chunk)
pct = sent * 100 // size
print(f"\r {pct:3d}% [{sent:,}/{size:,}]", end="", flush=True)
print()
sock.settimeout(30)
final = sock.recv(64).decode(errors="replace").strip()
if final != "OK":
raise RuntimeError(f"OTA write failed: {final!r}")
print(f"✓ OTA complete — {host} is rebooting")
def main() -> None:
parser = argparse.ArgumentParser(
description="Push OTA firmware update to a door counter device")
parser.add_argument("--host", required=True,
help="mDNS hostname or IP, e.g. dc-0042.local")
parser.add_argument("--firmware", required=True,
help="Path to .bin firmware file")
args = parser.parse_args()
if not os.path.exists(args.firmware):
print(f"Error: firmware file not found: {args.firmware}", file=sys.stderr)
sys.exit(1)
try:
push_ota(args.host, args.firmware)
except (RuntimeError, OSError) as e:
print(f"\nError: {e}", file=sys.stderr)
sys.exit(1)
if __name__ == "__main__":
main()