16 Commits

Author SHA1 Message Date
d2c2d97fb7 feat(ota): harden OTA apply flow + bump firmware to 1.0.1
End-to-end OTA verified on dc-0002 after resolving server-side schema
mismatch (server now emits update/size/sig_b64 alongside existing fields).

Firmware changes:
- Bump FW_VERSION 1.0.0 -> 1.0.1
- Replace log_i/w/e with Serial.printf in ota_updater so output appears
  regardless of CORE_DEBUG_LEVEL (the prior macros were silent in prod)
- Log partition labels/offsets, per-128KB progress, computed sha256,
  HTTP errors with body, esp_ota_* errors by name, Content-Length vs
  expected size
- Check esp_ota_write return value (previously ignored -- silent
  partition corruption on write failure) and abort cleanly on error
- Reject update if expected_size > target partition size
- Serial.flush() + 500ms delay before esp_restart() so the final log
  line escapes the UART
- Boot-time: log running partition label/offset/state + FW_VERSION,
  and call esp_ota_mark_app_valid_cancel_rollback() on PENDING_VERIFY
  to prevent silent rollback after a successful OTA

Docs:
- Rewrite docs/ota-deployment-status.md to reflect resolved state,
  document the schema fix and the .bin/.sig co-deploy invariant
2026-05-14 12:21:52 -07:00
5ec678dfa3 fix: tighten version parsing, propagate HMAC sign failure, add deployment docs
Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
2026-05-11 11:26:44 -07:00
5cf122b922 feat(firmware): wire OTA updater into main loop with 6-hour polling task
Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
2026-05-11 11:22:29 -07:00
a21dcfa349 feat(firmware): implement OTA download, ECDSA verify, and flash
Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
2026-05-11 11:18:44 -07:00
66e6808e13 feat(firmware): implement ECDSA P-256 signature verification in OTA library
Replaces placeholder ota_verify_signature_with_key with real mbedtls
ECDSA verify; adds 4-case native test suite with generated P-256 vectors.

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
2026-05-11 11:15:52 -07:00
8b1fd10db7 feat(firmware): add OTA updater library skeleton with version comparison
Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
2026-05-11 06:59:02 -07:00
f37e0d6b07 feat(tools): add firmware deploy tool (sign + stage for server) 2026-05-11 06:55:44 -07:00
81bcc12f2f fix(server): add error handling for malformed OTA manifest and missing sig file
Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
2026-05-11 06:54:26 -07:00
d9a242a5fa feat(server): add OTA check and firmware download endpoints
Implements /ota/check (version comparison + sig_b64 payload) and
/ota/firmware (binary stream) using the same _impl pattern as
camera_endpoint.py. HMAC auth left commented pending main app wiring.
6/6 tests passing.

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
2026-05-11 06:52:46 -07:00
87b30a64b2 fix(tools): add key type validation and tighten test assertions in sign_firmware
Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
2026-05-11 06:50:51 -07:00
031426e364 feat(tools): add ECDSA P-256 firmware signing tool 2026-05-11 06:49:15 -07:00
437f73739f feat(tools): add ECDSA P-256 key generation tool and public key header
Generates firmware signing keypair; private key stays in gitignored
secrets/, public key written as 65-byte C array to
firmware/lib/ota_updater/ota_pubkey.h for compile-time OTA verification.

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
2026-05-11 06:47:10 -07:00
21a3c646aa docs(firmware): document FW_VERSION format constraint for OTA version compare 2026-05-11 06:45:53 -07:00
81dc96b100 feat(firmware): add FW_VERSION constant 2026-05-11 06:44:59 -07:00
56fc58b843 fix(tools): reject CSV metacharacters in flash_device.py inputs
device-id, location-id, wifi-ssid, and wifi-password were interpolated
directly into the NVS partition CSV. A value containing comma, double
quote, CR, or LF would split the field/row and silently provision the
wrong NVS keys — easiest concrete failure: a Wi-Fi password containing
a comma. Validate operator-supplied strings before generating the CSV.

Add an empty tools/__init__.py so the regression tests can import the
helper as 'tools.flash_device' (matches the existing 'server.*' test
pattern).

Found via adversarial review (run 2026-05-01-192928, gpt-5.5 reviewer).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-01 15:44:57 -07:00
641ab29277 fix(server): reject inverted period_start/period_end in CameraRecord
A misbehaving or clock-broken device could submit period_end <=
period_start, polluting the camera_records table with zero-length or
inverted windows that corrupt downstream hourly analytics. Add a
Pydantic model_validator so the request is rejected at the API
boundary instead of silently persisting bad ranges.

Found via adversarial review (run 2026-05-01-191359, both reviewers).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-01 15:44:57 -07:00
23 changed files with 1198 additions and 1 deletions

2
.gitignore vendored
View File

@@ -7,3 +7,5 @@ firmware/.pio/
*.log
*secret*
__pycache__/
secrets/
server/firmware/

View File

@@ -0,0 +1,88 @@
# OTA Deployment — Status
## Current state (2026-05-14)
**End-to-end OTA verified working on `dc-0002`.** Device polled `engagement-api-1`, received a signed manifest, downloaded and verified firmware 1.0.1, set the alternate boot partition, rebooted, and came up reporting `fw=1.0.1`.
## What's deployed
- **Branch `feat/pull-ota-code-signing`** merged to `main` (13 commits, 17 new files, 936 LOC).
- **Signing toolchain**: `tools/gen_signing_key.py`, `tools/sign_firmware.py`, `tools/deploy_firmware.py`.
- **Firmware OTA library**: `firmware/lib/ota_updater/`.
- **Signing key**: `secrets/firmware_signing_key.pem` (gitignored). Public key committed at `firmware/lib/ota_updater/ota_pubkey.h`.
- **Live OTA handler**: served by `engagement-api-1` Docker service (source not in this repo). The stub at `server/ota_endpoint.py` is unwired and not the one responding to devices.
- **Configurable poll interval** via NVS key `ota_interval`. Provision with `flash_device.py --ota-interval-seconds N`. Min 10 s, default 21600 (6 h).
## Issues resolved
### 1. HMAC format mismatch (resolved 2026-05-13)
Firmware OTA updater was using `X-HMAC-Signature` header + `millis()`-derived timestamp; the reporter component used `X-Signature` + `time(nullptr)`. Server expected the reporter format. Fixed by aligning the OTA updater to the same canonical scheme as the reporter (`firmware/lib/ota_updater/ota_updater.cpp` `add_hmac_headers`).
### 2. `/ota/check` JSON schema mismatch (resolved 2026-05-14)
Server was emitting `{update_available, sha256, url}` but firmware reads `{update, size, sig_b64}`. Device silently decided "up to date" every poll because `doc["update"]` defaulted to `false`. Fixed server-side: the `/ota/check` response now also includes the fields the firmware needs. Firmware schema remains the source of truth.
### 3. Signed firmware artifact pipeline (resolved 2026-05-14)
Deploy flow now bumps `FW_VERSION` → builds → copies `.pio/build/timercam/firmware.bin` to `firmware-<version>.bin` → signs with `tools/sign_firmware.py` → SCPs both `.bin` and `.bin.sig` to `root@nginx:/root/engagement-api/firmware/`. Server team updates `firmware_releases.sha256` to match the new binary.
**Gotcha:** the `.bin` and `.sig` must always be deployed together. The signature is over the bytes; replacing one without the other puts the server in an inconsistent state and devices will reject the update with `SIGNATURE INVALID`.
## Hardening added this session
### Firmware logging (`firmware/lib/ota_updater/ota_updater.cpp`, `firmware/src/main.cpp`)
The previous `log_i/w/e` macros were silenced by the default `CORE_DEBUG_LEVEL`. Replaced with `Serial.printf` so output appears regardless of log level. Now logs at every step:
- `[OTA] task started, interval=N ms`
- Per-tick WiFi status
- Full check URL + HMAC header preview (device id, ts, sig prefix)
- HTTP response code + error body on non-200
- JSON parse errors
- "Up to date" decision
- Partition labels and offsets (running + target)
- Per-128 KB download progress
- Total bytes + elapsed ms
- Computed sha256 of the downloaded image (compare against server `X-SHA256`)
- Signature verify result
- `esp_ota_end` / `esp_ota_set_boot_partition` errors by name
- 500 ms `Serial.flush()` + `delay()` before `esp_restart()` so the final log line escapes the UART
### Boot-time partition state (`firmware/src/main.cpp`)
Logs `running partition '<label>' (off=0x…) state=N fw=…` at every boot. If `state == ESP_OTA_IMG_PENDING_VERIFY` (3), calls `esp_ota_mark_app_valid_cancel_rollback()` to prevent the bootloader from reverting on the next reboot. Harmless no-op when rollback isn't enabled, but eliminates a class of silent OTA failures.
### `esp_ota_write` return value (`firmware/lib/ota_updater/ota_updater.cpp`)
Previously ignored — a failed write would silently corrupt the new partition and the device would still try to boot from it. Now checked, aborts the OTA cleanly, and logs the failing offset.
### Partition size pre-check
Reject the update before `esp_ota_begin` if `expected_size > target->size`.
## Verifying a deployment
After a server push, watch the device's serial output on the next OTA tick:
```
[OTA] tick: WiFi connected, running check
[OTA] check → GET http://logs.research.bike:80/ota/check?version=X.Y.Z
[OTA] check response: HTTP 200
[OTA] Update: X.Y.Z → A.B.C (N bytes)
[OTA] running='app0' (off=…), target='app1' (off=…)
[OTA] progress: N/N bytes
[OTA] sha256(image)=<hex> ← must match server X-SHA256
[OTA] signature OK
[OTA] boot partition set to 'app1' — rebooting in 500 ms
```
Then on reboot:
```
[BOOT] running partition 'app1' (off=…) state=N fw=A.B.C
```
The `fw=A.B.C` line is the success signal — it reflects the `FW_VERSION` macro baked into the freshly-booted image, not just what the device claims to be running.
## Quick reference
- Plan: `docs/superpowers/plans/2026-05-10-pull-ota-code-signing.md`
- Firmware version: `firmware/include/version.h`
- OTA library: `firmware/lib/ota_updater/`
- HMAC implementation: `firmware/lib/hmac/hmac.cpp`
- Provisioning tool: `tools/flash_device.py`
- Signing tools: `tools/gen_signing_key.py`, `tools/sign_firmware.py`, `tools/deploy_firmware.py`
- Server deploy path: `root@nginx:/root/engagement-api/firmware/` (per server team runbook)

View File

@@ -0,0 +1,3 @@
#pragma once
// Format: MAJOR.MINOR.PATCH (SemVer) — OTA version compare uses sscanf("%d.%d.%d")
#define FW_VERSION "1.0.1"

View File

@@ -0,0 +1,6 @@
{
"name": "ota_updater",
"build": {
"flags": ["-I$PROJECT_INCLUDE_DIR"]
}
}

View File

@@ -0,0 +1,4 @@
#pragma once
// Auto-generated by tools/gen_signing_key.py — DO NOT EDIT
// ECDSA P-256 public key, uncompressed X9.62 (04 || X || Y)
static const uint8_t kOtaPublicKey[65] = {0x04, 0x1c, 0x92, 0x43, 0x23, 0xe9, 0xac, 0xd1, 0xe8, 0x05, 0x32, 0x49, 0x39, 0x12, 0x95, 0xb2, 0x0a, 0x3e, 0xfb, 0x9d, 0xdf, 0xee, 0xd1, 0x98, 0x87, 0x97, 0xa3, 0xb8, 0xcb, 0x2b, 0xa6, 0x06, 0xe0, 0x83, 0x32, 0x71, 0xd2, 0x5f, 0x80, 0x40, 0x68, 0xcd, 0x00, 0xe5, 0x0e, 0xba, 0x13, 0xf6, 0x97, 0x43, 0x6f, 0xe6, 0x4f, 0xd0, 0x95, 0x53, 0x0e, 0xd7, 0x9a, 0x8a, 0x2e, 0x25, 0x52, 0xb4, 0xaf};

View File

@@ -0,0 +1,319 @@
// firmware/lib/ota_updater/ota_updater.cpp
#include "ota_updater.h"
#include <stdio.h>
#include <string.h>
#include <mbedtls/ecdsa.h>
#include <mbedtls/ecp.h>
#include <mbedtls/bignum.h>
// ── version comparison ─────────────────────────────────────────────────────
bool ota_version_newer(const char* current, const char* remote) {
int ca = 0, cb = 0, cc = 0;
int ra = 0, rb = 0, rc = 0;
if (sscanf(current, "%d.%d.%d", &ca, &cb, &cc) != 3) return false;
if (sscanf(remote, "%d.%d.%d", &ra, &rb, &rc) != 3) return false;
if (ra != ca) return ra > ca;
if (rb != cb) return rb > cb;
return rc > cc;
}
// ── signature verification ─────────────────────────────────────────────────
bool ota_verify_signature_with_key(const uint8_t hash32[32], const uint8_t sig64[64],
const uint8_t pubkey65[65]) {
mbedtls_ecp_group grp;
mbedtls_ecp_point Q;
mbedtls_mpi r, s;
mbedtls_ecp_group_init(&grp);
mbedtls_ecp_point_init(&Q);
mbedtls_mpi_init(&r);
mbedtls_mpi_init(&s);
bool ok = false;
if (mbedtls_ecp_group_load(&grp, MBEDTLS_ECP_DP_SECP256R1) == 0 &&
mbedtls_ecp_point_read_binary(&grp, &Q, pubkey65, 65) == 0 &&
mbedtls_mpi_read_binary(&r, sig64, 32) == 0 &&
mbedtls_mpi_read_binary(&s, sig64 + 32, 32) == 0 &&
mbedtls_ecdsa_verify(&grp, hash32, 32, &Q, &r, &s) == 0) {
ok = true;
}
mbedtls_ecp_group_free(&grp);
mbedtls_ecp_point_free(&Q);
mbedtls_mpi_free(&r);
mbedtls_mpi_free(&s);
return ok;
}
// ── device-only code ───────────────────────────────────────────────────────
#ifndef NATIVE_TEST
#include <Arduino.h>
#include <time.h>
#include <HTTPClient.h>
#include <WiFi.h>
#include <ArduinoJson.h>
#include <esp_ota_ops.h>
#include <mbedtls/sha256.h>
#include <mbedtls/base64.h>
#include "hmac.h"
#include "ota_pubkey.h"
#include "version.h"
bool ota_verify_signature(const uint8_t hash32[32], const uint8_t sig64[64]) {
return ota_verify_signature_with_key(hash32, sig64, kOtaPublicKey);
}
static const char* s_server_base = nullptr;
static const char* s_device_id = nullptr;
static const char* s_hmac_secret = nullptr;
static uint32_t s_interval_ms = 21600000UL; // 6 h default
static uint32_t s_last_check_ms = 0;
void ota_updater_init(const char* server_base, const char* device_id,
const char* hmac_secret, uint32_t check_interval_ms) {
s_server_base = server_base;
s_device_id = device_id;
s_hmac_secret = hmac_secret;
s_interval_ms = check_interval_ms;
s_last_check_ms = 0; // force first check on next call
}
static bool add_hmac_headers(HTTPClient& http, const char* method, const char* path) {
uint32_t ts = (uint32_t)time(nullptr);
if (ts < 1700000000UL) {
Serial.printf("[OTA] Clock not synced (ts=%u) — skipping HMAC sign\n", (unsigned)ts);
return false;
}
String sig = hmac_sign(s_hmac_secret, method, path, ts, "");
if (sig.isEmpty()) {
Serial.println("[OTA] HMAC sign failed");
return false;
}
Serial.printf("[OTA] HMAC headers: device=%s ts=%u sig=%s...\n",
s_device_id, (unsigned)ts, sig.substring(0, 12).c_str());
http.addHeader("X-Device-Id", s_device_id);
http.addHeader("X-Timestamp", String(ts));
http.addHeader("X-Signature", sig);
return true;
}
static bool download_and_flash(const char* fw_url, size_t expected_size,
const uint8_t sig64[64]) {
const esp_partition_t* running = esp_ota_get_running_partition();
const esp_partition_t* target = esp_ota_get_next_update_partition(nullptr);
if (!target) {
Serial.println("[OTA] No update partition found");
return false;
}
Serial.printf("[OTA] running='%s' (off=0x%x sz=0x%x), target='%s' (off=0x%x sz=0x%x)\n",
running ? running->label : "?",
running ? (unsigned)running->address : 0,
running ? (unsigned)running->size : 0,
target->label,
(unsigned)target->address, (unsigned)target->size);
if (expected_size > target->size) {
Serial.printf("[OTA] image (%zu) larger than partition (%u)\n",
expected_size, (unsigned)target->size);
return false;
}
esp_ota_handle_t handle;
esp_err_t er = esp_ota_begin(target, OTA_WITH_SEQUENTIAL_WRITES, &handle);
if (er != ESP_OK) {
Serial.printf("[OTA] esp_ota_begin failed: %s\n", esp_err_to_name(er));
return false;
}
mbedtls_sha256_context sha_ctx;
mbedtls_sha256_init(&sha_ctx);
mbedtls_sha256_starts(&sha_ctx, 0);
HTTPClient http;
http.begin(fw_url);
http.setTimeout(30000);
if (!add_hmac_headers(http, "GET", "/ota/firmware")) {
Serial.println("[OTA] Aborting firmware download: HMAC sign failed");
mbedtls_sha256_free(&sha_ctx);
esp_ota_abort(handle);
return false;
}
Serial.printf("[OTA] downloading firmware: %s\n", fw_url);
int code = http.GET();
Serial.printf("[OTA] firmware response: HTTP %d\n", code);
if (code != HTTP_CODE_OK) {
String body = http.getString();
Serial.printf("[OTA] error body: %s\n", body.c_str());
http.end();
mbedtls_sha256_free(&sha_ctx);
esp_ota_abort(handle);
return false;
}
int content_len = http.getSize();
Serial.printf("[OTA] Content-Length: %d (expected %zu)\n",
content_len, expected_size);
WiFiClient* stream = http.getStreamPtr();
uint8_t buf[4096];
size_t written = 0;
size_t last_log_at = 0;
bool write_failed = false;
uint32_t start_ms = millis();
while (written < expected_size) {
size_t want = min((size_t)sizeof(buf), expected_size - written);
int got = stream->readBytes(buf, want);
if (got <= 0) {
Serial.printf("[OTA] stream ended at %zu/%zu bytes (readBytes=%d)\n",
written, expected_size, got);
break;
}
esp_err_t we = esp_ota_write(handle, buf, (size_t)got);
if (we != ESP_OK) {
Serial.printf("[OTA] esp_ota_write failed at offset %zu: %s\n",
written, esp_err_to_name(we));
write_failed = true;
break;
}
mbedtls_sha256_update(&sha_ctx, buf, (size_t)got);
written += (size_t)got;
if (written - last_log_at >= 131072 || written == expected_size) {
Serial.printf("[OTA] progress: %zu/%zu bytes\n", written, expected_size);
last_log_at = written;
}
}
uint32_t elapsed_ms = millis() - start_ms;
http.end();
Serial.printf("[OTA] download done: %zu bytes in %u ms\n",
written, (unsigned)elapsed_ms);
uint8_t hash[32];
mbedtls_sha256_finish(&sha_ctx, hash);
mbedtls_sha256_free(&sha_ctx);
char hex[65];
for (int i = 0; i < 32; i++) snprintf(hex + i*2, 3, "%02x", hash[i]);
Serial.printf("[OTA] sha256(image)=%s\n", hex);
if (write_failed) {
esp_ota_abort(handle);
return false;
}
if (written != expected_size) {
Serial.printf("[OTA] Download truncated (%zu/%zu bytes)\n", written, expected_size);
esp_ota_abort(handle);
return false;
}
if (!ota_verify_signature_with_key(hash, sig64, kOtaPublicKey)) {
Serial.println("[OTA] SIGNATURE INVALID — staying on current firmware");
esp_ota_abort(handle);
return false;
}
Serial.println("[OTA] signature OK");
esp_err_t end_err = esp_ota_end(handle);
if (end_err != ESP_OK) {
Serial.printf("[OTA] esp_ota_end failed: %s\n", esp_err_to_name(end_err));
return false;
}
esp_err_t boot_err = esp_ota_set_boot_partition(target);
if (boot_err != ESP_OK) {
Serial.printf("[OTA] esp_ota_set_boot_partition failed: %s\n",
esp_err_to_name(boot_err));
return false;
}
Serial.printf("[OTA] boot partition set to '%s' — rebooting in 500 ms\n",
target->label);
Serial.flush();
delay(500);
esp_restart();
return true; // unreachable
}
bool ota_updater_check_and_apply() {
if (!s_server_base || !s_device_id || !s_hmac_secret) {
Serial.println("[OTA] check skipped: updater not initialized");
return false;
}
if (s_last_check_ms != 0 &&
(uint32_t)(millis() - s_last_check_ms) < s_interval_ms) {
return false;
}
s_last_check_ms = millis();
if (WiFi.status() != WL_CONNECTED) {
Serial.printf("[OTA] check skipped: WiFi not connected (status=%d)\n",
WiFi.status());
return false;
}
char check_path[128];
snprintf(check_path, sizeof(check_path), "/ota/check?version=%s", FW_VERSION);
char check_url[256];
snprintf(check_url, sizeof(check_url), "%s%s", s_server_base, check_path);
Serial.printf("[OTA] check → GET %s (fw=%s)\n", check_url, FW_VERSION);
HTTPClient http;
if (!http.begin(check_url)) {
Serial.println("[OTA] http.begin() failed");
return false;
}
if (!add_hmac_headers(http, "GET", check_path)) {
Serial.println("[OTA] Aborting check: HMAC sign failed");
http.end();
return false;
}
int code = http.GET();
Serial.printf("[OTA] check response: HTTP %d\n", code);
if (code != HTTP_CODE_OK) {
String body = http.getString();
Serial.printf("[OTA] error body: %s\n", body.c_str());
http.end();
return false;
}
JsonDocument doc;
DeserializationError err = deserializeJson(doc, http.getStream());
http.end();
if (err) {
Serial.printf("[OTA] JSON parse error: %s\n", err.c_str());
return false;
}
if (!doc["update"].as<bool>()) {
Serial.printf("[OTA] Firmware up to date (%s)\n", FW_VERSION);
return false;
}
const char* remote_ver = doc["version"] | "";
size_t fw_size = doc["size"] | 0;
const char* sig_b64 = doc["sig_b64"] | "";
if (fw_size == 0 || strlen(sig_b64) == 0) {
log_e("[OTA] Invalid update manifest");
return false;
}
log_i("[OTA] Update: %s → %s (%zu bytes)", FW_VERSION, remote_ver, fw_size);
uint8_t sig64[64];
size_t sig_len = 0;
if (mbedtls_base64_decode(sig64, sizeof(sig64), &sig_len,
(const uint8_t*)sig_b64, strlen(sig_b64)) != 0 ||
sig_len != 64) {
log_e("[OTA] Bad signature encoding (len=%zu)", sig_len);
return false;
}
char fw_url[256];
snprintf(fw_url, sizeof(fw_url), "%s/ota/firmware", s_server_base);
return download_and_flash(fw_url, fw_size, sig64);
}
#endif // NATIVE_TEST

View File

@@ -0,0 +1,27 @@
// firmware/lib/ota_updater/ota_updater.h
#pragma once
#include <stdint.h>
#include <stddef.h>
#include <stdbool.h>
// One-time init. Call from setup() after WiFi is ready.
// server_base: e.g. "http://logs.research.bike:8000"
// check_interval_ms: milliseconds between polls (e.g. 6*3600*1000 = 21600000)
void ota_updater_init(const char* server_base,
const char* device_id,
const char* hmac_secret,
uint32_t check_interval_ms);
// Polls server; downloads, verifies, and flashes if newer version available.
// Returns true if update was applied (device reboots before returning false path).
// Safe to call from any task; blocks during download.
bool ota_updater_check_and_apply();
// Exposed for unit testing — pass an arbitrary 65-byte uncompressed P-256 pubkey.
bool ota_version_newer(const char* current, const char* remote);
bool ota_verify_signature_with_key(const uint8_t hash32[32], const uint8_t sig64[64],
const uint8_t pubkey65[65]);
// Production wrapper — uses the compiled-in kOtaPublicKey from ota_pubkey.h.
// Not callable from native tests (requires ota_pubkey.h / device build).
bool ota_verify_signature(const uint8_t hash32[32], const uint8_t sig64[64]);

View File

@@ -10,8 +10,11 @@
#include "reporter.h"
#include "event_log.h"
#include "net_guard.h"
#include "version.h"
#include "ota_updater.h"
#include <esp_system.h>
#include <esp_task_wdt.h>
#include <esp_ota_ops.h>
// LED on GPIO2 (TimerCamera-F built-in LED) — verify against board schematic
// Factory reset: hold GPIO37 (BOOT button) for 5 seconds
@@ -93,6 +96,22 @@ static void task_camera(void*) {
}
}
static void ota_task(void*) {
// Min 10s to avoid pathological fast loops if NVS is corrupted
uint32_t interval_ms = g_cfg.ota_interval_s < 10 ? 10000UL : g_cfg.ota_interval_s * 1000UL;
Serial.printf("[OTA] task started, interval=%u ms\n", (unsigned)interval_ms);
for (;;) {
if (WiFi.isConnected()) {
Serial.println("[OTA] tick: WiFi connected, running check");
ota_updater_check_and_apply();
} else {
Serial.printf("[OTA] tick: WiFi not connected (status=%d), skipping\n",
WiFi.status());
}
vTaskDelay(pdMS_TO_TICKS(interval_ms));
}
}
// Hourly reporter task — runs on core 0
static void task_reporter(void*) {
uint32_t last_report_ts = 0; // 0 = not initialized yet
@@ -175,6 +194,27 @@ void setup() {
pinMode(BUTTON_PIN, INPUT_PULLUP);
led_set(true); // on = booting
// OTA rollback guard: if booted from a freshly-flashed OTA image while the
// bootloader has rollback enabled, the image is PENDING_VERIFY and will be
// rolled back on the next reboot unless we mark it valid. Harmless no-op
// when rollback is disabled. Always log the running partition + state so
// we can see post-OTA boot behavior on serial.
{
const esp_partition_t* running = esp_ota_get_running_partition();
esp_ota_img_states_t state = ESP_OTA_IMG_UNDEFINED;
if (running) {
esp_ota_get_state_partition(running, &state);
Serial.printf("[BOOT] running partition '%s' (off=0x%x) state=%d fw=%s\n",
running->label, (unsigned)running->address,
(int)state, FW_VERSION);
}
if (state == ESP_OTA_IMG_PENDING_VERIFY) {
esp_err_t e = esp_ota_mark_app_valid_cancel_rollback();
Serial.printf("[BOOT] esp_ota_mark_app_valid_cancel_rollback: %s\n",
esp_err_to_name(e));
}
}
event_log_init();
event_log_write(EVT_BOOT, (uint16_t)esp_reset_reason(), 0);
@@ -269,6 +309,16 @@ void setup() {
xTaskCreatePinnedToCore(task_camera, "cam", 8192, nullptr, 2, nullptr, 1);
xTaskCreatePinnedToCore(task_reporter, "rep", 8192, nullptr, 1, nullptr, 0);
// static: ota_updater stores raw pointer; must outlive setup()
static String s_ota_base = String("http://") + REPORTER_API_HOST_NAME + ":" + REPORTER_API_PORT;
ota_updater_init(
s_ota_base.c_str(),
g_cfg.device_id.c_str(),
g_cfg.hmac_secret.c_str(),
g_cfg.ota_interval_s < 10 ? 10000UL : g_cfg.ota_interval_s * 1000UL
);
xTaskCreate(ota_task, "ota", 8192, nullptr, 1, nullptr);
}
void loop() {

View File

@@ -0,0 +1,44 @@
// firmware/test/test_ota/test_version.cpp
#include <unity.h>
// Pull in the function under test — include .cpp directly for native builds
// so we don't need a separate compilation unit per test.
#define NATIVE_TEST
#include "../../lib/ota_updater/ota_updater.cpp"
void setUp() {}
void tearDown() {}
void test_remote_newer_patch() {
TEST_ASSERT_TRUE(ota_version_newer("1.0.0", "1.0.1"));
}
void test_remote_newer_minor() {
TEST_ASSERT_TRUE(ota_version_newer("1.0.9", "1.1.0"));
}
void test_remote_newer_major() {
TEST_ASSERT_TRUE(ota_version_newer("0.9.9", "1.0.0"));
}
void test_same_version() {
TEST_ASSERT_FALSE(ota_version_newer("1.2.3", "1.2.3"));
}
void test_remote_older() {
TEST_ASSERT_FALSE(ota_version_newer("1.2.3", "1.2.2"));
}
void test_malformed_current() {
TEST_ASSERT_FALSE(ota_version_newer("bad", "1.0.0"));
}
void test_malformed_remote() {
TEST_ASSERT_FALSE(ota_version_newer("1.0.0", "bad"));
}
int main() {
UNITY_BEGIN();
RUN_TEST(test_remote_newer_patch);
RUN_TEST(test_remote_newer_minor);
RUN_TEST(test_remote_newer_major);
RUN_TEST(test_same_version);
RUN_TEST(test_remote_older);
RUN_TEST(test_malformed_current);
RUN_TEST(test_malformed_remote);
return UNITY_END();
}

View File

@@ -0,0 +1,62 @@
// firmware/test/test_ota_sig/test_sig_verify.cpp
#include <unity.h>
#include <string.h>
#define NATIVE_TEST
#include "../../lib/ota_updater/ota_updater.cpp"
// ── Test vectors generated by Python/cryptography (ECDSA P-256) ────────────
static const uint8_t TEST_PUBKEY[65] = {
0x04, 0x96, 0x18, 0x6c, 0x8b, 0xb2, 0xdf, 0xea, 0x3f, 0xe4, 0x75, 0x35, 0x0e, 0x8a, 0x3e, 0x7d,
0x49, 0x7f, 0x56, 0xb5, 0xb4, 0x1a, 0xae, 0x05, 0xa3, 0x10, 0x6f, 0x02, 0x43, 0x84, 0xb3, 0x1c,
0x1f, 0x44, 0xef, 0x08, 0x84, 0x57, 0xca, 0x6e, 0xd8, 0x19, 0x74, 0x10, 0x8d, 0x95, 0xcc, 0x8c,
0x61, 0x89, 0x56, 0xea, 0xbc, 0x0c, 0xa2, 0x54, 0xd7, 0x02, 0xf3, 0x1d, 0x67, 0x7c, 0xa5, 0xba,
0x42
};
static const uint8_t TEST_HASH[32] = {
0x0a, 0x7e, 0x5f, 0x6a, 0x4c, 0x72, 0x11, 0xb7, 0x14, 0x3f, 0x85, 0x59, 0x50, 0x61, 0x8a, 0xa1,
0xab, 0xee, 0x7b, 0x57, 0x08, 0x59, 0x56, 0x09, 0x6d, 0x18, 0xaf, 0x70, 0xe6, 0x6e, 0x6c, 0xa8
};
static const uint8_t TEST_SIG[64] = {
0x4f, 0xff, 0xc3, 0xc6, 0xd5, 0x04, 0x71, 0x37, 0x87, 0x8c, 0xe1, 0xe5, 0x79, 0xef, 0x59, 0x2a,
0x63, 0xde, 0xf6, 0x96, 0x3e, 0x8f, 0x90, 0x2f, 0x46, 0x1f, 0x1b, 0x8a, 0xd5, 0x94, 0xb8, 0x28,
0x80, 0xfa, 0xe4, 0x26, 0x14, 0xbf, 0x91, 0x54, 0xbf, 0xa6, 0x2f, 0x67, 0xf9, 0x97, 0x45, 0x3a,
0x0f, 0xdc, 0x66, 0xcd, 0x21, 0xb8, 0x91, 0xdb, 0xb9, 0xaa, 0x6b, 0x5d, 0x6c, 0xa5, 0xcb, 0x96
};
// ──────────────────────────────────────────────────────────────────────────
void setUp() {}
void tearDown() {}
void test_valid_signature_accepted() {
TEST_ASSERT_TRUE(ota_verify_signature_with_key(TEST_HASH, TEST_SIG, TEST_PUBKEY));
}
void test_corrupted_hash_rejected() {
uint8_t bad_hash[32];
memcpy(bad_hash, TEST_HASH, 32);
bad_hash[0] ^= 0xff;
TEST_ASSERT_FALSE(ota_verify_signature_with_key(bad_hash, TEST_SIG, TEST_PUBKEY));
}
void test_corrupted_signature_rejected() {
uint8_t bad_sig[64];
memcpy(bad_sig, TEST_SIG, 64);
bad_sig[0] ^= 0xff;
TEST_ASSERT_FALSE(ota_verify_signature_with_key(TEST_HASH, bad_sig, TEST_PUBKEY));
}
void test_zero_signature_rejected() {
uint8_t zero_sig[64] = {};
TEST_ASSERT_FALSE(ota_verify_signature_with_key(TEST_HASH, zero_sig, TEST_PUBKEY));
}
int main() {
UNITY_BEGIN();
RUN_TEST(test_valid_signature_accepted);
RUN_TEST(test_corrupted_hash_rejected);
RUN_TEST(test_corrupted_signature_rejected);
RUN_TEST(test_zero_signature_rejected);
return UNITY_END();
}

View File

@@ -11,7 +11,7 @@ import sqlite3
from typing import List
from fastapi import Depends
from pydantic import BaseModel, Field
from pydantic import BaseModel, Field, model_validator
class CameraRecord(BaseModel):
@@ -20,6 +20,12 @@ class CameraRecord(BaseModel):
entries: int = Field(ge=0)
exits: int = Field(ge=0)
@model_validator(mode="after")
def _period_order(self):
if self.period_end <= self.period_start:
raise ValueError("period_end must be strictly greater than period_start")
return self
class CameraEventsRequest(BaseModel):
location_id: str

120
server/ota_endpoint.py Normal file
View File

@@ -0,0 +1,120 @@
# server/ota_endpoint.py
"""
OTA firmware update endpoints.
Deployment workflow:
1. Generate signing key (one-time):
python tools/gen_signing_key.py
→ secrets/firmware_signing_key.pem (keep offline)
→ firmware/lib/ota_updater/ota_pubkey.h (commit this)
2. Build and deploy a new firmware version:
pio run -e timercam # build
python tools/deploy_firmware.py \\
firmware/.pio/build/timercam/firmware.bin 1.2.3
→ server/firmware/ updated with current.bin, current.sig, manifest.json
3. Bump FW_VERSION in firmware/include/version.h before each release.
4. Register in server main app:
from server.ota_endpoint import router as ota_router
app.include_router(ota_router)
Also uncomment Depends(verify_device_hmac) on both route handlers
and confirm the HMAC format matches hmac.cpp:
method + "\\n" + path + "\\n" + timestamp + "\\n" + sha256_hex(body)
Note: HMAC auth is currently commented out on route handlers — must be wired
before production use. verify_device_hmac must use the same format as hmac.cpp.
"""
import base64
import json
from pathlib import Path
from fastapi import APIRouter
from fastapi.responses import FileResponse
FIRMWARE_DIR = Path(__file__).parent / "firmware"
router = APIRouter(prefix="/ota", tags=["ota"])
class FirmwareNotFoundError(Exception):
pass
def _parse_version(v: str) -> tuple:
"""Parse semver string to comparable tuple; returns (0,0,0) on malformed input."""
try:
parts = v.strip().split(".")
if len(parts) != 3:
return (0, 0, 0)
return tuple(int(x) for x in parts)
except (ValueError, AttributeError):
return (0, 0, 0)
def ota_check_impl(current_version: str, firmware_dir: Path = FIRMWARE_DIR) -> dict:
"""
Compare device's current_version against staged manifest.
Returns {"update": False} when no update is available or manifest is missing.
Returns full update payload when server version is strictly newer.
"""
manifest_path = firmware_dir / "manifest.json"
if not manifest_path.exists():
return {"update": False}
try:
manifest = json.loads(manifest_path.read_text())
version = manifest["version"]
size = manifest["size"]
sha256 = manifest["sha256"]
except (json.JSONDecodeError, KeyError):
return {"update": False}
if _parse_version(version) <= _parse_version(current_version):
return {"update": False}
sig_path = firmware_dir / "current.sig"
if not sig_path.exists():
return {"update": False}
sig_b64 = base64.b64encode(sig_path.read_bytes()).decode()
return {
"update": True,
"version": version,
"size": size,
"sha256": sha256,
"sig_b64": sig_b64,
}
def ota_firmware_impl(firmware_dir: Path = FIRMWARE_DIR) -> bytes:
"""
Return raw firmware binary bytes.
Raises FirmwareNotFoundError if current.bin is absent.
"""
bin_path = firmware_dir / "current.bin"
if not bin_path.exists():
raise FirmwareNotFoundError("No firmware staged")
return bin_path.read_bytes()
@router.get("/check")
async def ota_check(
version: str,
# device_id: str = Depends(verify_device_hmac), # uncomment when wiring into app
):
"""Check whether a firmware update is available for the given device version."""
return ota_check_impl(current_version=version)
@router.get("/firmware")
async def ota_firmware(
# device_id: str = Depends(verify_device_hmac), # uncomment when wiring into app
):
"""Stream the staged firmware binary to the device."""
from fastapi import HTTPException
bin_path = FIRMWARE_DIR / "current.bin"
if not bin_path.exists():
raise HTTPException(status_code=404, detail="No firmware available")
return FileResponse(bin_path, media_type="application/octet-stream")

View File

@@ -98,3 +98,15 @@ def test_negative_counts_rejected():
with pytest.raises(ValidationError):
CameraRecord(period_start=1712000000, period_end=1712003600,
entries=-1, exits=0)
def test_inverted_period_rejected():
"""Pydantic should reject period_end <= period_start."""
from pydantic import ValidationError
from server.camera_endpoint import CameraRecord
with pytest.raises(ValidationError):
CameraRecord(period_start=1712003600, period_end=1712003600,
entries=0, exits=0)
with pytest.raises(ValidationError):
CameraRecord(period_start=1712003600, period_end=1712000000,
entries=0, exits=0)

View File

@@ -0,0 +1,83 @@
# server/test_ota_endpoint.py
import base64
import hashlib
import json
from pathlib import Path
import pytest
from server.ota_endpoint import ota_check_impl, ota_firmware_impl
def write_firmware(firmware_dir: Path, version: str, data: bytes = b"fake_fw") -> None:
sig = bytes(64) # zero sig (not validated server-side)
manifest = {
"version": version,
"size": len(data),
"sha256": hashlib.sha256(data).hexdigest(),
}
(firmware_dir / "current.bin").write_bytes(data)
(firmware_dir / "current.sig").write_bytes(sig)
(firmware_dir / "manifest.json").write_text(json.dumps(manifest))
@pytest.fixture(autouse=True)
def patch_firmware_dir(tmp_path, monkeypatch):
import server.ota_endpoint as mod
monkeypatch.setattr(mod, "FIRMWARE_DIR", tmp_path)
yield tmp_path
def test_check_no_update_same_version(tmp_path):
write_firmware(tmp_path, "1.0.0")
result = ota_check_impl(current_version="1.0.0", firmware_dir=tmp_path)
assert result["update"] is False
def test_check_no_update_newer_local(tmp_path):
write_firmware(tmp_path, "1.0.0")
result = ota_check_impl(current_version="1.1.0", firmware_dir=tmp_path)
assert result["update"] is False
def test_check_update_available(tmp_path):
write_firmware(tmp_path, "1.1.0", data=b"new firmware")
result = ota_check_impl(current_version="1.0.0", firmware_dir=tmp_path)
assert result["update"] is True
assert result["version"] == "1.1.0"
assert result["size"] == len(b"new firmware")
assert "sha256" in result
assert "sig_b64" in result
sig_bytes = base64.b64decode(result["sig_b64"])
assert len(sig_bytes) == 64
def test_check_no_manifest(tmp_path):
result = ota_check_impl(current_version="1.0.0", firmware_dir=tmp_path)
assert result["update"] is False
def test_firmware_endpoint_returns_binary(tmp_path):
fw_data = b"firmware binary content"
write_firmware(tmp_path, "1.1.0", data=fw_data)
content = ota_firmware_impl(firmware_dir=tmp_path)
assert content == fw_data
def test_firmware_endpoint_missing_raises(tmp_path):
import server.ota_endpoint as mod
with pytest.raises(mod.FirmwareNotFoundError):
ota_firmware_impl(firmware_dir=tmp_path)
def test_check_malformed_manifest(tmp_path):
(tmp_path / "manifest.json").write_text("not valid json{{{")
result = ota_check_impl(current_version="1.0.0", firmware_dir=tmp_path)
assert result["update"] is False
def test_check_wrong_arity_version_no_update(tmp_path):
write_firmware(tmp_path, "1.2") # wrong arity server version
result = ota_check_impl(current_version="1.0.0", firmware_dir=tmp_path)
# server "1.2" → (0,0,0) ≤ client (1,0,0) → no update
assert result["update"] is False

0
tools/__init__.py Normal file
View File

43
tools/deploy_firmware.py Normal file
View File

@@ -0,0 +1,43 @@
#!/usr/bin/env python3
"""Sign firmware and stage it for the server OTA endpoint."""
import argparse, hashlib, json
from pathlib import Path
from sign_firmware import sign_firmware
def deploy(firmware_path: Path, key_path: Path,
version: str, output_dir: Path) -> None:
output_dir.mkdir(parents=True, exist_ok=True)
data = firmware_path.read_bytes()
sig = sign_firmware(firmware_path, key_path)
(output_dir / "current.bin").write_bytes(data)
(output_dir / "current.sig").write_bytes(sig)
(output_dir / "manifest.json").write_text(json.dumps({
"version": version,
"size": len(data),
"sha256": hashlib.sha256(data).hexdigest(),
}, indent=2))
print(f"Deployed {firmware_path.name} v{version}{output_dir}/")
def main() -> None:
p = argparse.ArgumentParser(description=__doc__)
p.add_argument("firmware", help="Path to .bin")
p.add_argument("version", help="Version string, e.g. 1.2.3")
p.add_argument("--key", default="secrets/firmware_signing_key.pem")
p.add_argument("--out-dir", default="server/firmware")
args = p.parse_args()
deploy(
firmware_path=Path(args.firmware),
key_path=Path(args.key),
version=args.version,
output_dir=Path(args.out_dir),
)
if __name__ == "__main__":
main()

View File

@@ -28,6 +28,25 @@ NVS_NAMESPACE = "doorcounter"
NVS_PARTITION_OFFSET = "0x9000"
NVS_PARTITION_SIZE = "0x5000" # matches firmware partition table (20KB)
# Characters that would change the field/row structure of the NVS-CSV format
# (key,type,encoding,value). A value containing any of these would either
# split into more fields or add rows, silently provisioning the wrong keys.
_CSV_FORBIDDEN = (",", '"', "\n", "\r")
def _reject_csv_metacharacters(name, value):
"""Exit with an error if value contains a character that would corrupt
the NVS CSV. Used for operator-supplied strings (device id, location id,
WiFi credentials)."""
for c in _CSV_FORBIDDEN:
if c in value:
print(
f"Error: --{name} contains forbidden character {c!r}; "
f"this would corrupt the NVS partition CSV.",
file=sys.stderr,
)
sys.exit(1)
def build_nvs_csv(device_id, location_id, hmac_secret,
wifi_ssid=None, wifi_pass=None, line_offset=50):
@@ -78,6 +97,13 @@ def main():
print("Error: --line-offset must be 0-100", file=sys.stderr)
sys.exit(1)
_reject_csv_metacharacters("device-id", args.device_id)
_reject_csv_metacharacters("location-id", args.location_id)
if args.wifi_ssid is not None:
_reject_csv_metacharacters("wifi-ssid", args.wifi_ssid)
if args.wifi_password is not None:
_reject_csv_metacharacters("wifi-password", args.wifi_password)
with tempfile.TemporaryDirectory() as tmp:
csv_path = os.path.join(tmp, "nvs.csv")
bin_path = os.path.join(tmp, "nvs.bin")

57
tools/gen_signing_key.py Normal file
View File

@@ -0,0 +1,57 @@
#!/usr/bin/env python3
"""Generate ECDSA P-256 signing keypair for OTA firmware verification."""
import argparse
import os
from pathlib import Path
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives import serialization
def generate(secrets_dir: Path, header_out: Path) -> None:
secrets_dir.mkdir(parents=True, exist_ok=True)
key = ec.generate_private_key(ec.SECP256R1())
pem = key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.NoEncryption(),
)
key_path = secrets_dir / "firmware_signing_key.pem"
key_path.write_bytes(pem)
key_path.chmod(0o600)
pub_bytes = key.public_key().public_bytes(
encoding=serialization.Encoding.X962,
format=serialization.PublicFormat.UncompressedPoint,
)
assert len(pub_bytes) == 65 and pub_bytes[0] == 0x04
hex_values = ", ".join(f"0x{b:02x}" for b in pub_bytes)
header = (
"#pragma once\n"
"// Auto-generated by tools/gen_signing_key.py — DO NOT EDIT\n"
"// ECDSA P-256 public key, uncompressed X9.62 (04 || X || Y)\n"
f"static const uint8_t kOtaPublicKey[65] = {{{hex_values}}};\n"
)
header_out.parent.mkdir(parents=True, exist_ok=True)
header_out.write_text(header)
print(f"Private key → {key_path}")
print(f"Public key header → {header_out}")
def main() -> None:
p = argparse.ArgumentParser(description=__doc__)
p.add_argument("--secrets-dir", default="secrets",
help="Directory for private key (default: secrets/)")
p.add_argument("--header-out",
default="firmware/lib/ota_updater/ota_pubkey.h",
help="Path to write the C header")
args = p.parse_args()
generate(Path(args.secrets_dir), Path(args.header_out))
if __name__ == "__main__":
main()

52
tools/sign_firmware.py Normal file
View File

@@ -0,0 +1,52 @@
#!/usr/bin/env python3
"""Sign a firmware binary with ECDSA P-256. Outputs a raw 64-byte r||s .sig file."""
import argparse
import sys
from pathlib import Path
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric.utils import decode_dss_signature
def load_private_key(key_path: Path) -> ec.EllipticCurvePrivateKey:
key = serialization.load_pem_private_key(key_path.read_bytes(), password=None)
if not isinstance(key, ec.EllipticCurvePrivateKey):
raise ValueError("Key must be an EC private key")
if not isinstance(key.curve, ec.SECP256R1):
raise ValueError(f"Key must use SECP256R1 curve, got {key.curve.name}")
return key
def sign_firmware(firmware_path: Path, key_path: Path) -> bytes:
key = load_private_key(key_path)
data = firmware_path.read_bytes()
sig_der = key.sign(data, ec.ECDSA(hashes.SHA256()))
r, s = decode_dss_signature(sig_der)
# Returns raw 64-byte r‖s (not DER) — mbedtls_ecdsa_verify expects this layout
return r.to_bytes(32, 'big') + s.to_bytes(32, 'big')
def main() -> None:
p = argparse.ArgumentParser(description=__doc__)
p.add_argument("firmware", help="Path to firmware .bin")
p.add_argument("--key", default="secrets/firmware_signing_key.pem",
help="Path to PEM private key")
p.add_argument("--out", help="Output .sig path (default: firmware.bin.sig)")
args = p.parse_args()
firmware = Path(args.firmware)
key_path = Path(args.key)
out_path = Path(args.out) if args.out else firmware.with_suffix(".bin.sig")
try:
sig = sign_firmware(firmware, key_path)
except (FileNotFoundError, ValueError) as e:
print(f"Error: {e}", file=sys.stderr)
raise SystemExit(1)
out_path.write_bytes(sig)
print(f"Signed {firmware.name}{out_path} ({len(sig)} bytes)")
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,63 @@
import json, hashlib, sys
from pathlib import Path
import pytest
REPO_ROOT = Path(__file__).parent.parent
sys.path.insert(0, str(REPO_ROOT / "tools"))
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives import serialization
from deploy_firmware import deploy
@pytest.fixture()
def key_pem(tmp_path):
key = ec.generate_private_key(ec.SECP256R1())
pem_path = tmp_path / "key.pem"
pem_path.write_bytes(key.private_bytes(
serialization.Encoding.PEM,
serialization.PrivateFormat.PKCS8,
serialization.NoEncryption(),
))
return pem_path
def test_deploy_writes_all_artifacts(tmp_path, key_pem):
firmware = tmp_path / "firmware.bin"
firmware.write_bytes(b"fake firmware" * 200)
out_dir = tmp_path / "server_firmware"
deploy(firmware_path=firmware, key_path=key_pem,
version="1.2.3", output_dir=out_dir)
assert (out_dir / "current.bin").exists()
assert (out_dir / "current.sig").exists()
assert (out_dir / "manifest.json").exists()
def test_manifest_contents(tmp_path, key_pem):
data = b"firmware payload"
firmware = tmp_path / "fw.bin"
firmware.write_bytes(data)
out_dir = tmp_path / "out"
deploy(firmware_path=firmware, key_path=key_pem,
version="2.0.1", output_dir=out_dir)
manifest = json.loads((out_dir / "manifest.json").read_text())
assert manifest["version"] == "2.0.1"
assert manifest["size"] == len(data)
assert manifest["sha256"] == hashlib.sha256(data).hexdigest()
def test_signature_is_64_bytes(tmp_path, key_pem):
firmware = tmp_path / "fw.bin"
firmware.write_bytes(b"fw")
out_dir = tmp_path / "out"
deploy(firmware_path=firmware, key_path=key_pem,
version="1.0.0", output_dir=out_dir)
sig = (out_dir / "current.sig").read_bytes()
assert len(sig) == 64

View File

@@ -0,0 +1,17 @@
import pytest
from tools.flash_device import _reject_csv_metacharacters
def test_clean_value_accepted():
"""A value with no metacharacters should pass without exiting."""
_reject_csv_metacharacters("device-id", "dc-0042")
_reject_csv_metacharacters("location-id", "retailer-123")
_reject_csv_metacharacters("wifi-ssid", "StoreWiFi-2.4GHz")
_reject_csv_metacharacters("wifi-password", "p@ssw0rd!~#$%^&*()_+-=:;<>?/")
@pytest.mark.parametrize("bad", ["Home,Network", 'pa"ss', "ssid\nfoo", "name\rbar"])
def test_metacharacter_rejected(bad):
with pytest.raises(SystemExit):
_reject_csv_metacharacters("wifi-ssid", bad)

View File

@@ -0,0 +1,49 @@
import os, subprocess, sys, tempfile
from pathlib import Path
REPO_ROOT = Path(__file__).parent.parent
def run_gen(secrets_dir, header_path):
env = os.environ.copy()
result = subprocess.run(
[sys.executable, str(REPO_ROOT / "tools/gen_signing_key.py"),
"--secrets-dir", str(secrets_dir),
"--header-out", str(header_path)],
capture_output=True, text=True, env=env
)
assert result.returncode == 0, result.stderr
return result
def test_private_key_created():
with tempfile.TemporaryDirectory() as d:
header = Path(d) / "ota_pubkey.h"
run_gen(d, header)
pem = Path(d) / "firmware_signing_key.pem"
assert pem.exists()
content = pem.read_text()
assert "BEGIN PRIVATE KEY" in content
def test_header_created():
with tempfile.TemporaryDirectory() as d:
header = Path(d) / "ota_pubkey.h"
run_gen(d, header)
assert header.exists()
content = header.read_text()
assert "kOtaPublicKey" in content
assert "0x04" in content # uncompressed point prefix
assert "[65]" in content
def test_public_key_is_valid_p256_point():
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives import serialization
with tempfile.TemporaryDirectory() as d:
header = Path(d) / "ota_pubkey.h"
run_gen(d, header)
pem = (Path(d) / "firmware_signing_key.pem").read_bytes()
priv = serialization.load_pem_private_key(pem, password=None)
pub_bytes = priv.public_key().public_bytes(
serialization.Encoding.X962,
serialization.PublicFormat.UncompressedPoint,
)
assert len(pub_bytes) == 65
assert pub_bytes[0] == 0x04

View File

@@ -0,0 +1,64 @@
import sys
from pathlib import Path
import pytest
from cryptography.exceptions import InvalidSignature
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric.utils import decode_dss_signature
REPO_ROOT = Path(__file__).parent.parent
sys.path.insert(0, str(REPO_ROOT / "tools"))
from sign_firmware import sign_firmware, load_private_key
@pytest.fixture()
def keypair(tmp_path):
key = ec.generate_private_key(ec.SECP256R1())
pem_path = tmp_path / "key.pem"
pem_path.write_bytes(key.private_bytes(
serialization.Encoding.PEM,
serialization.PrivateFormat.PKCS8,
serialization.NoEncryption(),
))
return key, pem_path
def test_signature_is_64_bytes(keypair, tmp_path):
key, key_path = keypair
firmware = tmp_path / "fw.bin"
firmware.write_bytes(b"fake firmware data" * 100)
sig = sign_firmware(firmware, key_path)
assert len(sig) == 64
def test_signature_verifies(keypair, tmp_path):
key, key_path = keypair
data = b"test firmware payload"
firmware = tmp_path / "fw.bin"
firmware.write_bytes(data)
sig_raw = sign_firmware(firmware, key_path)
# Convert raw r||s back to DER for cryptography lib verify
r = int.from_bytes(sig_raw[:32], 'big')
s = int.from_bytes(sig_raw[32:], 'big')
from cryptography.hazmat.primitives.asymmetric.utils import encode_dss_signature
sig_der = encode_dss_signature(r, s)
key.public_key().verify(sig_der, data, ec.ECDSA(hashes.SHA256()))
def test_wrong_key_fails_verification(keypair, tmp_path):
key, key_path = keypair
firmware = tmp_path / "fw.bin"
firmware.write_bytes(b"firmware")
sig_raw = sign_firmware(firmware, key_path)
other_key = ec.generate_private_key(ec.SECP256R1())
r = int.from_bytes(sig_raw[:32], 'big')
s = int.from_bytes(sig_raw[32:], 'big')
from cryptography.hazmat.primitives.asymmetric.utils import encode_dss_signature
sig_der = encode_dss_signature(r, s)
with pytest.raises(InvalidSignature):
other_key.public_key().verify(sig_der, b"firmware", ec.ECDSA(hashes.SHA256()))