Files
DoorCounter/server/ota_endpoint.py
2026-05-11 06:54:26 -07:00

108 lines
3.3 KiB
Python

# server/ota_endpoint.py
"""
OTA firmware update endpoints.

To register the routes in the server main app:

    from server.ota_endpoint import router as ota_router
    app.include_router(ota_router)

The route handlers have HMAC authentication commented out until the import
path of the dependency is confirmed. To enable it, add an import such as:

    from .main import verify_device_hmac  # adjust to actual module

also import ``Depends`` from ``fastapi``, then uncomment the ``Depends``
lines in ota_check() and ota_firmware().

Firmware artifacts are expected under FIRMWARE_DIR (default: server/firmware/):

    current.bin   — raw firmware binary
    current.sig   — 64-byte r‖s Ed25519/ECDSA signature
    manifest.json — {"version": "X.Y.Z", "size": N, "sha256": "hex..."}
"""
import base64
import json
from pathlib import Path
from fastapi import APIRouter
from fastapi.responses import FileResponse
# Directory that holds the staged firmware artifacts; it sits next to this
# module file, in a sub-directory named "firmware".
FIRMWARE_DIR = Path(__file__).parent.joinpath("firmware")

# Router that groups the OTA routes under the "/ota" URL prefix.
router = APIRouter(tags=["ota"], prefix="/ota")
class FirmwareNotFoundError(Exception):
    """Signal that a requested firmware artifact is not present on disk."""
def _parse_version(v: str) -> tuple:
"""Parse semver string to comparable tuple; returns (0,0,0) on malformed input."""
try:
parts = v.strip().split(".")
return tuple(int(x) for x in parts)
except (ValueError, AttributeError):
return (0, 0, 0)
def ota_check_impl(current_version: str, firmware_dir: Path = FIRMWARE_DIR) -> dict:
    """Compare the device's version against the staged firmware manifest.

    Args:
        current_version: Version string reported by the device.
        firmware_dir: Directory containing ``manifest.json`` and
            ``current.sig``.

    Returns:
        ``{"update": False}`` when no update can be offered: the manifest is
        missing, is not valid JSON, is not a JSON object, lacks one of the
        ``version``/``size``/``sha256`` keys, the staged version is not
        strictly newer than ``current_version``, or ``current.sig`` is
        missing.

        Otherwise a dict with ``update`` set to ``True`` plus ``version``,
        ``size`` and ``sha256`` copied from the manifest and ``sig_b64``, the
        base64-encoded contents of ``current.sig``.
    """
    manifest_path = firmware_dir / "manifest.json"
    try:
        # Read directly instead of checking exists() first, so a manifest that
        # disappears between the check and the read cannot raise.
        manifest = json.loads(manifest_path.read_text(encoding="utf-8"))
        version = manifest["version"]
        size = manifest["size"]
        sha256 = manifest["sha256"]
    except (FileNotFoundError, NotADirectoryError, ValueError, KeyError, TypeError):
        # FileNotFoundError/NotADirectoryError: nothing staged.
        # ValueError: invalid JSON (JSONDecodeError) or undecodable bytes.
        # KeyError: a required field is absent.
        # TypeError: the JSON document is not an object (e.g. a list).
        return {"update": False}

    if _parse_version(version) <= _parse_version(current_version):
        return {"update": False}

    try:
        signature = (firmware_dir / "current.sig").read_bytes()
    except (FileNotFoundError, NotADirectoryError):
        # Never advertise an update without its accompanying signature file.
        return {"update": False}

    return {
        "update": True,
        "version": version,
        "size": size,
        "sha256": sha256,
        "sig_b64": base64.b64encode(signature).decode(),
    }
def ota_firmware_impl(firmware_dir: Path = FIRMWARE_DIR) -> bytes:
    """Return the raw bytes of the staged firmware binary.

    Args:
        firmware_dir: Directory expected to contain ``current.bin``.

    Returns:
        The complete contents of ``current.bin``.

    Raises:
        FirmwareNotFoundError: If ``current.bin`` is absent.
    """
    try:
        # Read directly rather than testing exists() first: the file could be
        # removed between the check and the read, which would surface as a
        # raw FileNotFoundError instead of the documented exception.
        return (firmware_dir / "current.bin").read_bytes()
    except (FileNotFoundError, NotADirectoryError) as exc:
        raise FirmwareNotFoundError("No firmware staged") from exc
@router.get("/check")
async def ota_check(
    version: str,
    # device_id: str = Depends(verify_device_hmac),  # uncomment when wiring into app
):
    """Report whether firmware newer than ``version`` is staged.

    Delegates to ota_check_impl() with the default firmware directory and
    returns its result dict unchanged.
    """
    payload = ota_check_impl(current_version=version)
    return payload
@router.get("/firmware")
async def ota_firmware(
    # device_id: str = Depends(verify_device_hmac),  # uncomment when wiring into app
):
    """Serve the staged firmware binary to the device.

    Returns:
        A ``FileResponse`` for ``FIRMWARE_DIR / "current.bin"`` with media
        type ``application/octet-stream``.

    Raises:
        HTTPException: 404 when no regular file named ``current.bin`` is
            staged.
    """
    from fastapi import HTTPException

    bin_path = FIRMWARE_DIR / "current.bin"
    # is_file() rather than exists(): a directory (or other non-regular entry)
    # named current.bin is not servable, so report it as "not available"
    # instead of letting the response fail later.
    if not bin_path.is_file():
        raise HTTPException(status_code=404, detail="No firmware available")
    return FileResponse(bin_path, media_type="application/octet-stream")