# server/ota_endpoint.py
"""
OTA firmware update endpoints.

To register in the server main app:
    from server.ota_endpoint import router as ota_router
    app.include_router(ota_router)

Route handlers have HMAC auth commented out pending import-path confirmation:
    from .main import verify_device_hmac  # adjust to actual module
Then uncomment the Depends lines in ota_check() and ota_firmware()
(``Depends`` must also be imported from fastapi at that point).

Firmware artifacts expected under FIRMWARE_DIR (default: server/firmware/):
    current.bin   — raw firmware binary
    current.sig   — 64-byte r‖s Ed25519/ECDSA signature
    manifest.json — {"version": "X.Y.Z", "size": N, "sha256": "hex..."}
"""
import base64
import json
import logging
from pathlib import Path

from fastapi import APIRouter, HTTPException
from fastapi.responses import FileResponse

FIRMWARE_DIR = Path(__file__).parent / "firmware"

logger = logging.getLogger(__name__)

router = APIRouter(prefix="/ota", tags=["ota"])


class FirmwareNotFoundError(Exception):
    """Raised by ota_firmware_impl() when no firmware binary is staged."""


def _parse_version(v: str) -> tuple:
    """Parse a dotted version string into a tuple of ints.

    Every dot-separated component must be accepted by ``int()``. Anything
    else (non-numeric parts such as "1.2.3-rc1", an empty string, or a
    non-string value) yields ``(0, 0, 0)``.

    The tuple has one element per component, so "1.2" gives ``(1, 2)``;
    callers comparing versions should go through ``_is_newer`` so that
    differing lengths are handled.
    """
    try:
        parts = v.strip().split(".")
        return tuple(int(x) for x in parts)
    except (ValueError, AttributeError):
        return (0, 0, 0)


def _is_newer(candidate: str, current: str) -> bool:
    """Return True when *candidate* is a strictly newer version than *current*.

    Both versions are zero-padded to the same number of components before
    comparing, so "1.2" and "1.2.0" are treated as equal. Comparing the raw
    tuples would rank (1, 2, 0) above (1, 2) and offer a device an "update"
    to the version it already runs.
    """
    cand = _parse_version(candidate)
    cur = _parse_version(current)
    width = max(len(cand), len(cur))
    cand += (0,) * (width - len(cand))
    cur += (0,) * (width - len(cur))
    return cand > cur


def ota_check_impl(current_version: str, firmware_dir: Path = FIRMWARE_DIR) -> dict:
    """Compare the device's version against the staged manifest.

    Args:
        current_version: Version string reported by the device.
        firmware_dir: Directory holding manifest.json and current.sig.

    Returns:
        ``{"update": False}`` when the manifest is missing or unusable, the
        staged version is not strictly newer, or the signature file cannot
        be read. Otherwise a dict with ``update`` (True), ``version``,
        ``size``, ``sha256`` (all taken verbatim from the manifest) and
        ``sig_b64`` (base64 of the bytes of current.sig).
    """
    manifest_path = firmware_dir / "manifest.json"
    try:
        manifest = json.loads(manifest_path.read_text(encoding="utf-8"))
        version = manifest["version"]
        size = manifest["size"]
        sha256 = manifest["sha256"]
    except FileNotFoundError:
        # Nothing staged is a normal state, not worth a warning.
        return {"update": False}
    except (OSError, ValueError, KeyError, TypeError) as exc:
        # ValueError covers JSONDecodeError and UnicodeDecodeError; TypeError
        # covers a manifest that is valid JSON but not an object (e.g. a list).
        logger.warning("Ignoring unusable OTA manifest %s: %r", manifest_path, exc)
        return {"update": False}

    if not _is_newer(version, current_version):
        return {"update": False}

    sig_path = firmware_dir / "current.sig"
    try:
        signature = sig_path.read_bytes()
    except OSError as exc:
        # Never advertise an update that the device could not verify.
        logger.warning("OTA signature %s unavailable: %r", sig_path, exc)
        return {"update": False}

    return {
        "update": True,
        "version": version,
        "size": size,
        "sha256": sha256,
        "sig_b64": base64.b64encode(signature).decode(),
    }


def ota_firmware_impl(firmware_dir: Path = FIRMWARE_DIR) -> bytes:
    """Return the raw bytes of the staged firmware binary.

    Args:
        firmware_dir: Directory expected to contain current.bin.

    Raises:
        FirmwareNotFoundError: If current.bin is absent.
    """
    bin_path = firmware_dir / "current.bin"
    # Read directly instead of exists()-then-read so a file removed between
    # the two calls still surfaces as FirmwareNotFoundError.
    try:
        return bin_path.read_bytes()
    except (FileNotFoundError, NotADirectoryError) as exc:
        raise FirmwareNotFoundError("No firmware staged") from exc


@router.get("/check")
async def ota_check(
    version: str,
    # device_id: str = Depends(verify_device_hmac),  # uncomment when wiring into app
):
    """Check whether a firmware update is available for the given device version."""
    return ota_check_impl(current_version=version)


@router.get("/firmware")
async def ota_firmware(
    # device_id: str = Depends(verify_device_hmac),  # uncomment when wiring into app
):
    """Stream the staged firmware binary to the device.

    Responds with 404 when no regular file is staged at
    FIRMWARE_DIR/current.bin.
    """
    bin_path = FIRMWARE_DIR / "current.bin"
    # is_file() rather than exists(): a directory at this path must yield a
    # 404 instead of an error while building the file response.
    if not bin_path.is_file():
        raise HTTPException(status_code=404, detail="No firmware available")
    return FileResponse(bin_path, media_type="application/octet-stream")