# server/ota_endpoint.py
"""
OTA firmware update endpoints.

Deployment workflow:

1. Generate signing key (one-time):
       python tools/gen_signing_key.py
       → secrets/firmware_signing_key.pem  (keep offline)
       → firmware/lib/ota_updater/ota_pubkey.h  (commit this)

2. Build and deploy a new firmware version:
       pio run -e timercam    # build
       python tools/deploy_firmware.py \\
           firmware/.pio/build/timercam/firmware.bin 1.2.3
       → server/firmware/ updated with current.bin, current.sig, manifest.json

3. Bump FW_VERSION in firmware/include/version.h before each release.

4. Register in server main app:
       from server.ota_endpoint import router as ota_router
       app.include_router(ota_router)

   Also uncomment Depends(verify_device_hmac) on both route handlers and
   confirm the HMAC format matches hmac.cpp:
       method + "\\n" + path + "\\n" + timestamp + "\\n" + sha256_hex(body)

Note: HMAC auth is currently commented out on route handlers — must be wired
before production use. verify_device_hmac must use the same format as hmac.cpp.
"""

import base64
import json
from pathlib import Path

from fastapi import APIRouter, HTTPException
from fastapi.responses import FileResponse

# Directory holding the staged release: current.bin, current.sig, manifest.json.
FIRMWARE_DIR = Path(__file__).parent / "firmware"

router = APIRouter(prefix="/ota", tags=["ota"])


class FirmwareNotFoundError(Exception):
    """Raised by ota_firmware_impl when current.bin is not staged."""


def _parse_version(v: str) -> tuple:
    """Parse a "MAJOR.MINOR.PATCH" string into a comparable tuple of ints.

    Args:
        v: Version string such as "1.2.3"; surrounding whitespace is ignored.

    Returns:
        A 3-tuple of non-negative ints, or (0, 0, 0) when the input is
        malformed (wrong number of components, non-numeric or negative
        component, or not a string at all).
    """
    try:
        parts = v.strip().split(".")
        if len(parts) != 3:
            return (0, 0, 0)
        numbers = tuple(int(x) for x in parts)
    except (ValueError, AttributeError, TypeError):
        # AttributeError/TypeError cover non-str input (None, int, bytes, ...).
        return (0, 0, 0)
    # int() accepts a leading "-"; a negative component is not a valid version.
    if any(n < 0 for n in numbers):
        return (0, 0, 0)
    return numbers


def ota_check_impl(current_version: str, firmware_dir: Path = FIRMWARE_DIR) -> dict:
    """Compare the device's current_version against the staged manifest.

    Args:
        current_version: Version string reported by the device.
        firmware_dir: Directory containing manifest.json and current.sig.

    Returns:
        {"update": False} when no update should be offered: the manifest or
        signature is missing or unreadable, the manifest is not a JSON object
        with "version", "size" and "sha256" keys, or the staged version is
        not strictly newer than current_version.
        Otherwise {"update": True, "version", "size", "sha256", "sig_b64"},
        where sig_b64 is the base64-encoded content of current.sig.
    """
    try:
        # Read bytes so json.loads detects UTF-8/16/32 itself instead of
        # depending on the process locale. Reading directly (rather than
        # exists()-then-read) also covers a file vanishing mid-deploy.
        manifest = json.loads((firmware_dir / "manifest.json").read_bytes())
    except (OSError, ValueError):
        # ValueError covers both JSONDecodeError and UnicodeDecodeError.
        return {"update": False}

    # Valid JSON that is not an object (list, null, string, number) cannot
    # carry the required keys.
    if not isinstance(manifest, dict):
        return {"update": False}

    try:
        version = manifest["version"]
        size = manifest["size"]
        sha256 = manifest["sha256"]
    except KeyError:
        return {"update": False}

    # A malformed staged version parses to (0, 0, 0) and is therefore never
    # offered; a malformed device version parses to (0, 0, 0) and is offered
    # any well-formed staged release.
    if _parse_version(version) <= _parse_version(current_version):
        return {"update": False}

    try:
        signature = (firmware_dir / "current.sig").read_bytes()
    except OSError:
        # Without a readable signature there is nothing to hand out.
        return {"update": False}

    return {
        "update": True,
        "version": version,
        "size": size,
        "sha256": sha256,
        "sig_b64": base64.b64encode(signature).decode(),
    }


def ota_firmware_impl(firmware_dir: Path = FIRMWARE_DIR) -> bytes:
    """Return the raw bytes of the staged firmware binary.

    Args:
        firmware_dir: Directory containing current.bin.

    Returns:
        The full content of current.bin.

    Raises:
        FirmwareNotFoundError: If current.bin is absent.
    """
    try:
        return (firmware_dir / "current.bin").read_bytes()
    except FileNotFoundError as err:
        raise FirmwareNotFoundError("No firmware staged") from err


@router.get("/check")
async def ota_check(
    version: str,
    # device_id: str = Depends(verify_device_hmac),  # uncomment when wiring into app
):
    """Check whether a firmware update is available for the given device version."""
    return ota_check_impl(current_version=version)


@router.get("/firmware")
async def ota_firmware(
    # device_id: str = Depends(verify_device_hmac),  # uncomment when wiring into app
):
    """Stream the staged firmware binary to the device.

    Raises:
        HTTPException: 404 when current.bin is not a regular file.
    """
    bin_path = FIRMWARE_DIR / "current.bin"
    # is_file() rather than exists(): a directory at this path must yield a
    # 404 instead of an error while the response is being sent.
    if not bin_path.is_file():
        raise HTTPException(status_code=404, detail="No firmware available")
    return FileResponse(bin_path, media_type="application/octet-stream")