Compare commits
16 Commits
8342904488
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
| d2c2d97fb7 | |||
| 5ec678dfa3 | |||
| 5cf122b922 | |||
| a21dcfa349 | |||
| 66e6808e13 | |||
| 8b1fd10db7 | |||
| f37e0d6b07 | |||
| 81bcc12f2f | |||
| d9a242a5fa | |||
| 87b30a64b2 | |||
| 031426e364 | |||
| 437f73739f | |||
| 21a3c646aa | |||
| 81dc96b100 | |||
| 56fc58b843 | |||
| 641ab29277 |
2
.gitignore
vendored
2
.gitignore
vendored
@@ -7,3 +7,5 @@ firmware/.pio/
|
|||||||
*.log
|
*.log
|
||||||
*secret*
|
*secret*
|
||||||
__pycache__/
|
__pycache__/
|
||||||
|
secrets/
|
||||||
|
server/firmware/
|
||||||
|
|||||||
88
docs/ota-deployment-status.md
Normal file
88
docs/ota-deployment-status.md
Normal file
@@ -0,0 +1,88 @@
|
|||||||
|
# OTA Deployment — Status
|
||||||
|
|
||||||
|
## Current state (2026-05-14)
|
||||||
|
|
||||||
|
**End-to-end OTA verified working on `dc-0002`.** Device polled `engagement-api-1`, received a signed manifest, downloaded and verified firmware 1.0.1, set the alternate boot partition, rebooted, and came up reporting `fw=1.0.1`.
|
||||||
|
|
||||||
|
## What's deployed
|
||||||
|
|
||||||
|
- **Branch `feat/pull-ota-code-signing`** merged to `main` (13 commits, 17 new files, 936 LOC).
|
||||||
|
- **Signing toolchain**: `tools/gen_signing_key.py`, `tools/sign_firmware.py`, `tools/deploy_firmware.py`.
|
||||||
|
- **Firmware OTA library**: `firmware/lib/ota_updater/`.
|
||||||
|
- **Signing key**: `secrets/firmware_signing_key.pem` (gitignored). Public key committed at `firmware/lib/ota_updater/ota_pubkey.h`.
|
||||||
|
- **Live OTA handler**: served by `engagement-api-1` Docker service (source not in this repo). The stub at `server/ota_endpoint.py` is unwired and not the one responding to devices.
|
||||||
|
- **Configurable poll interval** via NVS key `ota_interval`. Provision with `flash_device.py --ota-interval-seconds N`. Min 10 s, default 21600 (6 h).
|
||||||
|
|
||||||
|
## Issues resolved
|
||||||
|
|
||||||
|
### 1. HMAC format mismatch (resolved 2026-05-13)
|
||||||
|
Firmware OTA updater was using `X-HMAC-Signature` header + `millis()`-derived timestamp; the reporter component used `X-Signature` + `time(nullptr)`. Server expected the reporter format. Fixed by aligning the OTA updater to the same canonical scheme as the reporter (`firmware/lib/ota_updater/ota_updater.cpp` `add_hmac_headers`).
|
||||||
|
|
||||||
|
### 2. `/ota/check` JSON schema mismatch (resolved 2026-05-14)
|
||||||
|
Server was emitting `{update_available, sha256, url}` but firmware reads `{update, size, sig_b64}`. Device silently decided "up to date" every poll because `doc["update"]` defaulted to `false`. Fixed server-side: the `/ota/check` response now also includes the fields the firmware needs. Firmware schema remains the source of truth.
|
||||||
|
|
||||||
|
### 3. Signed firmware artifact pipeline (resolved 2026-05-14)
|
||||||
|
Deploy flow now bumps `FW_VERSION` → builds → copies `.pio/build/timercam/firmware.bin` to `firmware-<version>.bin` → signs with `tools/sign_firmware.py` → SCPs both `.bin` and `.bin.sig` to `root@nginx:/root/engagement-api/firmware/`. Server team updates `firmware_releases.sha256` to match the new binary.
|
||||||
|
|
||||||
|
**Gotcha:** the `.bin` and `.sig` must always be deployed together. The signature is over the bytes; replacing one without the other puts the server in an inconsistent state and devices will reject the update with `SIGNATURE INVALID`.
|
||||||
|
|
||||||
|
## Hardening added this session
|
||||||
|
|
||||||
|
### Firmware logging (`firmware/lib/ota_updater/ota_updater.cpp`, `firmware/src/main.cpp`)
|
||||||
|
The previous `log_i/w/e` macros were silenced by the default `CORE_DEBUG_LEVEL`. Replaced with `Serial.printf` so output appears regardless of log level. Now logs at every step:
|
||||||
|
- `[OTA] task started, interval=N ms`
|
||||||
|
- Per-tick WiFi status
|
||||||
|
- Full check URL + HMAC header preview (device id, ts, sig prefix)
|
||||||
|
- HTTP response code + error body on non-200
|
||||||
|
- JSON parse errors
|
||||||
|
- "Up to date" decision
|
||||||
|
- Partition labels and offsets (running + target)
|
||||||
|
- Per-128 KB download progress
|
||||||
|
- Total bytes + elapsed ms
|
||||||
|
- Computed sha256 of the downloaded image (compare against server `X-SHA256`)
|
||||||
|
- Signature verify result
|
||||||
|
- `esp_ota_end` / `esp_ota_set_boot_partition` errors by name
|
||||||
|
- 500 ms `Serial.flush()` + `delay()` before `esp_restart()` so the final log line escapes the UART
|
||||||
|
|
||||||
|
### Boot-time partition state (`firmware/src/main.cpp`)
|
||||||
|
Logs `running partition '<label>' (off=0x…) state=N fw=…` at every boot. If `state == ESP_OTA_IMG_PENDING_VERIFY` (3), calls `esp_ota_mark_app_valid_cancel_rollback()` to prevent the bootloader from reverting on the next reboot. Harmless no-op when rollback isn't enabled, but eliminates a class of silent OTA failures.
|
||||||
|
|
||||||
|
### `esp_ota_write` return value (`firmware/lib/ota_updater/ota_updater.cpp`)
|
||||||
|
Previously ignored — a failed write would silently corrupt the new partition and the device would still try to boot from it. Now checked, aborts the OTA cleanly, and logs the failing offset.
|
||||||
|
|
||||||
|
### Partition size pre-check
|
||||||
|
Reject the update before `esp_ota_begin` if `expected_size > target->size`.
|
||||||
|
|
||||||
|
## Verifying a deployment
|
||||||
|
|
||||||
|
After a server push, watch the device's serial output on the next OTA tick:
|
||||||
|
|
||||||
|
```
|
||||||
|
[OTA] tick: WiFi connected, running check
|
||||||
|
[OTA] check → GET http://logs.research.bike:80/ota/check?version=X.Y.Z
|
||||||
|
[OTA] check response: HTTP 200
|
||||||
|
[OTA] Update: X.Y.Z → A.B.C (N bytes)
|
||||||
|
[OTA] running='app0' (off=…), target='app1' (off=…)
|
||||||
|
[OTA] progress: N/N bytes
|
||||||
|
[OTA] sha256(image)=<hex> ← must match server X-SHA256
|
||||||
|
[OTA] signature OK
|
||||||
|
[OTA] boot partition set to 'app1' — rebooting in 500 ms
|
||||||
|
```
|
||||||
|
|
||||||
|
Then on reboot:
|
||||||
|
|
||||||
|
```
|
||||||
|
[BOOT] running partition 'app1' (off=…) state=N fw=A.B.C
|
||||||
|
```
|
||||||
|
|
||||||
|
The `fw=A.B.C` line is the success signal — it reflects the `FW_VERSION` macro baked into the freshly-booted image, not just what the device claims to be running.
|
||||||
|
|
||||||
|
## Quick reference
|
||||||
|
|
||||||
|
- Plan: `docs/superpowers/plans/2026-05-10-pull-ota-code-signing.md`
|
||||||
|
- Firmware version: `firmware/include/version.h`
|
||||||
|
- OTA library: `firmware/lib/ota_updater/`
|
||||||
|
- HMAC implementation: `firmware/lib/hmac/hmac.cpp`
|
||||||
|
- Provisioning tool: `tools/flash_device.py`
|
||||||
|
- Signing tools: `tools/gen_signing_key.py`, `tools/sign_firmware.py`, `tools/deploy_firmware.py`
|
||||||
|
- Server deploy path: `root@nginx:/root/engagement-api/firmware/` (per server team runbook)
|
||||||
3
firmware/include/version.h
Normal file
3
firmware/include/version.h
Normal file
@@ -0,0 +1,3 @@
|
|||||||
|
#pragma once
|
||||||
|
// Format: MAJOR.MINOR.PATCH (SemVer) — OTA version compare uses sscanf("%d.%d.%d")
|
||||||
|
#define FW_VERSION "1.0.1"
|
||||||
6
firmware/lib/ota_updater/library.json
Normal file
6
firmware/lib/ota_updater/library.json
Normal file
@@ -0,0 +1,6 @@
|
|||||||
|
{
|
||||||
|
"name": "ota_updater",
|
||||||
|
"build": {
|
||||||
|
"flags": ["-I$PROJECT_INCLUDE_DIR"]
|
||||||
|
}
|
||||||
|
}
|
||||||
4
firmware/lib/ota_updater/ota_pubkey.h
Normal file
4
firmware/lib/ota_updater/ota_pubkey.h
Normal file
@@ -0,0 +1,4 @@
|
|||||||
|
#pragma once
|
||||||
|
// Auto-generated by tools/gen_signing_key.py — DO NOT EDIT
|
||||||
|
// ECDSA P-256 public key, uncompressed X9.62 (04 || X || Y)
|
||||||
|
static const uint8_t kOtaPublicKey[65] = {0x04, 0x1c, 0x92, 0x43, 0x23, 0xe9, 0xac, 0xd1, 0xe8, 0x05, 0x32, 0x49, 0x39, 0x12, 0x95, 0xb2, 0x0a, 0x3e, 0xfb, 0x9d, 0xdf, 0xee, 0xd1, 0x98, 0x87, 0x97, 0xa3, 0xb8, 0xcb, 0x2b, 0xa6, 0x06, 0xe0, 0x83, 0x32, 0x71, 0xd2, 0x5f, 0x80, 0x40, 0x68, 0xcd, 0x00, 0xe5, 0x0e, 0xba, 0x13, 0xf6, 0x97, 0x43, 0x6f, 0xe6, 0x4f, 0xd0, 0x95, 0x53, 0x0e, 0xd7, 0x9a, 0x8a, 0x2e, 0x25, 0x52, 0xb4, 0xaf};
|
||||||
319
firmware/lib/ota_updater/ota_updater.cpp
Normal file
319
firmware/lib/ota_updater/ota_updater.cpp
Normal file
@@ -0,0 +1,319 @@
|
|||||||
|
// firmware/lib/ota_updater/ota_updater.cpp
|
||||||
|
#include "ota_updater.h"
|
||||||
|
#include <stdio.h>
|
||||||
|
#include <string.h>
|
||||||
|
#include <mbedtls/ecdsa.h>
|
||||||
|
#include <mbedtls/ecp.h>
|
||||||
|
#include <mbedtls/bignum.h>
|
||||||
|
|
||||||
|
// ── version comparison ─────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
bool ota_version_newer(const char* current, const char* remote) {
|
||||||
|
int ca = 0, cb = 0, cc = 0;
|
||||||
|
int ra = 0, rb = 0, rc = 0;
|
||||||
|
if (sscanf(current, "%d.%d.%d", &ca, &cb, &cc) != 3) return false;
|
||||||
|
if (sscanf(remote, "%d.%d.%d", &ra, &rb, &rc) != 3) return false;
|
||||||
|
if (ra != ca) return ra > ca;
|
||||||
|
if (rb != cb) return rb > cb;
|
||||||
|
return rc > cc;
|
||||||
|
}
|
||||||
|
|
||||||
|
// ── signature verification ─────────────────────────────────────────────────
|
||||||
|
bool ota_verify_signature_with_key(const uint8_t hash32[32], const uint8_t sig64[64],
|
||||||
|
const uint8_t pubkey65[65]) {
|
||||||
|
mbedtls_ecp_group grp;
|
||||||
|
mbedtls_ecp_point Q;
|
||||||
|
mbedtls_mpi r, s;
|
||||||
|
|
||||||
|
mbedtls_ecp_group_init(&grp);
|
||||||
|
mbedtls_ecp_point_init(&Q);
|
||||||
|
mbedtls_mpi_init(&r);
|
||||||
|
mbedtls_mpi_init(&s);
|
||||||
|
|
||||||
|
bool ok = false;
|
||||||
|
if (mbedtls_ecp_group_load(&grp, MBEDTLS_ECP_DP_SECP256R1) == 0 &&
|
||||||
|
mbedtls_ecp_point_read_binary(&grp, &Q, pubkey65, 65) == 0 &&
|
||||||
|
mbedtls_mpi_read_binary(&r, sig64, 32) == 0 &&
|
||||||
|
mbedtls_mpi_read_binary(&s, sig64 + 32, 32) == 0 &&
|
||||||
|
mbedtls_ecdsa_verify(&grp, hash32, 32, &Q, &r, &s) == 0) {
|
||||||
|
ok = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
mbedtls_ecp_group_free(&grp);
|
||||||
|
mbedtls_ecp_point_free(&Q);
|
||||||
|
mbedtls_mpi_free(&r);
|
||||||
|
mbedtls_mpi_free(&s);
|
||||||
|
return ok;
|
||||||
|
}
|
||||||
|
|
||||||
|
// ── device-only code ───────────────────────────────────────────────────────
|
||||||
|
#ifndef NATIVE_TEST
|
||||||
|
|
||||||
|
#include <Arduino.h>
|
||||||
|
#include <time.h>
|
||||||
|
#include <HTTPClient.h>
|
||||||
|
#include <WiFi.h>
|
||||||
|
#include <ArduinoJson.h>
|
||||||
|
#include <esp_ota_ops.h>
|
||||||
|
#include <mbedtls/sha256.h>
|
||||||
|
#include <mbedtls/base64.h>
|
||||||
|
#include "hmac.h"
|
||||||
|
#include "ota_pubkey.h"
|
||||||
|
#include "version.h"
|
||||||
|
|
||||||
|
bool ota_verify_signature(const uint8_t hash32[32], const uint8_t sig64[64]) {
|
||||||
|
return ota_verify_signature_with_key(hash32, sig64, kOtaPublicKey);
|
||||||
|
}
|
||||||
|
|
||||||
|
static const char* s_server_base = nullptr;
|
||||||
|
static const char* s_device_id = nullptr;
|
||||||
|
static const char* s_hmac_secret = nullptr;
|
||||||
|
static uint32_t s_interval_ms = 21600000UL; // 6 h default
|
||||||
|
static uint32_t s_last_check_ms = 0;
|
||||||
|
|
||||||
|
void ota_updater_init(const char* server_base, const char* device_id,
|
||||||
|
const char* hmac_secret, uint32_t check_interval_ms) {
|
||||||
|
s_server_base = server_base;
|
||||||
|
s_device_id = device_id;
|
||||||
|
s_hmac_secret = hmac_secret;
|
||||||
|
s_interval_ms = check_interval_ms;
|
||||||
|
s_last_check_ms = 0; // force first check on next call
|
||||||
|
}
|
||||||
|
|
||||||
|
static bool add_hmac_headers(HTTPClient& http, const char* method, const char* path) {
|
||||||
|
uint32_t ts = (uint32_t)time(nullptr);
|
||||||
|
if (ts < 1700000000UL) {
|
||||||
|
Serial.printf("[OTA] Clock not synced (ts=%u) — skipping HMAC sign\n", (unsigned)ts);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
String sig = hmac_sign(s_hmac_secret, method, path, ts, "");
|
||||||
|
if (sig.isEmpty()) {
|
||||||
|
Serial.println("[OTA] HMAC sign failed");
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
Serial.printf("[OTA] HMAC headers: device=%s ts=%u sig=%s...\n",
|
||||||
|
s_device_id, (unsigned)ts, sig.substring(0, 12).c_str());
|
||||||
|
http.addHeader("X-Device-Id", s_device_id);
|
||||||
|
http.addHeader("X-Timestamp", String(ts));
|
||||||
|
http.addHeader("X-Signature", sig);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
static bool download_and_flash(const char* fw_url, size_t expected_size,
|
||||||
|
const uint8_t sig64[64]) {
|
||||||
|
const esp_partition_t* running = esp_ota_get_running_partition();
|
||||||
|
const esp_partition_t* target = esp_ota_get_next_update_partition(nullptr);
|
||||||
|
if (!target) {
|
||||||
|
Serial.println("[OTA] No update partition found");
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
Serial.printf("[OTA] running='%s' (off=0x%x sz=0x%x), target='%s' (off=0x%x sz=0x%x)\n",
|
||||||
|
running ? running->label : "?",
|
||||||
|
running ? (unsigned)running->address : 0,
|
||||||
|
running ? (unsigned)running->size : 0,
|
||||||
|
target->label,
|
||||||
|
(unsigned)target->address, (unsigned)target->size);
|
||||||
|
|
||||||
|
if (expected_size > target->size) {
|
||||||
|
Serial.printf("[OTA] image (%zu) larger than partition (%u)\n",
|
||||||
|
expected_size, (unsigned)target->size);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
esp_ota_handle_t handle;
|
||||||
|
esp_err_t er = esp_ota_begin(target, OTA_WITH_SEQUENTIAL_WRITES, &handle);
|
||||||
|
if (er != ESP_OK) {
|
||||||
|
Serial.printf("[OTA] esp_ota_begin failed: %s\n", esp_err_to_name(er));
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
mbedtls_sha256_context sha_ctx;
|
||||||
|
mbedtls_sha256_init(&sha_ctx);
|
||||||
|
mbedtls_sha256_starts(&sha_ctx, 0);
|
||||||
|
|
||||||
|
HTTPClient http;
|
||||||
|
http.begin(fw_url);
|
||||||
|
http.setTimeout(30000);
|
||||||
|
if (!add_hmac_headers(http, "GET", "/ota/firmware")) {
|
||||||
|
Serial.println("[OTA] Aborting firmware download: HMAC sign failed");
|
||||||
|
mbedtls_sha256_free(&sha_ctx);
|
||||||
|
esp_ota_abort(handle);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
Serial.printf("[OTA] downloading firmware: %s\n", fw_url);
|
||||||
|
int code = http.GET();
|
||||||
|
Serial.printf("[OTA] firmware response: HTTP %d\n", code);
|
||||||
|
if (code != HTTP_CODE_OK) {
|
||||||
|
String body = http.getString();
|
||||||
|
Serial.printf("[OTA] error body: %s\n", body.c_str());
|
||||||
|
http.end();
|
||||||
|
mbedtls_sha256_free(&sha_ctx);
|
||||||
|
esp_ota_abort(handle);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
int content_len = http.getSize();
|
||||||
|
Serial.printf("[OTA] Content-Length: %d (expected %zu)\n",
|
||||||
|
content_len, expected_size);
|
||||||
|
|
||||||
|
WiFiClient* stream = http.getStreamPtr();
|
||||||
|
uint8_t buf[4096];
|
||||||
|
size_t written = 0;
|
||||||
|
size_t last_log_at = 0;
|
||||||
|
bool write_failed = false;
|
||||||
|
|
||||||
|
uint32_t start_ms = millis();
|
||||||
|
while (written < expected_size) {
|
||||||
|
size_t want = min((size_t)sizeof(buf), expected_size - written);
|
||||||
|
int got = stream->readBytes(buf, want);
|
||||||
|
if (got <= 0) {
|
||||||
|
Serial.printf("[OTA] stream ended at %zu/%zu bytes (readBytes=%d)\n",
|
||||||
|
written, expected_size, got);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
esp_err_t we = esp_ota_write(handle, buf, (size_t)got);
|
||||||
|
if (we != ESP_OK) {
|
||||||
|
Serial.printf("[OTA] esp_ota_write failed at offset %zu: %s\n",
|
||||||
|
written, esp_err_to_name(we));
|
||||||
|
write_failed = true;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
mbedtls_sha256_update(&sha_ctx, buf, (size_t)got);
|
||||||
|
written += (size_t)got;
|
||||||
|
if (written - last_log_at >= 131072 || written == expected_size) {
|
||||||
|
Serial.printf("[OTA] progress: %zu/%zu bytes\n", written, expected_size);
|
||||||
|
last_log_at = written;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
uint32_t elapsed_ms = millis() - start_ms;
|
||||||
|
http.end();
|
||||||
|
Serial.printf("[OTA] download done: %zu bytes in %u ms\n",
|
||||||
|
written, (unsigned)elapsed_ms);
|
||||||
|
|
||||||
|
uint8_t hash[32];
|
||||||
|
mbedtls_sha256_finish(&sha_ctx, hash);
|
||||||
|
mbedtls_sha256_free(&sha_ctx);
|
||||||
|
|
||||||
|
char hex[65];
|
||||||
|
for (int i = 0; i < 32; i++) snprintf(hex + i*2, 3, "%02x", hash[i]);
|
||||||
|
Serial.printf("[OTA] sha256(image)=%s\n", hex);
|
||||||
|
|
||||||
|
if (write_failed) {
|
||||||
|
esp_ota_abort(handle);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
if (written != expected_size) {
|
||||||
|
Serial.printf("[OTA] Download truncated (%zu/%zu bytes)\n", written, expected_size);
|
||||||
|
esp_ota_abort(handle);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!ota_verify_signature_with_key(hash, sig64, kOtaPublicKey)) {
|
||||||
|
Serial.println("[OTA] SIGNATURE INVALID — staying on current firmware");
|
||||||
|
esp_ota_abort(handle);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
Serial.println("[OTA] signature OK");
|
||||||
|
|
||||||
|
esp_err_t end_err = esp_ota_end(handle);
|
||||||
|
if (end_err != ESP_OK) {
|
||||||
|
Serial.printf("[OTA] esp_ota_end failed: %s\n", esp_err_to_name(end_err));
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
esp_err_t boot_err = esp_ota_set_boot_partition(target);
|
||||||
|
if (boot_err != ESP_OK) {
|
||||||
|
Serial.printf("[OTA] esp_ota_set_boot_partition failed: %s\n",
|
||||||
|
esp_err_to_name(boot_err));
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
Serial.printf("[OTA] boot partition set to '%s' — rebooting in 500 ms\n",
|
||||||
|
target->label);
|
||||||
|
Serial.flush();
|
||||||
|
delay(500);
|
||||||
|
esp_restart();
|
||||||
|
return true; // unreachable
|
||||||
|
}
|
||||||
|
|
||||||
|
bool ota_updater_check_and_apply() {
|
||||||
|
if (!s_server_base || !s_device_id || !s_hmac_secret) {
|
||||||
|
Serial.println("[OTA] check skipped: updater not initialized");
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
if (s_last_check_ms != 0 &&
|
||||||
|
(uint32_t)(millis() - s_last_check_ms) < s_interval_ms) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
s_last_check_ms = millis();
|
||||||
|
|
||||||
|
if (WiFi.status() != WL_CONNECTED) {
|
||||||
|
Serial.printf("[OTA] check skipped: WiFi not connected (status=%d)\n",
|
||||||
|
WiFi.status());
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
char check_path[128];
|
||||||
|
snprintf(check_path, sizeof(check_path), "/ota/check?version=%s", FW_VERSION);
|
||||||
|
char check_url[256];
|
||||||
|
snprintf(check_url, sizeof(check_url), "%s%s", s_server_base, check_path);
|
||||||
|
|
||||||
|
Serial.printf("[OTA] check → GET %s (fw=%s)\n", check_url, FW_VERSION);
|
||||||
|
|
||||||
|
HTTPClient http;
|
||||||
|
if (!http.begin(check_url)) {
|
||||||
|
Serial.println("[OTA] http.begin() failed");
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
if (!add_hmac_headers(http, "GET", check_path)) {
|
||||||
|
Serial.println("[OTA] Aborting check: HMAC sign failed");
|
||||||
|
http.end();
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
int code = http.GET();
|
||||||
|
Serial.printf("[OTA] check response: HTTP %d\n", code);
|
||||||
|
if (code != HTTP_CODE_OK) {
|
||||||
|
String body = http.getString();
|
||||||
|
Serial.printf("[OTA] error body: %s\n", body.c_str());
|
||||||
|
http.end();
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
JsonDocument doc;
|
||||||
|
DeserializationError err = deserializeJson(doc, http.getStream());
|
||||||
|
http.end();
|
||||||
|
if (err) {
|
||||||
|
Serial.printf("[OTA] JSON parse error: %s\n", err.c_str());
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!doc["update"].as<bool>()) {
|
||||||
|
Serial.printf("[OTA] Firmware up to date (%s)\n", FW_VERSION);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
const char* remote_ver = doc["version"] | "";
|
||||||
|
size_t fw_size = doc["size"] | 0;
|
||||||
|
const char* sig_b64 = doc["sig_b64"] | "";
|
||||||
|
|
||||||
|
if (fw_size == 0 || strlen(sig_b64) == 0) {
|
||||||
|
log_e("[OTA] Invalid update manifest");
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
log_i("[OTA] Update: %s → %s (%zu bytes)", FW_VERSION, remote_ver, fw_size);
|
||||||
|
|
||||||
|
uint8_t sig64[64];
|
||||||
|
size_t sig_len = 0;
|
||||||
|
if (mbedtls_base64_decode(sig64, sizeof(sig64), &sig_len,
|
||||||
|
(const uint8_t*)sig_b64, strlen(sig_b64)) != 0 ||
|
||||||
|
sig_len != 64) {
|
||||||
|
log_e("[OTA] Bad signature encoding (len=%zu)", sig_len);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
char fw_url[256];
|
||||||
|
snprintf(fw_url, sizeof(fw_url), "%s/ota/firmware", s_server_base);
|
||||||
|
return download_and_flash(fw_url, fw_size, sig64);
|
||||||
|
}
|
||||||
|
|
||||||
|
#endif // NATIVE_TEST
|
||||||
27
firmware/lib/ota_updater/ota_updater.h
Normal file
27
firmware/lib/ota_updater/ota_updater.h
Normal file
@@ -0,0 +1,27 @@
|
|||||||
|
// firmware/lib/ota_updater/ota_updater.h
|
||||||
|
#pragma once
|
||||||
|
#include <stdint.h>
|
||||||
|
#include <stddef.h>
|
||||||
|
#include <stdbool.h>
|
||||||
|
|
||||||
|
// One-time init. Call from setup() after WiFi is ready.
|
||||||
|
// server_base: e.g. "http://logs.research.bike:8000"
|
||||||
|
// check_interval_ms: milliseconds between polls (e.g. 6*3600*1000 = 21600000)
|
||||||
|
void ota_updater_init(const char* server_base,
|
||||||
|
const char* device_id,
|
||||||
|
const char* hmac_secret,
|
||||||
|
uint32_t check_interval_ms);
|
||||||
|
|
||||||
|
// Polls server; downloads, verifies, and flashes if newer version available.
|
||||||
|
// Returns true if update was applied (device reboots before returning false path).
|
||||||
|
// Safe to call from any task; blocks during download.
|
||||||
|
bool ota_updater_check_and_apply();
|
||||||
|
|
||||||
|
// Exposed for unit testing — pass an arbitrary 65-byte uncompressed P-256 pubkey.
|
||||||
|
bool ota_version_newer(const char* current, const char* remote);
|
||||||
|
bool ota_verify_signature_with_key(const uint8_t hash32[32], const uint8_t sig64[64],
|
||||||
|
const uint8_t pubkey65[65]);
|
||||||
|
|
||||||
|
// Production wrapper — uses the compiled-in kOtaPublicKey from ota_pubkey.h.
|
||||||
|
// Not callable from native tests (requires ota_pubkey.h / device build).
|
||||||
|
bool ota_verify_signature(const uint8_t hash32[32], const uint8_t sig64[64]);
|
||||||
@@ -10,8 +10,11 @@
|
|||||||
#include "reporter.h"
|
#include "reporter.h"
|
||||||
#include "event_log.h"
|
#include "event_log.h"
|
||||||
#include "net_guard.h"
|
#include "net_guard.h"
|
||||||
|
#include "version.h"
|
||||||
|
#include "ota_updater.h"
|
||||||
#include <esp_system.h>
|
#include <esp_system.h>
|
||||||
#include <esp_task_wdt.h>
|
#include <esp_task_wdt.h>
|
||||||
|
#include <esp_ota_ops.h>
|
||||||
|
|
||||||
// LED on GPIO2 (TimerCamera-F built-in LED) — verify against board schematic
|
// LED on GPIO2 (TimerCamera-F built-in LED) — verify against board schematic
|
||||||
// Factory reset: hold GPIO37 (BOOT button) for 5 seconds
|
// Factory reset: hold GPIO37 (BOOT button) for 5 seconds
|
||||||
@@ -93,6 +96,22 @@ static void task_camera(void*) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void ota_task(void*) {
|
||||||
|
// Min 10s to avoid pathological fast loops if NVS is corrupted
|
||||||
|
uint32_t interval_ms = g_cfg.ota_interval_s < 10 ? 10000UL : g_cfg.ota_interval_s * 1000UL;
|
||||||
|
Serial.printf("[OTA] task started, interval=%u ms\n", (unsigned)interval_ms);
|
||||||
|
for (;;) {
|
||||||
|
if (WiFi.isConnected()) {
|
||||||
|
Serial.println("[OTA] tick: WiFi connected, running check");
|
||||||
|
ota_updater_check_and_apply();
|
||||||
|
} else {
|
||||||
|
Serial.printf("[OTA] tick: WiFi not connected (status=%d), skipping\n",
|
||||||
|
WiFi.status());
|
||||||
|
}
|
||||||
|
vTaskDelay(pdMS_TO_TICKS(interval_ms));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Hourly reporter task — runs on core 0
|
// Hourly reporter task — runs on core 0
|
||||||
static void task_reporter(void*) {
|
static void task_reporter(void*) {
|
||||||
uint32_t last_report_ts = 0; // 0 = not initialized yet
|
uint32_t last_report_ts = 0; // 0 = not initialized yet
|
||||||
@@ -175,6 +194,27 @@ void setup() {
|
|||||||
pinMode(BUTTON_PIN, INPUT_PULLUP);
|
pinMode(BUTTON_PIN, INPUT_PULLUP);
|
||||||
led_set(true); // on = booting
|
led_set(true); // on = booting
|
||||||
|
|
||||||
|
// OTA rollback guard: if booted from a freshly-flashed OTA image while the
|
||||||
|
// bootloader has rollback enabled, the image is PENDING_VERIFY and will be
|
||||||
|
// rolled back on the next reboot unless we mark it valid. Harmless no-op
|
||||||
|
// when rollback is disabled. Always log the running partition + state so
|
||||||
|
// we can see post-OTA boot behavior on serial.
|
||||||
|
{
|
||||||
|
const esp_partition_t* running = esp_ota_get_running_partition();
|
||||||
|
esp_ota_img_states_t state = ESP_OTA_IMG_UNDEFINED;
|
||||||
|
if (running) {
|
||||||
|
esp_ota_get_state_partition(running, &state);
|
||||||
|
Serial.printf("[BOOT] running partition '%s' (off=0x%x) state=%d fw=%s\n",
|
||||||
|
running->label, (unsigned)running->address,
|
||||||
|
(int)state, FW_VERSION);
|
||||||
|
}
|
||||||
|
if (state == ESP_OTA_IMG_PENDING_VERIFY) {
|
||||||
|
esp_err_t e = esp_ota_mark_app_valid_cancel_rollback();
|
||||||
|
Serial.printf("[BOOT] esp_ota_mark_app_valid_cancel_rollback: %s\n",
|
||||||
|
esp_err_to_name(e));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
event_log_init();
|
event_log_init();
|
||||||
event_log_write(EVT_BOOT, (uint16_t)esp_reset_reason(), 0);
|
event_log_write(EVT_BOOT, (uint16_t)esp_reset_reason(), 0);
|
||||||
|
|
||||||
@@ -269,6 +309,16 @@ void setup() {
|
|||||||
|
|
||||||
xTaskCreatePinnedToCore(task_camera, "cam", 8192, nullptr, 2, nullptr, 1);
|
xTaskCreatePinnedToCore(task_camera, "cam", 8192, nullptr, 2, nullptr, 1);
|
||||||
xTaskCreatePinnedToCore(task_reporter, "rep", 8192, nullptr, 1, nullptr, 0);
|
xTaskCreatePinnedToCore(task_reporter, "rep", 8192, nullptr, 1, nullptr, 0);
|
||||||
|
|
||||||
|
// static: ota_updater stores raw pointer; must outlive setup()
|
||||||
|
static String s_ota_base = String("http://") + REPORTER_API_HOST_NAME + ":" + REPORTER_API_PORT;
|
||||||
|
ota_updater_init(
|
||||||
|
s_ota_base.c_str(),
|
||||||
|
g_cfg.device_id.c_str(),
|
||||||
|
g_cfg.hmac_secret.c_str(),
|
||||||
|
g_cfg.ota_interval_s < 10 ? 10000UL : g_cfg.ota_interval_s * 1000UL
|
||||||
|
);
|
||||||
|
xTaskCreate(ota_task, "ota", 8192, nullptr, 1, nullptr);
|
||||||
}
|
}
|
||||||
|
|
||||||
void loop() {
|
void loop() {
|
||||||
|
|||||||
44
firmware/test/test_ota/test_version.cpp
Normal file
44
firmware/test/test_ota/test_version.cpp
Normal file
@@ -0,0 +1,44 @@
|
|||||||
|
// firmware/test/test_ota/test_version.cpp
|
||||||
|
#include <unity.h>
|
||||||
|
|
||||||
|
// Pull in the function under test — include .cpp directly for native builds
|
||||||
|
// so we don't need a separate compilation unit per test.
|
||||||
|
#define NATIVE_TEST
|
||||||
|
#include "../../lib/ota_updater/ota_updater.cpp"
|
||||||
|
|
||||||
|
void setUp() {}
|
||||||
|
void tearDown() {}
|
||||||
|
|
||||||
|
void test_remote_newer_patch() {
|
||||||
|
TEST_ASSERT_TRUE(ota_version_newer("1.0.0", "1.0.1"));
|
||||||
|
}
|
||||||
|
void test_remote_newer_minor() {
|
||||||
|
TEST_ASSERT_TRUE(ota_version_newer("1.0.9", "1.1.0"));
|
||||||
|
}
|
||||||
|
void test_remote_newer_major() {
|
||||||
|
TEST_ASSERT_TRUE(ota_version_newer("0.9.9", "1.0.0"));
|
||||||
|
}
|
||||||
|
void test_same_version() {
|
||||||
|
TEST_ASSERT_FALSE(ota_version_newer("1.2.3", "1.2.3"));
|
||||||
|
}
|
||||||
|
void test_remote_older() {
|
||||||
|
TEST_ASSERT_FALSE(ota_version_newer("1.2.3", "1.2.2"));
|
||||||
|
}
|
||||||
|
void test_malformed_current() {
|
||||||
|
TEST_ASSERT_FALSE(ota_version_newer("bad", "1.0.0"));
|
||||||
|
}
|
||||||
|
void test_malformed_remote() {
|
||||||
|
TEST_ASSERT_FALSE(ota_version_newer("1.0.0", "bad"));
|
||||||
|
}
|
||||||
|
|
||||||
|
int main() {
|
||||||
|
UNITY_BEGIN();
|
||||||
|
RUN_TEST(test_remote_newer_patch);
|
||||||
|
RUN_TEST(test_remote_newer_minor);
|
||||||
|
RUN_TEST(test_remote_newer_major);
|
||||||
|
RUN_TEST(test_same_version);
|
||||||
|
RUN_TEST(test_remote_older);
|
||||||
|
RUN_TEST(test_malformed_current);
|
||||||
|
RUN_TEST(test_malformed_remote);
|
||||||
|
return UNITY_END();
|
||||||
|
}
|
||||||
62
firmware/test/test_ota_sig/test_sig_verify.cpp
Normal file
62
firmware/test/test_ota_sig/test_sig_verify.cpp
Normal file
@@ -0,0 +1,62 @@
|
|||||||
|
// firmware/test/test_ota_sig/test_sig_verify.cpp
|
||||||
|
#include <unity.h>
|
||||||
|
#include <string.h>
|
||||||
|
#define NATIVE_TEST
|
||||||
|
#include "../../lib/ota_updater/ota_updater.cpp"
|
||||||
|
|
||||||
|
// ── Test vectors generated by Python/cryptography (ECDSA P-256) ────────────
|
||||||
|
static const uint8_t TEST_PUBKEY[65] = {
|
||||||
|
0x04, 0x96, 0x18, 0x6c, 0x8b, 0xb2, 0xdf, 0xea, 0x3f, 0xe4, 0x75, 0x35, 0x0e, 0x8a, 0x3e, 0x7d,
|
||||||
|
0x49, 0x7f, 0x56, 0xb5, 0xb4, 0x1a, 0xae, 0x05, 0xa3, 0x10, 0x6f, 0x02, 0x43, 0x84, 0xb3, 0x1c,
|
||||||
|
0x1f, 0x44, 0xef, 0x08, 0x84, 0x57, 0xca, 0x6e, 0xd8, 0x19, 0x74, 0x10, 0x8d, 0x95, 0xcc, 0x8c,
|
||||||
|
0x61, 0x89, 0x56, 0xea, 0xbc, 0x0c, 0xa2, 0x54, 0xd7, 0x02, 0xf3, 0x1d, 0x67, 0x7c, 0xa5, 0xba,
|
||||||
|
0x42
|
||||||
|
};
|
||||||
|
|
||||||
|
static const uint8_t TEST_HASH[32] = {
|
||||||
|
0x0a, 0x7e, 0x5f, 0x6a, 0x4c, 0x72, 0x11, 0xb7, 0x14, 0x3f, 0x85, 0x59, 0x50, 0x61, 0x8a, 0xa1,
|
||||||
|
0xab, 0xee, 0x7b, 0x57, 0x08, 0x59, 0x56, 0x09, 0x6d, 0x18, 0xaf, 0x70, 0xe6, 0x6e, 0x6c, 0xa8
|
||||||
|
};
|
||||||
|
|
||||||
|
static const uint8_t TEST_SIG[64] = {
|
||||||
|
0x4f, 0xff, 0xc3, 0xc6, 0xd5, 0x04, 0x71, 0x37, 0x87, 0x8c, 0xe1, 0xe5, 0x79, 0xef, 0x59, 0x2a,
|
||||||
|
0x63, 0xde, 0xf6, 0x96, 0x3e, 0x8f, 0x90, 0x2f, 0x46, 0x1f, 0x1b, 0x8a, 0xd5, 0x94, 0xb8, 0x28,
|
||||||
|
0x80, 0xfa, 0xe4, 0x26, 0x14, 0xbf, 0x91, 0x54, 0xbf, 0xa6, 0x2f, 0x67, 0xf9, 0x97, 0x45, 0x3a,
|
||||||
|
0x0f, 0xdc, 0x66, 0xcd, 0x21, 0xb8, 0x91, 0xdb, 0xb9, 0xaa, 0x6b, 0x5d, 0x6c, 0xa5, 0xcb, 0x96
|
||||||
|
};
|
||||||
|
// ──────────────────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
void setUp() {}
|
||||||
|
void tearDown() {}
|
||||||
|
|
||||||
|
void test_valid_signature_accepted() {
|
||||||
|
TEST_ASSERT_TRUE(ota_verify_signature_with_key(TEST_HASH, TEST_SIG, TEST_PUBKEY));
|
||||||
|
}
|
||||||
|
|
||||||
|
void test_corrupted_hash_rejected() {
|
||||||
|
uint8_t bad_hash[32];
|
||||||
|
memcpy(bad_hash, TEST_HASH, 32);
|
||||||
|
bad_hash[0] ^= 0xff;
|
||||||
|
TEST_ASSERT_FALSE(ota_verify_signature_with_key(bad_hash, TEST_SIG, TEST_PUBKEY));
|
||||||
|
}
|
||||||
|
|
||||||
|
void test_corrupted_signature_rejected() {
|
||||||
|
uint8_t bad_sig[64];
|
||||||
|
memcpy(bad_sig, TEST_SIG, 64);
|
||||||
|
bad_sig[0] ^= 0xff;
|
||||||
|
TEST_ASSERT_FALSE(ota_verify_signature_with_key(TEST_HASH, bad_sig, TEST_PUBKEY));
|
||||||
|
}
|
||||||
|
|
||||||
|
void test_zero_signature_rejected() {
|
||||||
|
uint8_t zero_sig[64] = {};
|
||||||
|
TEST_ASSERT_FALSE(ota_verify_signature_with_key(TEST_HASH, zero_sig, TEST_PUBKEY));
|
||||||
|
}
|
||||||
|
|
||||||
|
int main() {
|
||||||
|
UNITY_BEGIN();
|
||||||
|
RUN_TEST(test_valid_signature_accepted);
|
||||||
|
RUN_TEST(test_corrupted_hash_rejected);
|
||||||
|
RUN_TEST(test_corrupted_signature_rejected);
|
||||||
|
RUN_TEST(test_zero_signature_rejected);
|
||||||
|
return UNITY_END();
|
||||||
|
}
|
||||||
@@ -11,7 +11,7 @@ import sqlite3
|
|||||||
from typing import List
|
from typing import List
|
||||||
|
|
||||||
from fastapi import Depends
|
from fastapi import Depends
|
||||||
from pydantic import BaseModel, Field
|
from pydantic import BaseModel, Field, model_validator
|
||||||
|
|
||||||
|
|
||||||
class CameraRecord(BaseModel):
|
class CameraRecord(BaseModel):
|
||||||
@@ -20,6 +20,12 @@ class CameraRecord(BaseModel):
|
|||||||
entries: int = Field(ge=0)
|
entries: int = Field(ge=0)
|
||||||
exits: int = Field(ge=0)
|
exits: int = Field(ge=0)
|
||||||
|
|
||||||
|
@model_validator(mode="after")
|
||||||
|
def _period_order(self):
|
||||||
|
if self.period_end <= self.period_start:
|
||||||
|
raise ValueError("period_end must be strictly greater than period_start")
|
||||||
|
return self
|
||||||
|
|
||||||
|
|
||||||
class CameraEventsRequest(BaseModel):
|
class CameraEventsRequest(BaseModel):
|
||||||
location_id: str
|
location_id: str
|
||||||
|
|||||||
120
server/ota_endpoint.py
Normal file
120
server/ota_endpoint.py
Normal file
@@ -0,0 +1,120 @@
|
|||||||
|
# server/ota_endpoint.py
|
||||||
|
"""
|
||||||
|
OTA firmware update endpoints.
|
||||||
|
|
||||||
|
Deployment workflow:
|
||||||
|
1. Generate signing key (one-time):
|
||||||
|
python tools/gen_signing_key.py
|
||||||
|
→ secrets/firmware_signing_key.pem (keep offline)
|
||||||
|
→ firmware/lib/ota_updater/ota_pubkey.h (commit this)
|
||||||
|
|
||||||
|
2. Build and deploy a new firmware version:
|
||||||
|
pio run -e timercam # build
|
||||||
|
python tools/deploy_firmware.py \\
|
||||||
|
firmware/.pio/build/timercam/firmware.bin 1.2.3
|
||||||
|
→ server/firmware/ updated with current.bin, current.sig, manifest.json
|
||||||
|
|
||||||
|
3. Bump FW_VERSION in firmware/include/version.h before each release.
|
||||||
|
|
||||||
|
4. Register in server main app:
|
||||||
|
from server.ota_endpoint import router as ota_router
|
||||||
|
app.include_router(ota_router)
|
||||||
|
Also uncomment Depends(verify_device_hmac) on both route handlers
|
||||||
|
and confirm the HMAC format matches hmac.cpp:
|
||||||
|
method + "\\n" + path + "\\n" + timestamp + "\\n" + sha256_hex(body)
|
||||||
|
|
||||||
|
Note: HMAC auth is currently commented out on route handlers — must be wired
|
||||||
|
before production use. verify_device_hmac must use the same format as hmac.cpp.
|
||||||
|
"""
|
||||||
|
import base64
|
||||||
|
import json
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
from fastapi import APIRouter
|
||||||
|
from fastapi.responses import FileResponse
|
||||||
|
|
||||||
|
FIRMWARE_DIR = Path(__file__).parent / "firmware"
|
||||||
|
|
||||||
|
router = APIRouter(prefix="/ota", tags=["ota"])
|
||||||
|
|
||||||
|
|
||||||
|
class FirmwareNotFoundError(Exception):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
def _parse_version(v: str) -> tuple:
|
||||||
|
"""Parse semver string to comparable tuple; returns (0,0,0) on malformed input."""
|
||||||
|
try:
|
||||||
|
parts = v.strip().split(".")
|
||||||
|
if len(parts) != 3:
|
||||||
|
return (0, 0, 0)
|
||||||
|
return tuple(int(x) for x in parts)
|
||||||
|
except (ValueError, AttributeError):
|
||||||
|
return (0, 0, 0)
|
||||||
|
|
||||||
|
|
||||||
|
def ota_check_impl(current_version: str, firmware_dir: Path = FIRMWARE_DIR) -> dict:
|
||||||
|
"""
|
||||||
|
Compare device's current_version against staged manifest.
|
||||||
|
Returns {"update": False} when no update is available or manifest is missing.
|
||||||
|
Returns full update payload when server version is strictly newer.
|
||||||
|
"""
|
||||||
|
manifest_path = firmware_dir / "manifest.json"
|
||||||
|
if not manifest_path.exists():
|
||||||
|
return {"update": False}
|
||||||
|
|
||||||
|
try:
|
||||||
|
manifest = json.loads(manifest_path.read_text())
|
||||||
|
version = manifest["version"]
|
||||||
|
size = manifest["size"]
|
||||||
|
sha256 = manifest["sha256"]
|
||||||
|
except (json.JSONDecodeError, KeyError):
|
||||||
|
return {"update": False}
|
||||||
|
|
||||||
|
if _parse_version(version) <= _parse_version(current_version):
|
||||||
|
return {"update": False}
|
||||||
|
|
||||||
|
sig_path = firmware_dir / "current.sig"
|
||||||
|
if not sig_path.exists():
|
||||||
|
return {"update": False}
|
||||||
|
sig_b64 = base64.b64encode(sig_path.read_bytes()).decode()
|
||||||
|
|
||||||
|
return {
|
||||||
|
"update": True,
|
||||||
|
"version": version,
|
||||||
|
"size": size,
|
||||||
|
"sha256": sha256,
|
||||||
|
"sig_b64": sig_b64,
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def ota_firmware_impl(firmware_dir: Path = FIRMWARE_DIR) -> bytes:
|
||||||
|
"""
|
||||||
|
Return raw firmware binary bytes.
|
||||||
|
Raises FirmwareNotFoundError if current.bin is absent.
|
||||||
|
"""
|
||||||
|
bin_path = firmware_dir / "current.bin"
|
||||||
|
if not bin_path.exists():
|
||||||
|
raise FirmwareNotFoundError("No firmware staged")
|
||||||
|
return bin_path.read_bytes()
|
||||||
|
|
||||||
|
|
||||||
|
@router.get("/check")
|
||||||
|
async def ota_check(
|
||||||
|
version: str,
|
||||||
|
# device_id: str = Depends(verify_device_hmac), # uncomment when wiring into app
|
||||||
|
):
|
||||||
|
"""Check whether a firmware update is available for the given device version."""
|
||||||
|
return ota_check_impl(current_version=version)
|
||||||
|
|
||||||
|
|
||||||
|
@router.get("/firmware")
|
||||||
|
async def ota_firmware(
|
||||||
|
# device_id: str = Depends(verify_device_hmac), # uncomment when wiring into app
|
||||||
|
):
|
||||||
|
"""Stream the staged firmware binary to the device."""
|
||||||
|
from fastapi import HTTPException
|
||||||
|
bin_path = FIRMWARE_DIR / "current.bin"
|
||||||
|
if not bin_path.exists():
|
||||||
|
raise HTTPException(status_code=404, detail="No firmware available")
|
||||||
|
return FileResponse(bin_path, media_type="application/octet-stream")
|
||||||
@@ -98,3 +98,15 @@ def test_negative_counts_rejected():
|
|||||||
with pytest.raises(ValidationError):
|
with pytest.raises(ValidationError):
|
||||||
CameraRecord(period_start=1712000000, period_end=1712003600,
|
CameraRecord(period_start=1712000000, period_end=1712003600,
|
||||||
entries=-1, exits=0)
|
entries=-1, exits=0)
|
||||||
|
|
||||||
|
|
||||||
|
def test_inverted_period_rejected():
|
||||||
|
"""Pydantic should reject period_end <= period_start."""
|
||||||
|
from pydantic import ValidationError
|
||||||
|
from server.camera_endpoint import CameraRecord
|
||||||
|
with pytest.raises(ValidationError):
|
||||||
|
CameraRecord(period_start=1712003600, period_end=1712003600,
|
||||||
|
entries=0, exits=0)
|
||||||
|
with pytest.raises(ValidationError):
|
||||||
|
CameraRecord(period_start=1712003600, period_end=1712000000,
|
||||||
|
entries=0, exits=0)
|
||||||
|
|||||||
83
server/test_ota_endpoint.py
Normal file
83
server/test_ota_endpoint.py
Normal file
@@ -0,0 +1,83 @@
|
|||||||
|
# server/test_ota_endpoint.py
|
||||||
|
import base64
|
||||||
|
import hashlib
|
||||||
|
import json
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from server.ota_endpoint import ota_check_impl, ota_firmware_impl
|
||||||
|
|
||||||
|
|
||||||
|
def write_firmware(firmware_dir: Path, version: str, data: bytes = b"fake_fw") -> None:
|
||||||
|
sig = bytes(64) # zero sig (not validated server-side)
|
||||||
|
manifest = {
|
||||||
|
"version": version,
|
||||||
|
"size": len(data),
|
||||||
|
"sha256": hashlib.sha256(data).hexdigest(),
|
||||||
|
}
|
||||||
|
(firmware_dir / "current.bin").write_bytes(data)
|
||||||
|
(firmware_dir / "current.sig").write_bytes(sig)
|
||||||
|
(firmware_dir / "manifest.json").write_text(json.dumps(manifest))
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(autouse=True)
|
||||||
|
def patch_firmware_dir(tmp_path, monkeypatch):
|
||||||
|
import server.ota_endpoint as mod
|
||||||
|
monkeypatch.setattr(mod, "FIRMWARE_DIR", tmp_path)
|
||||||
|
yield tmp_path
|
||||||
|
|
||||||
|
|
||||||
|
def test_check_no_update_same_version(tmp_path):
|
||||||
|
write_firmware(tmp_path, "1.0.0")
|
||||||
|
result = ota_check_impl(current_version="1.0.0", firmware_dir=tmp_path)
|
||||||
|
assert result["update"] is False
|
||||||
|
|
||||||
|
|
||||||
|
def test_check_no_update_newer_local(tmp_path):
|
||||||
|
write_firmware(tmp_path, "1.0.0")
|
||||||
|
result = ota_check_impl(current_version="1.1.0", firmware_dir=tmp_path)
|
||||||
|
assert result["update"] is False
|
||||||
|
|
||||||
|
|
||||||
|
def test_check_update_available(tmp_path):
|
||||||
|
write_firmware(tmp_path, "1.1.0", data=b"new firmware")
|
||||||
|
result = ota_check_impl(current_version="1.0.0", firmware_dir=tmp_path)
|
||||||
|
assert result["update"] is True
|
||||||
|
assert result["version"] == "1.1.0"
|
||||||
|
assert result["size"] == len(b"new firmware")
|
||||||
|
assert "sha256" in result
|
||||||
|
assert "sig_b64" in result
|
||||||
|
sig_bytes = base64.b64decode(result["sig_b64"])
|
||||||
|
assert len(sig_bytes) == 64
|
||||||
|
|
||||||
|
|
||||||
|
def test_check_no_manifest(tmp_path):
|
||||||
|
result = ota_check_impl(current_version="1.0.0", firmware_dir=tmp_path)
|
||||||
|
assert result["update"] is False
|
||||||
|
|
||||||
|
|
||||||
|
def test_firmware_endpoint_returns_binary(tmp_path):
|
||||||
|
fw_data = b"firmware binary content"
|
||||||
|
write_firmware(tmp_path, "1.1.0", data=fw_data)
|
||||||
|
content = ota_firmware_impl(firmware_dir=tmp_path)
|
||||||
|
assert content == fw_data
|
||||||
|
|
||||||
|
|
||||||
|
def test_firmware_endpoint_missing_raises(tmp_path):
|
||||||
|
import server.ota_endpoint as mod
|
||||||
|
with pytest.raises(mod.FirmwareNotFoundError):
|
||||||
|
ota_firmware_impl(firmware_dir=tmp_path)
|
||||||
|
|
||||||
|
|
||||||
|
def test_check_malformed_manifest(tmp_path):
|
||||||
|
(tmp_path / "manifest.json").write_text("not valid json{{{")
|
||||||
|
result = ota_check_impl(current_version="1.0.0", firmware_dir=tmp_path)
|
||||||
|
assert result["update"] is False
|
||||||
|
|
||||||
|
|
||||||
|
def test_check_wrong_arity_version_no_update(tmp_path):
|
||||||
|
write_firmware(tmp_path, "1.2") # wrong arity server version
|
||||||
|
result = ota_check_impl(current_version="1.0.0", firmware_dir=tmp_path)
|
||||||
|
# server "1.2" → (0,0,0) ≤ client (1,0,0) → no update
|
||||||
|
assert result["update"] is False
|
||||||
0
tools/__init__.py
Normal file
0
tools/__init__.py
Normal file
43
tools/deploy_firmware.py
Normal file
43
tools/deploy_firmware.py
Normal file
@@ -0,0 +1,43 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
"""Sign firmware and stage it for the server OTA endpoint."""
|
||||||
|
import argparse, hashlib, json
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
from sign_firmware import sign_firmware
|
||||||
|
|
||||||
|
|
||||||
|
def deploy(firmware_path: Path, key_path: Path,
|
||||||
|
version: str, output_dir: Path) -> None:
|
||||||
|
output_dir.mkdir(parents=True, exist_ok=True)
|
||||||
|
|
||||||
|
data = firmware_path.read_bytes()
|
||||||
|
sig = sign_firmware(firmware_path, key_path)
|
||||||
|
|
||||||
|
(output_dir / "current.bin").write_bytes(data)
|
||||||
|
(output_dir / "current.sig").write_bytes(sig)
|
||||||
|
(output_dir / "manifest.json").write_text(json.dumps({
|
||||||
|
"version": version,
|
||||||
|
"size": len(data),
|
||||||
|
"sha256": hashlib.sha256(data).hexdigest(),
|
||||||
|
}, indent=2))
|
||||||
|
|
||||||
|
print(f"Deployed {firmware_path.name} v{version} → {output_dir}/")
|
||||||
|
|
||||||
|
|
||||||
|
def main() -> None:
|
||||||
|
p = argparse.ArgumentParser(description=__doc__)
|
||||||
|
p.add_argument("firmware", help="Path to .bin")
|
||||||
|
p.add_argument("version", help="Version string, e.g. 1.2.3")
|
||||||
|
p.add_argument("--key", default="secrets/firmware_signing_key.pem")
|
||||||
|
p.add_argument("--out-dir", default="server/firmware")
|
||||||
|
args = p.parse_args()
|
||||||
|
deploy(
|
||||||
|
firmware_path=Path(args.firmware),
|
||||||
|
key_path=Path(args.key),
|
||||||
|
version=args.version,
|
||||||
|
output_dir=Path(args.out_dir),
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
@@ -28,6 +28,25 @@ NVS_NAMESPACE = "doorcounter"
|
|||||||
NVS_PARTITION_OFFSET = "0x9000"
|
NVS_PARTITION_OFFSET = "0x9000"
|
||||||
NVS_PARTITION_SIZE = "0x5000" # matches firmware partition table (20KB)
|
NVS_PARTITION_SIZE = "0x5000" # matches firmware partition table (20KB)
|
||||||
|
|
||||||
|
# Characters that would change the field/row structure of the NVS-CSV format
|
||||||
|
# (key,type,encoding,value). A value containing any of these would either
|
||||||
|
# split into more fields or add rows, silently provisioning the wrong keys.
|
||||||
|
_CSV_FORBIDDEN = (",", '"', "\n", "\r")
|
||||||
|
|
||||||
|
|
||||||
|
def _reject_csv_metacharacters(name, value):
|
||||||
|
"""Exit with an error if value contains a character that would corrupt
|
||||||
|
the NVS CSV. Used for operator-supplied strings (device id, location id,
|
||||||
|
WiFi credentials)."""
|
||||||
|
for c in _CSV_FORBIDDEN:
|
||||||
|
if c in value:
|
||||||
|
print(
|
||||||
|
f"Error: --{name} contains forbidden character {c!r}; "
|
||||||
|
f"this would corrupt the NVS partition CSV.",
|
||||||
|
file=sys.stderr,
|
||||||
|
)
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
|
||||||
def build_nvs_csv(device_id, location_id, hmac_secret,
|
def build_nvs_csv(device_id, location_id, hmac_secret,
|
||||||
wifi_ssid=None, wifi_pass=None, line_offset=50):
|
wifi_ssid=None, wifi_pass=None, line_offset=50):
|
||||||
@@ -78,6 +97,13 @@ def main():
|
|||||||
print("Error: --line-offset must be 0-100", file=sys.stderr)
|
print("Error: --line-offset must be 0-100", file=sys.stderr)
|
||||||
sys.exit(1)
|
sys.exit(1)
|
||||||
|
|
||||||
|
_reject_csv_metacharacters("device-id", args.device_id)
|
||||||
|
_reject_csv_metacharacters("location-id", args.location_id)
|
||||||
|
if args.wifi_ssid is not None:
|
||||||
|
_reject_csv_metacharacters("wifi-ssid", args.wifi_ssid)
|
||||||
|
if args.wifi_password is not None:
|
||||||
|
_reject_csv_metacharacters("wifi-password", args.wifi_password)
|
||||||
|
|
||||||
with tempfile.TemporaryDirectory() as tmp:
|
with tempfile.TemporaryDirectory() as tmp:
|
||||||
csv_path = os.path.join(tmp, "nvs.csv")
|
csv_path = os.path.join(tmp, "nvs.csv")
|
||||||
bin_path = os.path.join(tmp, "nvs.bin")
|
bin_path = os.path.join(tmp, "nvs.bin")
|
||||||
|
|||||||
57
tools/gen_signing_key.py
Normal file
57
tools/gen_signing_key.py
Normal file
@@ -0,0 +1,57 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
"""Generate ECDSA P-256 signing keypair for OTA firmware verification."""
|
||||||
|
import argparse
|
||||||
|
import os
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
from cryptography.hazmat.primitives.asymmetric import ec
|
||||||
|
from cryptography.hazmat.primitives import serialization
|
||||||
|
|
||||||
|
|
||||||
|
def generate(secrets_dir: Path, header_out: Path) -> None:
|
||||||
|
secrets_dir.mkdir(parents=True, exist_ok=True)
|
||||||
|
|
||||||
|
key = ec.generate_private_key(ec.SECP256R1())
|
||||||
|
|
||||||
|
pem = key.private_bytes(
|
||||||
|
encoding=serialization.Encoding.PEM,
|
||||||
|
format=serialization.PrivateFormat.PKCS8,
|
||||||
|
encryption_algorithm=serialization.NoEncryption(),
|
||||||
|
)
|
||||||
|
key_path = secrets_dir / "firmware_signing_key.pem"
|
||||||
|
key_path.write_bytes(pem)
|
||||||
|
key_path.chmod(0o600)
|
||||||
|
|
||||||
|
pub_bytes = key.public_key().public_bytes(
|
||||||
|
encoding=serialization.Encoding.X962,
|
||||||
|
format=serialization.PublicFormat.UncompressedPoint,
|
||||||
|
)
|
||||||
|
assert len(pub_bytes) == 65 and pub_bytes[0] == 0x04
|
||||||
|
|
||||||
|
hex_values = ", ".join(f"0x{b:02x}" for b in pub_bytes)
|
||||||
|
header = (
|
||||||
|
"#pragma once\n"
|
||||||
|
"// Auto-generated by tools/gen_signing_key.py — DO NOT EDIT\n"
|
||||||
|
"// ECDSA P-256 public key, uncompressed X9.62 (04 || X || Y)\n"
|
||||||
|
f"static const uint8_t kOtaPublicKey[65] = {{{hex_values}}};\n"
|
||||||
|
)
|
||||||
|
header_out.parent.mkdir(parents=True, exist_ok=True)
|
||||||
|
header_out.write_text(header)
|
||||||
|
|
||||||
|
print(f"Private key → {key_path}")
|
||||||
|
print(f"Public key header → {header_out}")
|
||||||
|
|
||||||
|
|
||||||
|
def main() -> None:
|
||||||
|
p = argparse.ArgumentParser(description=__doc__)
|
||||||
|
p.add_argument("--secrets-dir", default="secrets",
|
||||||
|
help="Directory for private key (default: secrets/)")
|
||||||
|
p.add_argument("--header-out",
|
||||||
|
default="firmware/lib/ota_updater/ota_pubkey.h",
|
||||||
|
help="Path to write the C header")
|
||||||
|
args = p.parse_args()
|
||||||
|
generate(Path(args.secrets_dir), Path(args.header_out))
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
52
tools/sign_firmware.py
Normal file
52
tools/sign_firmware.py
Normal file
@@ -0,0 +1,52 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
"""Sign a firmware binary with ECDSA P-256. Outputs a raw 64-byte r||s .sig file."""
|
||||||
|
import argparse
|
||||||
|
import sys
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
from cryptography.hazmat.primitives.asymmetric import ec
|
||||||
|
from cryptography.hazmat.primitives import hashes, serialization
|
||||||
|
from cryptography.hazmat.primitives.asymmetric.utils import decode_dss_signature
|
||||||
|
|
||||||
|
|
||||||
|
def load_private_key(key_path: Path) -> ec.EllipticCurvePrivateKey:
|
||||||
|
key = serialization.load_pem_private_key(key_path.read_bytes(), password=None)
|
||||||
|
if not isinstance(key, ec.EllipticCurvePrivateKey):
|
||||||
|
raise ValueError("Key must be an EC private key")
|
||||||
|
if not isinstance(key.curve, ec.SECP256R1):
|
||||||
|
raise ValueError(f"Key must use SECP256R1 curve, got {key.curve.name}")
|
||||||
|
return key
|
||||||
|
|
||||||
|
|
||||||
|
def sign_firmware(firmware_path: Path, key_path: Path) -> bytes:
|
||||||
|
key = load_private_key(key_path)
|
||||||
|
data = firmware_path.read_bytes()
|
||||||
|
sig_der = key.sign(data, ec.ECDSA(hashes.SHA256()))
|
||||||
|
r, s = decode_dss_signature(sig_der)
|
||||||
|
# Returns raw 64-byte r‖s (not DER) — mbedtls_ecdsa_verify expects this layout
|
||||||
|
return r.to_bytes(32, 'big') + s.to_bytes(32, 'big')
|
||||||
|
|
||||||
|
|
||||||
|
def main() -> None:
|
||||||
|
p = argparse.ArgumentParser(description=__doc__)
|
||||||
|
p.add_argument("firmware", help="Path to firmware .bin")
|
||||||
|
p.add_argument("--key", default="secrets/firmware_signing_key.pem",
|
||||||
|
help="Path to PEM private key")
|
||||||
|
p.add_argument("--out", help="Output .sig path (default: firmware.bin.sig)")
|
||||||
|
args = p.parse_args()
|
||||||
|
|
||||||
|
firmware = Path(args.firmware)
|
||||||
|
key_path = Path(args.key)
|
||||||
|
out_path = Path(args.out) if args.out else firmware.with_suffix(".bin.sig")
|
||||||
|
|
||||||
|
try:
|
||||||
|
sig = sign_firmware(firmware, key_path)
|
||||||
|
except (FileNotFoundError, ValueError) as e:
|
||||||
|
print(f"Error: {e}", file=sys.stderr)
|
||||||
|
raise SystemExit(1)
|
||||||
|
out_path.write_bytes(sig)
|
||||||
|
print(f"Signed {firmware.name} → {out_path} ({len(sig)} bytes)")
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
63
tools/test_deploy_firmware.py
Normal file
63
tools/test_deploy_firmware.py
Normal file
@@ -0,0 +1,63 @@
|
|||||||
|
import json, hashlib, sys
|
||||||
|
from pathlib import Path
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
REPO_ROOT = Path(__file__).parent.parent
|
||||||
|
sys.path.insert(0, str(REPO_ROOT / "tools"))
|
||||||
|
|
||||||
|
from cryptography.hazmat.primitives.asymmetric import ec
|
||||||
|
from cryptography.hazmat.primitives import serialization
|
||||||
|
|
||||||
|
from deploy_firmware import deploy
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture()
|
||||||
|
def key_pem(tmp_path):
|
||||||
|
key = ec.generate_private_key(ec.SECP256R1())
|
||||||
|
pem_path = tmp_path / "key.pem"
|
||||||
|
pem_path.write_bytes(key.private_bytes(
|
||||||
|
serialization.Encoding.PEM,
|
||||||
|
serialization.PrivateFormat.PKCS8,
|
||||||
|
serialization.NoEncryption(),
|
||||||
|
))
|
||||||
|
return pem_path
|
||||||
|
|
||||||
|
|
||||||
|
def test_deploy_writes_all_artifacts(tmp_path, key_pem):
|
||||||
|
firmware = tmp_path / "firmware.bin"
|
||||||
|
firmware.write_bytes(b"fake firmware" * 200)
|
||||||
|
out_dir = tmp_path / "server_firmware"
|
||||||
|
|
||||||
|
deploy(firmware_path=firmware, key_path=key_pem,
|
||||||
|
version="1.2.3", output_dir=out_dir)
|
||||||
|
|
||||||
|
assert (out_dir / "current.bin").exists()
|
||||||
|
assert (out_dir / "current.sig").exists()
|
||||||
|
assert (out_dir / "manifest.json").exists()
|
||||||
|
|
||||||
|
|
||||||
|
def test_manifest_contents(tmp_path, key_pem):
|
||||||
|
data = b"firmware payload"
|
||||||
|
firmware = tmp_path / "fw.bin"
|
||||||
|
firmware.write_bytes(data)
|
||||||
|
out_dir = tmp_path / "out"
|
||||||
|
|
||||||
|
deploy(firmware_path=firmware, key_path=key_pem,
|
||||||
|
version="2.0.1", output_dir=out_dir)
|
||||||
|
|
||||||
|
manifest = json.loads((out_dir / "manifest.json").read_text())
|
||||||
|
assert manifest["version"] == "2.0.1"
|
||||||
|
assert manifest["size"] == len(data)
|
||||||
|
assert manifest["sha256"] == hashlib.sha256(data).hexdigest()
|
||||||
|
|
||||||
|
|
||||||
|
def test_signature_is_64_bytes(tmp_path, key_pem):
|
||||||
|
firmware = tmp_path / "fw.bin"
|
||||||
|
firmware.write_bytes(b"fw")
|
||||||
|
out_dir = tmp_path / "out"
|
||||||
|
|
||||||
|
deploy(firmware_path=firmware, key_path=key_pem,
|
||||||
|
version="1.0.0", output_dir=out_dir)
|
||||||
|
|
||||||
|
sig = (out_dir / "current.sig").read_bytes()
|
||||||
|
assert len(sig) == 64
|
||||||
17
tools/test_flash_device.py
Normal file
17
tools/test_flash_device.py
Normal file
@@ -0,0 +1,17 @@
|
|||||||
|
import pytest
|
||||||
|
|
||||||
|
from tools.flash_device import _reject_csv_metacharacters
|
||||||
|
|
||||||
|
|
||||||
|
def test_clean_value_accepted():
|
||||||
|
"""A value with no metacharacters should pass without exiting."""
|
||||||
|
_reject_csv_metacharacters("device-id", "dc-0042")
|
||||||
|
_reject_csv_metacharacters("location-id", "retailer-123")
|
||||||
|
_reject_csv_metacharacters("wifi-ssid", "StoreWiFi-2.4GHz")
|
||||||
|
_reject_csv_metacharacters("wifi-password", "p@ssw0rd!~#$%^&*()_+-=:;<>?/")
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize("bad", ["Home,Network", 'pa"ss', "ssid\nfoo", "name\rbar"])
|
||||||
|
def test_metacharacter_rejected(bad):
|
||||||
|
with pytest.raises(SystemExit):
|
||||||
|
_reject_csv_metacharacters("wifi-ssid", bad)
|
||||||
49
tools/test_gen_signing_key.py
Normal file
49
tools/test_gen_signing_key.py
Normal file
@@ -0,0 +1,49 @@
|
|||||||
|
import os, subprocess, sys, tempfile
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
REPO_ROOT = Path(__file__).parent.parent
|
||||||
|
|
||||||
|
def run_gen(secrets_dir, header_path):
|
||||||
|
env = os.environ.copy()
|
||||||
|
result = subprocess.run(
|
||||||
|
[sys.executable, str(REPO_ROOT / "tools/gen_signing_key.py"),
|
||||||
|
"--secrets-dir", str(secrets_dir),
|
||||||
|
"--header-out", str(header_path)],
|
||||||
|
capture_output=True, text=True, env=env
|
||||||
|
)
|
||||||
|
assert result.returncode == 0, result.stderr
|
||||||
|
return result
|
||||||
|
|
||||||
|
def test_private_key_created():
|
||||||
|
with tempfile.TemporaryDirectory() as d:
|
||||||
|
header = Path(d) / "ota_pubkey.h"
|
||||||
|
run_gen(d, header)
|
||||||
|
pem = Path(d) / "firmware_signing_key.pem"
|
||||||
|
assert pem.exists()
|
||||||
|
content = pem.read_text()
|
||||||
|
assert "BEGIN PRIVATE KEY" in content
|
||||||
|
|
||||||
|
def test_header_created():
|
||||||
|
with tempfile.TemporaryDirectory() as d:
|
||||||
|
header = Path(d) / "ota_pubkey.h"
|
||||||
|
run_gen(d, header)
|
||||||
|
assert header.exists()
|
||||||
|
content = header.read_text()
|
||||||
|
assert "kOtaPublicKey" in content
|
||||||
|
assert "0x04" in content # uncompressed point prefix
|
||||||
|
assert "[65]" in content
|
||||||
|
|
||||||
|
def test_public_key_is_valid_p256_point():
|
||||||
|
from cryptography.hazmat.primitives.asymmetric import ec
|
||||||
|
from cryptography.hazmat.primitives import serialization
|
||||||
|
with tempfile.TemporaryDirectory() as d:
|
||||||
|
header = Path(d) / "ota_pubkey.h"
|
||||||
|
run_gen(d, header)
|
||||||
|
pem = (Path(d) / "firmware_signing_key.pem").read_bytes()
|
||||||
|
priv = serialization.load_pem_private_key(pem, password=None)
|
||||||
|
pub_bytes = priv.public_key().public_bytes(
|
||||||
|
serialization.Encoding.X962,
|
||||||
|
serialization.PublicFormat.UncompressedPoint,
|
||||||
|
)
|
||||||
|
assert len(pub_bytes) == 65
|
||||||
|
assert pub_bytes[0] == 0x04
|
||||||
64
tools/test_sign_firmware.py
Normal file
64
tools/test_sign_firmware.py
Normal file
@@ -0,0 +1,64 @@
|
|||||||
|
import sys
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
from cryptography.exceptions import InvalidSignature
|
||||||
|
from cryptography.hazmat.primitives.asymmetric import ec
|
||||||
|
from cryptography.hazmat.primitives import hashes, serialization
|
||||||
|
from cryptography.hazmat.primitives.asymmetric.utils import decode_dss_signature
|
||||||
|
|
||||||
|
REPO_ROOT = Path(__file__).parent.parent
|
||||||
|
sys.path.insert(0, str(REPO_ROOT / "tools"))
|
||||||
|
from sign_firmware import sign_firmware, load_private_key
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture()
|
||||||
|
def keypair(tmp_path):
|
||||||
|
key = ec.generate_private_key(ec.SECP256R1())
|
||||||
|
pem_path = tmp_path / "key.pem"
|
||||||
|
pem_path.write_bytes(key.private_bytes(
|
||||||
|
serialization.Encoding.PEM,
|
||||||
|
serialization.PrivateFormat.PKCS8,
|
||||||
|
serialization.NoEncryption(),
|
||||||
|
))
|
||||||
|
return key, pem_path
|
||||||
|
|
||||||
|
|
||||||
|
def test_signature_is_64_bytes(keypair, tmp_path):
|
||||||
|
key, key_path = keypair
|
||||||
|
firmware = tmp_path / "fw.bin"
|
||||||
|
firmware.write_bytes(b"fake firmware data" * 100)
|
||||||
|
sig = sign_firmware(firmware, key_path)
|
||||||
|
assert len(sig) == 64
|
||||||
|
|
||||||
|
|
||||||
|
def test_signature_verifies(keypair, tmp_path):
|
||||||
|
key, key_path = keypair
|
||||||
|
data = b"test firmware payload"
|
||||||
|
firmware = tmp_path / "fw.bin"
|
||||||
|
firmware.write_bytes(data)
|
||||||
|
sig_raw = sign_firmware(firmware, key_path)
|
||||||
|
|
||||||
|
# Convert raw r||s back to DER for cryptography lib verify
|
||||||
|
r = int.from_bytes(sig_raw[:32], 'big')
|
||||||
|
s = int.from_bytes(sig_raw[32:], 'big')
|
||||||
|
from cryptography.hazmat.primitives.asymmetric.utils import encode_dss_signature
|
||||||
|
sig_der = encode_dss_signature(r, s)
|
||||||
|
|
||||||
|
key.public_key().verify(sig_der, data, ec.ECDSA(hashes.SHA256()))
|
||||||
|
|
||||||
|
|
||||||
|
def test_wrong_key_fails_verification(keypair, tmp_path):
|
||||||
|
key, key_path = keypair
|
||||||
|
firmware = tmp_path / "fw.bin"
|
||||||
|
firmware.write_bytes(b"firmware")
|
||||||
|
sig_raw = sign_firmware(firmware, key_path)
|
||||||
|
|
||||||
|
other_key = ec.generate_private_key(ec.SECP256R1())
|
||||||
|
r = int.from_bytes(sig_raw[:32], 'big')
|
||||||
|
s = int.from_bytes(sig_raw[32:], 'big')
|
||||||
|
from cryptography.hazmat.primitives.asymmetric.utils import encode_dss_signature
|
||||||
|
sig_der = encode_dss_signature(r, s)
|
||||||
|
|
||||||
|
with pytest.raises(InvalidSignature):
|
||||||
|
other_key.public_key().verify(sig_der, b"firmware", ec.ECDSA(hashes.SHA256()))
|
||||||
Reference in New Issue
Block a user