Compare commits
14 Commits
56fc58b843
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
| d2c2d97fb7 | |||
| 5ec678dfa3 | |||
| 5cf122b922 | |||
| a21dcfa349 | |||
| 66e6808e13 | |||
| 8b1fd10db7 | |||
| f37e0d6b07 | |||
| 81bcc12f2f | |||
| d9a242a5fa | |||
| 87b30a64b2 | |||
| 031426e364 | |||
| 437f73739f | |||
| 21a3c646aa | |||
| 81dc96b100 |
2
.gitignore
vendored
2
.gitignore
vendored
@@ -7,3 +7,5 @@ firmware/.pio/
|
||||
*.log
|
||||
*secret*
|
||||
__pycache__/
|
||||
secrets/
|
||||
server/firmware/
|
||||
|
||||
88
docs/ota-deployment-status.md
Normal file
88
docs/ota-deployment-status.md
Normal file
@@ -0,0 +1,88 @@
|
||||
# OTA Deployment — Status
|
||||
|
||||
## Current state (2026-05-14)
|
||||
|
||||
**End-to-end OTA verified working on `dc-0002`.** Device polled `engagement-api-1`, received a signed manifest, downloaded and verified firmware 1.0.1, set the alternate boot partition, rebooted, and came up reporting `fw=1.0.1`.
|
||||
|
||||
## What's deployed
|
||||
|
||||
- **Branch `feat/pull-ota-code-signing`** merged to `main` (13 commits, 17 new files, 936 LOC).
|
||||
- **Signing toolchain**: `tools/gen_signing_key.py`, `tools/sign_firmware.py`, `tools/deploy_firmware.py`.
|
||||
- **Firmware OTA library**: `firmware/lib/ota_updater/`.
|
||||
- **Signing key**: `secrets/firmware_signing_key.pem` (gitignored). Public key committed at `firmware/lib/ota_updater/ota_pubkey.h`.
|
||||
- **Live OTA handler**: served by `engagement-api-1` Docker service (source not in this repo). The stub at `server/ota_endpoint.py` is unwired and not the one responding to devices.
|
||||
- **Configurable poll interval** via NVS key `ota_interval`. Provision with `flash_device.py --ota-interval-seconds N`. Min 10 s, default 21600 (6 h).
|
||||
|
||||
## Issues resolved
|
||||
|
||||
### 1. HMAC format mismatch (resolved 2026-05-13)
|
||||
Firmware OTA updater was using `X-HMAC-Signature` header + `millis()`-derived timestamp; the reporter component used `X-Signature` + `time(nullptr)`. Server expected the reporter format. Fixed by aligning the OTA updater to the same canonical scheme as the reporter (`firmware/lib/ota_updater/ota_updater.cpp` `add_hmac_headers`).
|
||||
|
||||
### 2. `/ota/check` JSON schema mismatch (resolved 2026-05-14)
|
||||
Server was emitting `{update_available, sha256, url}` but firmware reads `{update, size, sig_b64}`. Device silently decided "up to date" every poll because `doc["update"]` defaulted to `false`. Fixed server-side: the `/ota/check` response now also includes the fields the firmware needs. Firmware schema remains the source of truth.
|
||||
|
||||
### 3. Signed firmware artifact pipeline (resolved 2026-05-14)
|
||||
Deploy flow now bumps `FW_VERSION` → builds → copies `.pio/build/timercam/firmware.bin` to `firmware-<version>.bin` → signs with `tools/sign_firmware.py` → SCPs both `.bin` and `.bin.sig` to `root@nginx:/root/engagement-api/firmware/`. Server team updates `firmware_releases.sha256` to match the new binary.
|
||||
|
||||
**Gotcha:** the `.bin` and `.sig` must always be deployed together. The signature is over the bytes; replacing one without the other puts the server in an inconsistent state and devices will reject the update with `SIGNATURE INVALID`.
|
||||
|
||||
## Hardening added this session
|
||||
|
||||
### Firmware logging (`firmware/lib/ota_updater/ota_updater.cpp`, `firmware/src/main.cpp`)
|
||||
The previous `log_i/w/e` macros were silenced by the default `CORE_DEBUG_LEVEL`. Replaced with `Serial.printf` so output appears regardless of log level. Now logs at every step:
|
||||
- `[OTA] task started, interval=N ms`
|
||||
- Per-tick WiFi status
|
||||
- Full check URL + HMAC header preview (device id, ts, sig prefix)
|
||||
- HTTP response code + error body on non-200
|
||||
- JSON parse errors
|
||||
- "Up to date" decision
|
||||
- Partition labels and offsets (running + target)
|
||||
- Per-128 KB download progress
|
||||
- Total bytes + elapsed ms
|
||||
- Computed sha256 of the downloaded image (compare against server `X-SHA256`)
|
||||
- Signature verify result
|
||||
- `esp_ota_end` / `esp_ota_set_boot_partition` errors by name
|
||||
- 500 ms `Serial.flush()` + `delay()` before `esp_restart()` so the final log line escapes the UART
|
||||
|
||||
### Boot-time partition state (`firmware/src/main.cpp`)
|
||||
Logs `running partition '<label>' (off=0x…) state=N fw=…` at every boot. If `state == ESP_OTA_IMG_PENDING_VERIFY` (3), calls `esp_ota_mark_app_valid_cancel_rollback()` to prevent the bootloader from reverting on the next reboot. Harmless no-op when rollback isn't enabled, but eliminates a class of silent OTA failures.
|
||||
|
||||
### `esp_ota_write` return value (`firmware/lib/ota_updater/ota_updater.cpp`)
|
||||
Previously ignored — a failed write would silently corrupt the new partition and the device would still try to boot from it. Now checked, aborts the OTA cleanly, and logs the failing offset.
|
||||
|
||||
### Partition size pre-check
|
||||
Reject the update before `esp_ota_begin` if `expected_size > target->size`.
|
||||
|
||||
## Verifying a deployment
|
||||
|
||||
After a server push, watch the device's serial output on the next OTA tick:
|
||||
|
||||
```
|
||||
[OTA] tick: WiFi connected, running check
|
||||
[OTA] check → GET http://logs.research.bike:80/ota/check?version=X.Y.Z
|
||||
[OTA] check response: HTTP 200
|
||||
[OTA] Update: X.Y.Z → A.B.C (N bytes)
|
||||
[OTA] running='app0' (off=…), target='app1' (off=…)
|
||||
[OTA] progress: N/N bytes
|
||||
[OTA] sha256(image)=<hex> ← must match server X-SHA256
|
||||
[OTA] signature OK
|
||||
[OTA] boot partition set to 'app1' — rebooting in 500 ms
|
||||
```
|
||||
|
||||
Then on reboot:
|
||||
|
||||
```
|
||||
[BOOT] running partition 'app1' (off=…) state=N fw=A.B.C
|
||||
```
|
||||
|
||||
The `fw=A.B.C` line is the success signal — it reflects the `FW_VERSION` macro baked into the freshly-booted image, not just what the device claims to be running.
|
||||
|
||||
## Quick reference
|
||||
|
||||
- Plan: `docs/superpowers/plans/2026-05-10-pull-ota-code-signing.md`
|
||||
- Firmware version: `firmware/include/version.h`
|
||||
- OTA library: `firmware/lib/ota_updater/`
|
||||
- HMAC implementation: `firmware/lib/hmac/hmac.cpp`
|
||||
- Provisioning tool: `tools/flash_device.py`
|
||||
- Signing tools: `tools/gen_signing_key.py`, `tools/sign_firmware.py`, `tools/deploy_firmware.py`
|
||||
- Server deploy path: `root@nginx:/root/engagement-api/firmware/` (per server team runbook)
|
||||
3
firmware/include/version.h
Normal file
3
firmware/include/version.h
Normal file
@@ -0,0 +1,3 @@
|
||||
#pragma once
|
||||
// Format: MAJOR.MINOR.PATCH (SemVer) — OTA version compare uses sscanf("%d.%d.%d")
|
||||
#define FW_VERSION "1.0.1"
|
||||
6
firmware/lib/ota_updater/library.json
Normal file
6
firmware/lib/ota_updater/library.json
Normal file
@@ -0,0 +1,6 @@
|
||||
{
|
||||
"name": "ota_updater",
|
||||
"build": {
|
||||
"flags": ["-I$PROJECT_INCLUDE_DIR"]
|
||||
}
|
||||
}
|
||||
4
firmware/lib/ota_updater/ota_pubkey.h
Normal file
4
firmware/lib/ota_updater/ota_pubkey.h
Normal file
@@ -0,0 +1,4 @@
|
||||
#pragma once
|
||||
// Auto-generated by tools/gen_signing_key.py — DO NOT EDIT
|
||||
// ECDSA P-256 public key, uncompressed X9.62 (04 || X || Y)
|
||||
static const uint8_t kOtaPublicKey[65] = {0x04, 0x1c, 0x92, 0x43, 0x23, 0xe9, 0xac, 0xd1, 0xe8, 0x05, 0x32, 0x49, 0x39, 0x12, 0x95, 0xb2, 0x0a, 0x3e, 0xfb, 0x9d, 0xdf, 0xee, 0xd1, 0x98, 0x87, 0x97, 0xa3, 0xb8, 0xcb, 0x2b, 0xa6, 0x06, 0xe0, 0x83, 0x32, 0x71, 0xd2, 0x5f, 0x80, 0x40, 0x68, 0xcd, 0x00, 0xe5, 0x0e, 0xba, 0x13, 0xf6, 0x97, 0x43, 0x6f, 0xe6, 0x4f, 0xd0, 0x95, 0x53, 0x0e, 0xd7, 0x9a, 0x8a, 0x2e, 0x25, 0x52, 0xb4, 0xaf};
|
||||
319
firmware/lib/ota_updater/ota_updater.cpp
Normal file
319
firmware/lib/ota_updater/ota_updater.cpp
Normal file
@@ -0,0 +1,319 @@
|
||||
// firmware/lib/ota_updater/ota_updater.cpp
|
||||
#include "ota_updater.h"
|
||||
#include <stdio.h>
|
||||
#include <string.h>
|
||||
#include <mbedtls/ecdsa.h>
|
||||
#include <mbedtls/ecp.h>
|
||||
#include <mbedtls/bignum.h>
|
||||
|
||||
// ── version comparison ─────────────────────────────────────────────────────
|
||||
|
||||
bool ota_version_newer(const char* current, const char* remote) {
|
||||
int ca = 0, cb = 0, cc = 0;
|
||||
int ra = 0, rb = 0, rc = 0;
|
||||
if (sscanf(current, "%d.%d.%d", &ca, &cb, &cc) != 3) return false;
|
||||
if (sscanf(remote, "%d.%d.%d", &ra, &rb, &rc) != 3) return false;
|
||||
if (ra != ca) return ra > ca;
|
||||
if (rb != cb) return rb > cb;
|
||||
return rc > cc;
|
||||
}
|
||||
|
||||
// ── signature verification ─────────────────────────────────────────────────
|
||||
bool ota_verify_signature_with_key(const uint8_t hash32[32], const uint8_t sig64[64],
|
||||
const uint8_t pubkey65[65]) {
|
||||
mbedtls_ecp_group grp;
|
||||
mbedtls_ecp_point Q;
|
||||
mbedtls_mpi r, s;
|
||||
|
||||
mbedtls_ecp_group_init(&grp);
|
||||
mbedtls_ecp_point_init(&Q);
|
||||
mbedtls_mpi_init(&r);
|
||||
mbedtls_mpi_init(&s);
|
||||
|
||||
bool ok = false;
|
||||
if (mbedtls_ecp_group_load(&grp, MBEDTLS_ECP_DP_SECP256R1) == 0 &&
|
||||
mbedtls_ecp_point_read_binary(&grp, &Q, pubkey65, 65) == 0 &&
|
||||
mbedtls_mpi_read_binary(&r, sig64, 32) == 0 &&
|
||||
mbedtls_mpi_read_binary(&s, sig64 + 32, 32) == 0 &&
|
||||
mbedtls_ecdsa_verify(&grp, hash32, 32, &Q, &r, &s) == 0) {
|
||||
ok = true;
|
||||
}
|
||||
|
||||
mbedtls_ecp_group_free(&grp);
|
||||
mbedtls_ecp_point_free(&Q);
|
||||
mbedtls_mpi_free(&r);
|
||||
mbedtls_mpi_free(&s);
|
||||
return ok;
|
||||
}
|
||||
|
||||
// ── device-only code ───────────────────────────────────────────────────────
|
||||
#ifndef NATIVE_TEST
|
||||
|
||||
#include <Arduino.h>
|
||||
#include <time.h>
|
||||
#include <HTTPClient.h>
|
||||
#include <WiFi.h>
|
||||
#include <ArduinoJson.h>
|
||||
#include <esp_ota_ops.h>
|
||||
#include <mbedtls/sha256.h>
|
||||
#include <mbedtls/base64.h>
|
||||
#include "hmac.h"
|
||||
#include "ota_pubkey.h"
|
||||
#include "version.h"
|
||||
|
||||
bool ota_verify_signature(const uint8_t hash32[32], const uint8_t sig64[64]) {
|
||||
return ota_verify_signature_with_key(hash32, sig64, kOtaPublicKey);
|
||||
}
|
||||
|
||||
static const char* s_server_base = nullptr;
|
||||
static const char* s_device_id = nullptr;
|
||||
static const char* s_hmac_secret = nullptr;
|
||||
static uint32_t s_interval_ms = 21600000UL; // 6 h default
|
||||
static uint32_t s_last_check_ms = 0;
|
||||
|
||||
void ota_updater_init(const char* server_base, const char* device_id,
|
||||
const char* hmac_secret, uint32_t check_interval_ms) {
|
||||
s_server_base = server_base;
|
||||
s_device_id = device_id;
|
||||
s_hmac_secret = hmac_secret;
|
||||
s_interval_ms = check_interval_ms;
|
||||
s_last_check_ms = 0; // force first check on next call
|
||||
}
|
||||
|
||||
static bool add_hmac_headers(HTTPClient& http, const char* method, const char* path) {
|
||||
uint32_t ts = (uint32_t)time(nullptr);
|
||||
if (ts < 1700000000UL) {
|
||||
Serial.printf("[OTA] Clock not synced (ts=%u) — skipping HMAC sign\n", (unsigned)ts);
|
||||
return false;
|
||||
}
|
||||
String sig = hmac_sign(s_hmac_secret, method, path, ts, "");
|
||||
if (sig.isEmpty()) {
|
||||
Serial.println("[OTA] HMAC sign failed");
|
||||
return false;
|
||||
}
|
||||
Serial.printf("[OTA] HMAC headers: device=%s ts=%u sig=%s...\n",
|
||||
s_device_id, (unsigned)ts, sig.substring(0, 12).c_str());
|
||||
http.addHeader("X-Device-Id", s_device_id);
|
||||
http.addHeader("X-Timestamp", String(ts));
|
||||
http.addHeader("X-Signature", sig);
|
||||
return true;
|
||||
}
|
||||
|
||||
static bool download_and_flash(const char* fw_url, size_t expected_size,
|
||||
const uint8_t sig64[64]) {
|
||||
const esp_partition_t* running = esp_ota_get_running_partition();
|
||||
const esp_partition_t* target = esp_ota_get_next_update_partition(nullptr);
|
||||
if (!target) {
|
||||
Serial.println("[OTA] No update partition found");
|
||||
return false;
|
||||
}
|
||||
Serial.printf("[OTA] running='%s' (off=0x%x sz=0x%x), target='%s' (off=0x%x sz=0x%x)\n",
|
||||
running ? running->label : "?",
|
||||
running ? (unsigned)running->address : 0,
|
||||
running ? (unsigned)running->size : 0,
|
||||
target->label,
|
||||
(unsigned)target->address, (unsigned)target->size);
|
||||
|
||||
if (expected_size > target->size) {
|
||||
Serial.printf("[OTA] image (%zu) larger than partition (%u)\n",
|
||||
expected_size, (unsigned)target->size);
|
||||
return false;
|
||||
}
|
||||
|
||||
esp_ota_handle_t handle;
|
||||
esp_err_t er = esp_ota_begin(target, OTA_WITH_SEQUENTIAL_WRITES, &handle);
|
||||
if (er != ESP_OK) {
|
||||
Serial.printf("[OTA] esp_ota_begin failed: %s\n", esp_err_to_name(er));
|
||||
return false;
|
||||
}
|
||||
|
||||
mbedtls_sha256_context sha_ctx;
|
||||
mbedtls_sha256_init(&sha_ctx);
|
||||
mbedtls_sha256_starts(&sha_ctx, 0);
|
||||
|
||||
HTTPClient http;
|
||||
http.begin(fw_url);
|
||||
http.setTimeout(30000);
|
||||
if (!add_hmac_headers(http, "GET", "/ota/firmware")) {
|
||||
Serial.println("[OTA] Aborting firmware download: HMAC sign failed");
|
||||
mbedtls_sha256_free(&sha_ctx);
|
||||
esp_ota_abort(handle);
|
||||
return false;
|
||||
}
|
||||
Serial.printf("[OTA] downloading firmware: %s\n", fw_url);
|
||||
int code = http.GET();
|
||||
Serial.printf("[OTA] firmware response: HTTP %d\n", code);
|
||||
if (code != HTTP_CODE_OK) {
|
||||
String body = http.getString();
|
||||
Serial.printf("[OTA] error body: %s\n", body.c_str());
|
||||
http.end();
|
||||
mbedtls_sha256_free(&sha_ctx);
|
||||
esp_ota_abort(handle);
|
||||
return false;
|
||||
}
|
||||
|
||||
int content_len = http.getSize();
|
||||
Serial.printf("[OTA] Content-Length: %d (expected %zu)\n",
|
||||
content_len, expected_size);
|
||||
|
||||
WiFiClient* stream = http.getStreamPtr();
|
||||
uint8_t buf[4096];
|
||||
size_t written = 0;
|
||||
size_t last_log_at = 0;
|
||||
bool write_failed = false;
|
||||
|
||||
uint32_t start_ms = millis();
|
||||
while (written < expected_size) {
|
||||
size_t want = min((size_t)sizeof(buf), expected_size - written);
|
||||
int got = stream->readBytes(buf, want);
|
||||
if (got <= 0) {
|
||||
Serial.printf("[OTA] stream ended at %zu/%zu bytes (readBytes=%d)\n",
|
||||
written, expected_size, got);
|
||||
break;
|
||||
}
|
||||
esp_err_t we = esp_ota_write(handle, buf, (size_t)got);
|
||||
if (we != ESP_OK) {
|
||||
Serial.printf("[OTA] esp_ota_write failed at offset %zu: %s\n",
|
||||
written, esp_err_to_name(we));
|
||||
write_failed = true;
|
||||
break;
|
||||
}
|
||||
mbedtls_sha256_update(&sha_ctx, buf, (size_t)got);
|
||||
written += (size_t)got;
|
||||
if (written - last_log_at >= 131072 || written == expected_size) {
|
||||
Serial.printf("[OTA] progress: %zu/%zu bytes\n", written, expected_size);
|
||||
last_log_at = written;
|
||||
}
|
||||
}
|
||||
uint32_t elapsed_ms = millis() - start_ms;
|
||||
http.end();
|
||||
Serial.printf("[OTA] download done: %zu bytes in %u ms\n",
|
||||
written, (unsigned)elapsed_ms);
|
||||
|
||||
uint8_t hash[32];
|
||||
mbedtls_sha256_finish(&sha_ctx, hash);
|
||||
mbedtls_sha256_free(&sha_ctx);
|
||||
|
||||
char hex[65];
|
||||
for (int i = 0; i < 32; i++) snprintf(hex + i*2, 3, "%02x", hash[i]);
|
||||
Serial.printf("[OTA] sha256(image)=%s\n", hex);
|
||||
|
||||
if (write_failed) {
|
||||
esp_ota_abort(handle);
|
||||
return false;
|
||||
}
|
||||
if (written != expected_size) {
|
||||
Serial.printf("[OTA] Download truncated (%zu/%zu bytes)\n", written, expected_size);
|
||||
esp_ota_abort(handle);
|
||||
return false;
|
||||
}
|
||||
|
||||
if (!ota_verify_signature_with_key(hash, sig64, kOtaPublicKey)) {
|
||||
Serial.println("[OTA] SIGNATURE INVALID — staying on current firmware");
|
||||
esp_ota_abort(handle);
|
||||
return false;
|
||||
}
|
||||
Serial.println("[OTA] signature OK");
|
||||
|
||||
esp_err_t end_err = esp_ota_end(handle);
|
||||
if (end_err != ESP_OK) {
|
||||
Serial.printf("[OTA] esp_ota_end failed: %s\n", esp_err_to_name(end_err));
|
||||
return false;
|
||||
}
|
||||
esp_err_t boot_err = esp_ota_set_boot_partition(target);
|
||||
if (boot_err != ESP_OK) {
|
||||
Serial.printf("[OTA] esp_ota_set_boot_partition failed: %s\n",
|
||||
esp_err_to_name(boot_err));
|
||||
return false;
|
||||
}
|
||||
|
||||
Serial.printf("[OTA] boot partition set to '%s' — rebooting in 500 ms\n",
|
||||
target->label);
|
||||
Serial.flush();
|
||||
delay(500);
|
||||
esp_restart();
|
||||
return true; // unreachable
|
||||
}
|
||||
|
||||
bool ota_updater_check_and_apply() {
|
||||
if (!s_server_base || !s_device_id || !s_hmac_secret) {
|
||||
Serial.println("[OTA] check skipped: updater not initialized");
|
||||
return false;
|
||||
}
|
||||
if (s_last_check_ms != 0 &&
|
||||
(uint32_t)(millis() - s_last_check_ms) < s_interval_ms) {
|
||||
return false;
|
||||
}
|
||||
s_last_check_ms = millis();
|
||||
|
||||
if (WiFi.status() != WL_CONNECTED) {
|
||||
Serial.printf("[OTA] check skipped: WiFi not connected (status=%d)\n",
|
||||
WiFi.status());
|
||||
return false;
|
||||
}
|
||||
|
||||
char check_path[128];
|
||||
snprintf(check_path, sizeof(check_path), "/ota/check?version=%s", FW_VERSION);
|
||||
char check_url[256];
|
||||
snprintf(check_url, sizeof(check_url), "%s%s", s_server_base, check_path);
|
||||
|
||||
Serial.printf("[OTA] check → GET %s (fw=%s)\n", check_url, FW_VERSION);
|
||||
|
||||
HTTPClient http;
|
||||
if (!http.begin(check_url)) {
|
||||
Serial.println("[OTA] http.begin() failed");
|
||||
return false;
|
||||
}
|
||||
if (!add_hmac_headers(http, "GET", check_path)) {
|
||||
Serial.println("[OTA] Aborting check: HMAC sign failed");
|
||||
http.end();
|
||||
return false;
|
||||
}
|
||||
int code = http.GET();
|
||||
Serial.printf("[OTA] check response: HTTP %d\n", code);
|
||||
if (code != HTTP_CODE_OK) {
|
||||
String body = http.getString();
|
||||
Serial.printf("[OTA] error body: %s\n", body.c_str());
|
||||
http.end();
|
||||
return false;
|
||||
}
|
||||
|
||||
JsonDocument doc;
|
||||
DeserializationError err = deserializeJson(doc, http.getStream());
|
||||
http.end();
|
||||
if (err) {
|
||||
Serial.printf("[OTA] JSON parse error: %s\n", err.c_str());
|
||||
return false;
|
||||
}
|
||||
|
||||
if (!doc["update"].as<bool>()) {
|
||||
Serial.printf("[OTA] Firmware up to date (%s)\n", FW_VERSION);
|
||||
return false;
|
||||
}
|
||||
|
||||
const char* remote_ver = doc["version"] | "";
|
||||
size_t fw_size = doc["size"] | 0;
|
||||
const char* sig_b64 = doc["sig_b64"] | "";
|
||||
|
||||
if (fw_size == 0 || strlen(sig_b64) == 0) {
|
||||
log_e("[OTA] Invalid update manifest");
|
||||
return false;
|
||||
}
|
||||
|
||||
log_i("[OTA] Update: %s → %s (%zu bytes)", FW_VERSION, remote_ver, fw_size);
|
||||
|
||||
uint8_t sig64[64];
|
||||
size_t sig_len = 0;
|
||||
if (mbedtls_base64_decode(sig64, sizeof(sig64), &sig_len,
|
||||
(const uint8_t*)sig_b64, strlen(sig_b64)) != 0 ||
|
||||
sig_len != 64) {
|
||||
log_e("[OTA] Bad signature encoding (len=%zu)", sig_len);
|
||||
return false;
|
||||
}
|
||||
|
||||
char fw_url[256];
|
||||
snprintf(fw_url, sizeof(fw_url), "%s/ota/firmware", s_server_base);
|
||||
return download_and_flash(fw_url, fw_size, sig64);
|
||||
}
|
||||
|
||||
#endif // NATIVE_TEST
|
||||
27
firmware/lib/ota_updater/ota_updater.h
Normal file
27
firmware/lib/ota_updater/ota_updater.h
Normal file
@@ -0,0 +1,27 @@
|
||||
// firmware/lib/ota_updater/ota_updater.h
|
||||
#pragma once
|
||||
#include <stdint.h>
|
||||
#include <stddef.h>
|
||||
#include <stdbool.h>
|
||||
|
||||
// One-time init. Call from setup() after WiFi is ready.
|
||||
// server_base: e.g. "http://logs.research.bike:8000"
|
||||
// check_interval_ms: milliseconds between polls (e.g. 6*3600*1000 = 21600000)
|
||||
void ota_updater_init(const char* server_base,
|
||||
const char* device_id,
|
||||
const char* hmac_secret,
|
||||
uint32_t check_interval_ms);
|
||||
|
||||
// Polls server; downloads, verifies, and flashes if newer version available.
|
||||
// Returns true if update was applied (device reboots before returning false path).
|
||||
// Safe to call from any task; blocks during download.
|
||||
bool ota_updater_check_and_apply();
|
||||
|
||||
// Exposed for unit testing — pass an arbitrary 65-byte uncompressed P-256 pubkey.
|
||||
bool ota_version_newer(const char* current, const char* remote);
|
||||
bool ota_verify_signature_with_key(const uint8_t hash32[32], const uint8_t sig64[64],
|
||||
const uint8_t pubkey65[65]);
|
||||
|
||||
// Production wrapper — uses the compiled-in kOtaPublicKey from ota_pubkey.h.
|
||||
// Not callable from native tests (requires ota_pubkey.h / device build).
|
||||
bool ota_verify_signature(const uint8_t hash32[32], const uint8_t sig64[64]);
|
||||
@@ -10,8 +10,11 @@
|
||||
#include "reporter.h"
|
||||
#include "event_log.h"
|
||||
#include "net_guard.h"
|
||||
#include "version.h"
|
||||
#include "ota_updater.h"
|
||||
#include <esp_system.h>
|
||||
#include <esp_task_wdt.h>
|
||||
#include <esp_ota_ops.h>
|
||||
|
||||
// LED on GPIO2 (TimerCamera-F built-in LED) — verify against board schematic
|
||||
// Factory reset: hold GPIO37 (BOOT button) for 5 seconds
|
||||
@@ -93,6 +96,22 @@ static void task_camera(void*) {
|
||||
}
|
||||
}
|
||||
|
||||
static void ota_task(void*) {
|
||||
// Min 10s to avoid pathological fast loops if NVS is corrupted
|
||||
uint32_t interval_ms = g_cfg.ota_interval_s < 10 ? 10000UL : g_cfg.ota_interval_s * 1000UL;
|
||||
Serial.printf("[OTA] task started, interval=%u ms\n", (unsigned)interval_ms);
|
||||
for (;;) {
|
||||
if (WiFi.isConnected()) {
|
||||
Serial.println("[OTA] tick: WiFi connected, running check");
|
||||
ota_updater_check_and_apply();
|
||||
} else {
|
||||
Serial.printf("[OTA] tick: WiFi not connected (status=%d), skipping\n",
|
||||
WiFi.status());
|
||||
}
|
||||
vTaskDelay(pdMS_TO_TICKS(interval_ms));
|
||||
}
|
||||
}
|
||||
|
||||
// Hourly reporter task — runs on core 0
|
||||
static void task_reporter(void*) {
|
||||
uint32_t last_report_ts = 0; // 0 = not initialized yet
|
||||
@@ -175,6 +194,27 @@ void setup() {
|
||||
pinMode(BUTTON_PIN, INPUT_PULLUP);
|
||||
led_set(true); // on = booting
|
||||
|
||||
// OTA rollback guard: if booted from a freshly-flashed OTA image while the
|
||||
// bootloader has rollback enabled, the image is PENDING_VERIFY and will be
|
||||
// rolled back on the next reboot unless we mark it valid. Harmless no-op
|
||||
// when rollback is disabled. Always log the running partition + state so
|
||||
// we can see post-OTA boot behavior on serial.
|
||||
{
|
||||
const esp_partition_t* running = esp_ota_get_running_partition();
|
||||
esp_ota_img_states_t state = ESP_OTA_IMG_UNDEFINED;
|
||||
if (running) {
|
||||
esp_ota_get_state_partition(running, &state);
|
||||
Serial.printf("[BOOT] running partition '%s' (off=0x%x) state=%d fw=%s\n",
|
||||
running->label, (unsigned)running->address,
|
||||
(int)state, FW_VERSION);
|
||||
}
|
||||
if (state == ESP_OTA_IMG_PENDING_VERIFY) {
|
||||
esp_err_t e = esp_ota_mark_app_valid_cancel_rollback();
|
||||
Serial.printf("[BOOT] esp_ota_mark_app_valid_cancel_rollback: %s\n",
|
||||
esp_err_to_name(e));
|
||||
}
|
||||
}
|
||||
|
||||
event_log_init();
|
||||
event_log_write(EVT_BOOT, (uint16_t)esp_reset_reason(), 0);
|
||||
|
||||
@@ -269,6 +309,16 @@ void setup() {
|
||||
|
||||
xTaskCreatePinnedToCore(task_camera, "cam", 8192, nullptr, 2, nullptr, 1);
|
||||
xTaskCreatePinnedToCore(task_reporter, "rep", 8192, nullptr, 1, nullptr, 0);
|
||||
|
||||
// static: ota_updater stores raw pointer; must outlive setup()
|
||||
static String s_ota_base = String("http://") + REPORTER_API_HOST_NAME + ":" + REPORTER_API_PORT;
|
||||
ota_updater_init(
|
||||
s_ota_base.c_str(),
|
||||
g_cfg.device_id.c_str(),
|
||||
g_cfg.hmac_secret.c_str(),
|
||||
g_cfg.ota_interval_s < 10 ? 10000UL : g_cfg.ota_interval_s * 1000UL
|
||||
);
|
||||
xTaskCreate(ota_task, "ota", 8192, nullptr, 1, nullptr);
|
||||
}
|
||||
|
||||
void loop() {
|
||||
|
||||
44
firmware/test/test_ota/test_version.cpp
Normal file
44
firmware/test/test_ota/test_version.cpp
Normal file
@@ -0,0 +1,44 @@
|
||||
// firmware/test/test_ota/test_version.cpp
|
||||
#include <unity.h>
|
||||
|
||||
// Pull in the function under test — include .cpp directly for native builds
|
||||
// so we don't need a separate compilation unit per test.
|
||||
#define NATIVE_TEST
|
||||
#include "../../lib/ota_updater/ota_updater.cpp"
|
||||
|
||||
void setUp() {}
|
||||
void tearDown() {}
|
||||
|
||||
void test_remote_newer_patch() {
|
||||
TEST_ASSERT_TRUE(ota_version_newer("1.0.0", "1.0.1"));
|
||||
}
|
||||
void test_remote_newer_minor() {
|
||||
TEST_ASSERT_TRUE(ota_version_newer("1.0.9", "1.1.0"));
|
||||
}
|
||||
void test_remote_newer_major() {
|
||||
TEST_ASSERT_TRUE(ota_version_newer("0.9.9", "1.0.0"));
|
||||
}
|
||||
void test_same_version() {
|
||||
TEST_ASSERT_FALSE(ota_version_newer("1.2.3", "1.2.3"));
|
||||
}
|
||||
void test_remote_older() {
|
||||
TEST_ASSERT_FALSE(ota_version_newer("1.2.3", "1.2.2"));
|
||||
}
|
||||
void test_malformed_current() {
|
||||
TEST_ASSERT_FALSE(ota_version_newer("bad", "1.0.0"));
|
||||
}
|
||||
void test_malformed_remote() {
|
||||
TEST_ASSERT_FALSE(ota_version_newer("1.0.0", "bad"));
|
||||
}
|
||||
|
||||
int main() {
|
||||
UNITY_BEGIN();
|
||||
RUN_TEST(test_remote_newer_patch);
|
||||
RUN_TEST(test_remote_newer_minor);
|
||||
RUN_TEST(test_remote_newer_major);
|
||||
RUN_TEST(test_same_version);
|
||||
RUN_TEST(test_remote_older);
|
||||
RUN_TEST(test_malformed_current);
|
||||
RUN_TEST(test_malformed_remote);
|
||||
return UNITY_END();
|
||||
}
|
||||
62
firmware/test/test_ota_sig/test_sig_verify.cpp
Normal file
62
firmware/test/test_ota_sig/test_sig_verify.cpp
Normal file
@@ -0,0 +1,62 @@
|
||||
// firmware/test/test_ota_sig/test_sig_verify.cpp
|
||||
#include <unity.h>
|
||||
#include <string.h>
|
||||
#define NATIVE_TEST
|
||||
#include "../../lib/ota_updater/ota_updater.cpp"
|
||||
|
||||
// ── Test vectors generated by Python/cryptography (ECDSA P-256) ────────────
|
||||
static const uint8_t TEST_PUBKEY[65] = {
|
||||
0x04, 0x96, 0x18, 0x6c, 0x8b, 0xb2, 0xdf, 0xea, 0x3f, 0xe4, 0x75, 0x35, 0x0e, 0x8a, 0x3e, 0x7d,
|
||||
0x49, 0x7f, 0x56, 0xb5, 0xb4, 0x1a, 0xae, 0x05, 0xa3, 0x10, 0x6f, 0x02, 0x43, 0x84, 0xb3, 0x1c,
|
||||
0x1f, 0x44, 0xef, 0x08, 0x84, 0x57, 0xca, 0x6e, 0xd8, 0x19, 0x74, 0x10, 0x8d, 0x95, 0xcc, 0x8c,
|
||||
0x61, 0x89, 0x56, 0xea, 0xbc, 0x0c, 0xa2, 0x54, 0xd7, 0x02, 0xf3, 0x1d, 0x67, 0x7c, 0xa5, 0xba,
|
||||
0x42
|
||||
};
|
||||
|
||||
static const uint8_t TEST_HASH[32] = {
|
||||
0x0a, 0x7e, 0x5f, 0x6a, 0x4c, 0x72, 0x11, 0xb7, 0x14, 0x3f, 0x85, 0x59, 0x50, 0x61, 0x8a, 0xa1,
|
||||
0xab, 0xee, 0x7b, 0x57, 0x08, 0x59, 0x56, 0x09, 0x6d, 0x18, 0xaf, 0x70, 0xe6, 0x6e, 0x6c, 0xa8
|
||||
};
|
||||
|
||||
static const uint8_t TEST_SIG[64] = {
|
||||
0x4f, 0xff, 0xc3, 0xc6, 0xd5, 0x04, 0x71, 0x37, 0x87, 0x8c, 0xe1, 0xe5, 0x79, 0xef, 0x59, 0x2a,
|
||||
0x63, 0xde, 0xf6, 0x96, 0x3e, 0x8f, 0x90, 0x2f, 0x46, 0x1f, 0x1b, 0x8a, 0xd5, 0x94, 0xb8, 0x28,
|
||||
0x80, 0xfa, 0xe4, 0x26, 0x14, 0xbf, 0x91, 0x54, 0xbf, 0xa6, 0x2f, 0x67, 0xf9, 0x97, 0x45, 0x3a,
|
||||
0x0f, 0xdc, 0x66, 0xcd, 0x21, 0xb8, 0x91, 0xdb, 0xb9, 0xaa, 0x6b, 0x5d, 0x6c, 0xa5, 0xcb, 0x96
|
||||
};
|
||||
// ──────────────────────────────────────────────────────────────────────────
|
||||
|
||||
void setUp() {}
|
||||
void tearDown() {}
|
||||
|
||||
void test_valid_signature_accepted() {
|
||||
TEST_ASSERT_TRUE(ota_verify_signature_with_key(TEST_HASH, TEST_SIG, TEST_PUBKEY));
|
||||
}
|
||||
|
||||
void test_corrupted_hash_rejected() {
|
||||
uint8_t bad_hash[32];
|
||||
memcpy(bad_hash, TEST_HASH, 32);
|
||||
bad_hash[0] ^= 0xff;
|
||||
TEST_ASSERT_FALSE(ota_verify_signature_with_key(bad_hash, TEST_SIG, TEST_PUBKEY));
|
||||
}
|
||||
|
||||
void test_corrupted_signature_rejected() {
|
||||
uint8_t bad_sig[64];
|
||||
memcpy(bad_sig, TEST_SIG, 64);
|
||||
bad_sig[0] ^= 0xff;
|
||||
TEST_ASSERT_FALSE(ota_verify_signature_with_key(TEST_HASH, bad_sig, TEST_PUBKEY));
|
||||
}
|
||||
|
||||
void test_zero_signature_rejected() {
|
||||
uint8_t zero_sig[64] = {};
|
||||
TEST_ASSERT_FALSE(ota_verify_signature_with_key(TEST_HASH, zero_sig, TEST_PUBKEY));
|
||||
}
|
||||
|
||||
int main() {
|
||||
UNITY_BEGIN();
|
||||
RUN_TEST(test_valid_signature_accepted);
|
||||
RUN_TEST(test_corrupted_hash_rejected);
|
||||
RUN_TEST(test_corrupted_signature_rejected);
|
||||
RUN_TEST(test_zero_signature_rejected);
|
||||
return UNITY_END();
|
||||
}
|
||||
120
server/ota_endpoint.py
Normal file
120
server/ota_endpoint.py
Normal file
@@ -0,0 +1,120 @@
|
||||
# server/ota_endpoint.py
|
||||
"""
|
||||
OTA firmware update endpoints.
|
||||
|
||||
Deployment workflow:
|
||||
1. Generate signing key (one-time):
|
||||
python tools/gen_signing_key.py
|
||||
→ secrets/firmware_signing_key.pem (keep offline)
|
||||
→ firmware/lib/ota_updater/ota_pubkey.h (commit this)
|
||||
|
||||
2. Build and deploy a new firmware version:
|
||||
pio run -e timercam # build
|
||||
python tools/deploy_firmware.py \\
|
||||
firmware/.pio/build/timercam/firmware.bin 1.2.3
|
||||
→ server/firmware/ updated with current.bin, current.sig, manifest.json
|
||||
|
||||
3. Bump FW_VERSION in firmware/include/version.h before each release.
|
||||
|
||||
4. Register in server main app:
|
||||
from server.ota_endpoint import router as ota_router
|
||||
app.include_router(ota_router)
|
||||
Also uncomment Depends(verify_device_hmac) on both route handlers
|
||||
and confirm the HMAC format matches hmac.cpp:
|
||||
method + "\\n" + path + "\\n" + timestamp + "\\n" + sha256_hex(body)
|
||||
|
||||
Note: HMAC auth is currently commented out on route handlers — must be wired
|
||||
before production use. verify_device_hmac must use the same format as hmac.cpp.
|
||||
"""
|
||||
import base64
|
||||
import json
|
||||
from pathlib import Path
|
||||
|
||||
from fastapi import APIRouter
|
||||
from fastapi.responses import FileResponse
|
||||
|
||||
FIRMWARE_DIR = Path(__file__).parent / "firmware"
|
||||
|
||||
router = APIRouter(prefix="/ota", tags=["ota"])
|
||||
|
||||
|
||||
class FirmwareNotFoundError(Exception):
|
||||
pass
|
||||
|
||||
|
||||
def _parse_version(v: str) -> tuple:
|
||||
"""Parse semver string to comparable tuple; returns (0,0,0) on malformed input."""
|
||||
try:
|
||||
parts = v.strip().split(".")
|
||||
if len(parts) != 3:
|
||||
return (0, 0, 0)
|
||||
return tuple(int(x) for x in parts)
|
||||
except (ValueError, AttributeError):
|
||||
return (0, 0, 0)
|
||||
|
||||
|
||||
def ota_check_impl(current_version: str, firmware_dir: Path = FIRMWARE_DIR) -> dict:
|
||||
"""
|
||||
Compare device's current_version against staged manifest.
|
||||
Returns {"update": False} when no update is available or manifest is missing.
|
||||
Returns full update payload when server version is strictly newer.
|
||||
"""
|
||||
manifest_path = firmware_dir / "manifest.json"
|
||||
if not manifest_path.exists():
|
||||
return {"update": False}
|
||||
|
||||
try:
|
||||
manifest = json.loads(manifest_path.read_text())
|
||||
version = manifest["version"]
|
||||
size = manifest["size"]
|
||||
sha256 = manifest["sha256"]
|
||||
except (json.JSONDecodeError, KeyError):
|
||||
return {"update": False}
|
||||
|
||||
if _parse_version(version) <= _parse_version(current_version):
|
||||
return {"update": False}
|
||||
|
||||
sig_path = firmware_dir / "current.sig"
|
||||
if not sig_path.exists():
|
||||
return {"update": False}
|
||||
sig_b64 = base64.b64encode(sig_path.read_bytes()).decode()
|
||||
|
||||
return {
|
||||
"update": True,
|
||||
"version": version,
|
||||
"size": size,
|
||||
"sha256": sha256,
|
||||
"sig_b64": sig_b64,
|
||||
}
|
||||
|
||||
|
||||
def ota_firmware_impl(firmware_dir: Path = FIRMWARE_DIR) -> bytes:
|
||||
"""
|
||||
Return raw firmware binary bytes.
|
||||
Raises FirmwareNotFoundError if current.bin is absent.
|
||||
"""
|
||||
bin_path = firmware_dir / "current.bin"
|
||||
if not bin_path.exists():
|
||||
raise FirmwareNotFoundError("No firmware staged")
|
||||
return bin_path.read_bytes()
|
||||
|
||||
|
||||
@router.get("/check")
|
||||
async def ota_check(
|
||||
version: str,
|
||||
# device_id: str = Depends(verify_device_hmac), # uncomment when wiring into app
|
||||
):
|
||||
"""Check whether a firmware update is available for the given device version."""
|
||||
return ota_check_impl(current_version=version)
|
||||
|
||||
|
||||
@router.get("/firmware")
|
||||
async def ota_firmware(
|
||||
# device_id: str = Depends(verify_device_hmac), # uncomment when wiring into app
|
||||
):
|
||||
"""Stream the staged firmware binary to the device."""
|
||||
from fastapi import HTTPException
|
||||
bin_path = FIRMWARE_DIR / "current.bin"
|
||||
if not bin_path.exists():
|
||||
raise HTTPException(status_code=404, detail="No firmware available")
|
||||
return FileResponse(bin_path, media_type="application/octet-stream")
|
||||
83
server/test_ota_endpoint.py
Normal file
83
server/test_ota_endpoint.py
Normal file
@@ -0,0 +1,83 @@
|
||||
# server/test_ota_endpoint.py
|
||||
import base64
|
||||
import hashlib
|
||||
import json
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
|
||||
from server.ota_endpoint import ota_check_impl, ota_firmware_impl
|
||||
|
||||
|
||||
def write_firmware(firmware_dir: Path, version: str, data: bytes = b"fake_fw") -> None:
|
||||
sig = bytes(64) # zero sig (not validated server-side)
|
||||
manifest = {
|
||||
"version": version,
|
||||
"size": len(data),
|
||||
"sha256": hashlib.sha256(data).hexdigest(),
|
||||
}
|
||||
(firmware_dir / "current.bin").write_bytes(data)
|
||||
(firmware_dir / "current.sig").write_bytes(sig)
|
||||
(firmware_dir / "manifest.json").write_text(json.dumps(manifest))
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def patch_firmware_dir(tmp_path, monkeypatch):
|
||||
import server.ota_endpoint as mod
|
||||
monkeypatch.setattr(mod, "FIRMWARE_DIR", tmp_path)
|
||||
yield tmp_path
|
||||
|
||||
|
||||
def test_check_no_update_same_version(tmp_path):
|
||||
write_firmware(tmp_path, "1.0.0")
|
||||
result = ota_check_impl(current_version="1.0.0", firmware_dir=tmp_path)
|
||||
assert result["update"] is False
|
||||
|
||||
|
||||
def test_check_no_update_newer_local(tmp_path):
|
||||
write_firmware(tmp_path, "1.0.0")
|
||||
result = ota_check_impl(current_version="1.1.0", firmware_dir=tmp_path)
|
||||
assert result["update"] is False
|
||||
|
||||
|
||||
def test_check_update_available(tmp_path):
|
||||
write_firmware(tmp_path, "1.1.0", data=b"new firmware")
|
||||
result = ota_check_impl(current_version="1.0.0", firmware_dir=tmp_path)
|
||||
assert result["update"] is True
|
||||
assert result["version"] == "1.1.0"
|
||||
assert result["size"] == len(b"new firmware")
|
||||
assert "sha256" in result
|
||||
assert "sig_b64" in result
|
||||
sig_bytes = base64.b64decode(result["sig_b64"])
|
||||
assert len(sig_bytes) == 64
|
||||
|
||||
|
||||
def test_check_no_manifest(tmp_path):
|
||||
result = ota_check_impl(current_version="1.0.0", firmware_dir=tmp_path)
|
||||
assert result["update"] is False
|
||||
|
||||
|
||||
def test_firmware_endpoint_returns_binary(tmp_path):
|
||||
fw_data = b"firmware binary content"
|
||||
write_firmware(tmp_path, "1.1.0", data=fw_data)
|
||||
content = ota_firmware_impl(firmware_dir=tmp_path)
|
||||
assert content == fw_data
|
||||
|
||||
|
||||
def test_firmware_endpoint_missing_raises(tmp_path):
|
||||
import server.ota_endpoint as mod
|
||||
with pytest.raises(mod.FirmwareNotFoundError):
|
||||
ota_firmware_impl(firmware_dir=tmp_path)
|
||||
|
||||
|
||||
def test_check_malformed_manifest(tmp_path):
|
||||
(tmp_path / "manifest.json").write_text("not valid json{{{")
|
||||
result = ota_check_impl(current_version="1.0.0", firmware_dir=tmp_path)
|
||||
assert result["update"] is False
|
||||
|
||||
|
||||
def test_check_wrong_arity_version_no_update(tmp_path):
|
||||
write_firmware(tmp_path, "1.2") # wrong arity server version
|
||||
result = ota_check_impl(current_version="1.0.0", firmware_dir=tmp_path)
|
||||
# server "1.2" → (0,0,0) ≤ client (1,0,0) → no update
|
||||
assert result["update"] is False
|
||||
43
tools/deploy_firmware.py
Normal file
43
tools/deploy_firmware.py
Normal file
@@ -0,0 +1,43 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Sign firmware and stage it for the server OTA endpoint."""
|
||||
import argparse, hashlib, json
|
||||
from pathlib import Path
|
||||
|
||||
from sign_firmware import sign_firmware
|
||||
|
||||
|
||||
def deploy(firmware_path: Path, key_path: Path,
|
||||
version: str, output_dir: Path) -> None:
|
||||
output_dir.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
data = firmware_path.read_bytes()
|
||||
sig = sign_firmware(firmware_path, key_path)
|
||||
|
||||
(output_dir / "current.bin").write_bytes(data)
|
||||
(output_dir / "current.sig").write_bytes(sig)
|
||||
(output_dir / "manifest.json").write_text(json.dumps({
|
||||
"version": version,
|
||||
"size": len(data),
|
||||
"sha256": hashlib.sha256(data).hexdigest(),
|
||||
}, indent=2))
|
||||
|
||||
print(f"Deployed {firmware_path.name} v{version} → {output_dir}/")
|
||||
|
||||
|
||||
def main() -> None:
|
||||
p = argparse.ArgumentParser(description=__doc__)
|
||||
p.add_argument("firmware", help="Path to .bin")
|
||||
p.add_argument("version", help="Version string, e.g. 1.2.3")
|
||||
p.add_argument("--key", default="secrets/firmware_signing_key.pem")
|
||||
p.add_argument("--out-dir", default="server/firmware")
|
||||
args = p.parse_args()
|
||||
deploy(
|
||||
firmware_path=Path(args.firmware),
|
||||
key_path=Path(args.key),
|
||||
version=args.version,
|
||||
output_dir=Path(args.out_dir),
|
||||
)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
57
tools/gen_signing_key.py
Normal file
57
tools/gen_signing_key.py
Normal file
@@ -0,0 +1,57 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Generate ECDSA P-256 signing keypair for OTA firmware verification."""
|
||||
import argparse
|
||||
import os
|
||||
from pathlib import Path
|
||||
|
||||
from cryptography.hazmat.primitives.asymmetric import ec
|
||||
from cryptography.hazmat.primitives import serialization
|
||||
|
||||
|
||||
def generate(secrets_dir: Path, header_out: Path) -> None:
|
||||
secrets_dir.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
key = ec.generate_private_key(ec.SECP256R1())
|
||||
|
||||
pem = key.private_bytes(
|
||||
encoding=serialization.Encoding.PEM,
|
||||
format=serialization.PrivateFormat.PKCS8,
|
||||
encryption_algorithm=serialization.NoEncryption(),
|
||||
)
|
||||
key_path = secrets_dir / "firmware_signing_key.pem"
|
||||
key_path.write_bytes(pem)
|
||||
key_path.chmod(0o600)
|
||||
|
||||
pub_bytes = key.public_key().public_bytes(
|
||||
encoding=serialization.Encoding.X962,
|
||||
format=serialization.PublicFormat.UncompressedPoint,
|
||||
)
|
||||
assert len(pub_bytes) == 65 and pub_bytes[0] == 0x04
|
||||
|
||||
hex_values = ", ".join(f"0x{b:02x}" for b in pub_bytes)
|
||||
header = (
|
||||
"#pragma once\n"
|
||||
"// Auto-generated by tools/gen_signing_key.py — DO NOT EDIT\n"
|
||||
"// ECDSA P-256 public key, uncompressed X9.62 (04 || X || Y)\n"
|
||||
f"static const uint8_t kOtaPublicKey[65] = {{{hex_values}}};\n"
|
||||
)
|
||||
header_out.parent.mkdir(parents=True, exist_ok=True)
|
||||
header_out.write_text(header)
|
||||
|
||||
print(f"Private key → {key_path}")
|
||||
print(f"Public key header → {header_out}")
|
||||
|
||||
|
||||
def main() -> None:
|
||||
p = argparse.ArgumentParser(description=__doc__)
|
||||
p.add_argument("--secrets-dir", default="secrets",
|
||||
help="Directory for private key (default: secrets/)")
|
||||
p.add_argument("--header-out",
|
||||
default="firmware/lib/ota_updater/ota_pubkey.h",
|
||||
help="Path to write the C header")
|
||||
args = p.parse_args()
|
||||
generate(Path(args.secrets_dir), Path(args.header_out))
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
52
tools/sign_firmware.py
Normal file
52
tools/sign_firmware.py
Normal file
@@ -0,0 +1,52 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Sign a firmware binary with ECDSA P-256. Outputs a raw 64-byte r||s .sig file."""
|
||||
import argparse
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
from cryptography.hazmat.primitives.asymmetric import ec
|
||||
from cryptography.hazmat.primitives import hashes, serialization
|
||||
from cryptography.hazmat.primitives.asymmetric.utils import decode_dss_signature
|
||||
|
||||
|
||||
def load_private_key(key_path: Path) -> ec.EllipticCurvePrivateKey:
|
||||
key = serialization.load_pem_private_key(key_path.read_bytes(), password=None)
|
||||
if not isinstance(key, ec.EllipticCurvePrivateKey):
|
||||
raise ValueError("Key must be an EC private key")
|
||||
if not isinstance(key.curve, ec.SECP256R1):
|
||||
raise ValueError(f"Key must use SECP256R1 curve, got {key.curve.name}")
|
||||
return key
|
||||
|
||||
|
||||
def sign_firmware(firmware_path: Path, key_path: Path) -> bytes:
|
||||
key = load_private_key(key_path)
|
||||
data = firmware_path.read_bytes()
|
||||
sig_der = key.sign(data, ec.ECDSA(hashes.SHA256()))
|
||||
r, s = decode_dss_signature(sig_der)
|
||||
# Returns raw 64-byte r‖s (not DER) — mbedtls_ecdsa_verify expects this layout
|
||||
return r.to_bytes(32, 'big') + s.to_bytes(32, 'big')
|
||||
|
||||
|
||||
def main() -> None:
|
||||
p = argparse.ArgumentParser(description=__doc__)
|
||||
p.add_argument("firmware", help="Path to firmware .bin")
|
||||
p.add_argument("--key", default="secrets/firmware_signing_key.pem",
|
||||
help="Path to PEM private key")
|
||||
p.add_argument("--out", help="Output .sig path (default: firmware.bin.sig)")
|
||||
args = p.parse_args()
|
||||
|
||||
firmware = Path(args.firmware)
|
||||
key_path = Path(args.key)
|
||||
out_path = Path(args.out) if args.out else firmware.with_suffix(".bin.sig")
|
||||
|
||||
try:
|
||||
sig = sign_firmware(firmware, key_path)
|
||||
except (FileNotFoundError, ValueError) as e:
|
||||
print(f"Error: {e}", file=sys.stderr)
|
||||
raise SystemExit(1)
|
||||
out_path.write_bytes(sig)
|
||||
print(f"Signed {firmware.name} → {out_path} ({len(sig)} bytes)")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
63
tools/test_deploy_firmware.py
Normal file
63
tools/test_deploy_firmware.py
Normal file
@@ -0,0 +1,63 @@
|
||||
import json, hashlib, sys
|
||||
from pathlib import Path
|
||||
import pytest
|
||||
|
||||
REPO_ROOT = Path(__file__).parent.parent
|
||||
sys.path.insert(0, str(REPO_ROOT / "tools"))
|
||||
|
||||
from cryptography.hazmat.primitives.asymmetric import ec
|
||||
from cryptography.hazmat.primitives import serialization
|
||||
|
||||
from deploy_firmware import deploy
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def key_pem(tmp_path):
|
||||
key = ec.generate_private_key(ec.SECP256R1())
|
||||
pem_path = tmp_path / "key.pem"
|
||||
pem_path.write_bytes(key.private_bytes(
|
||||
serialization.Encoding.PEM,
|
||||
serialization.PrivateFormat.PKCS8,
|
||||
serialization.NoEncryption(),
|
||||
))
|
||||
return pem_path
|
||||
|
||||
|
||||
def test_deploy_writes_all_artifacts(tmp_path, key_pem):
|
||||
firmware = tmp_path / "firmware.bin"
|
||||
firmware.write_bytes(b"fake firmware" * 200)
|
||||
out_dir = tmp_path / "server_firmware"
|
||||
|
||||
deploy(firmware_path=firmware, key_path=key_pem,
|
||||
version="1.2.3", output_dir=out_dir)
|
||||
|
||||
assert (out_dir / "current.bin").exists()
|
||||
assert (out_dir / "current.sig").exists()
|
||||
assert (out_dir / "manifest.json").exists()
|
||||
|
||||
|
||||
def test_manifest_contents(tmp_path, key_pem):
|
||||
data = b"firmware payload"
|
||||
firmware = tmp_path / "fw.bin"
|
||||
firmware.write_bytes(data)
|
||||
out_dir = tmp_path / "out"
|
||||
|
||||
deploy(firmware_path=firmware, key_path=key_pem,
|
||||
version="2.0.1", output_dir=out_dir)
|
||||
|
||||
manifest = json.loads((out_dir / "manifest.json").read_text())
|
||||
assert manifest["version"] == "2.0.1"
|
||||
assert manifest["size"] == len(data)
|
||||
assert manifest["sha256"] == hashlib.sha256(data).hexdigest()
|
||||
|
||||
|
||||
def test_signature_is_64_bytes(tmp_path, key_pem):
|
||||
firmware = tmp_path / "fw.bin"
|
||||
firmware.write_bytes(b"fw")
|
||||
out_dir = tmp_path / "out"
|
||||
|
||||
deploy(firmware_path=firmware, key_path=key_pem,
|
||||
version="1.0.0", output_dir=out_dir)
|
||||
|
||||
sig = (out_dir / "current.sig").read_bytes()
|
||||
assert len(sig) == 64
|
||||
49
tools/test_gen_signing_key.py
Normal file
49
tools/test_gen_signing_key.py
Normal file
@@ -0,0 +1,49 @@
|
||||
import os, subprocess, sys, tempfile
|
||||
from pathlib import Path
|
||||
|
||||
REPO_ROOT = Path(__file__).parent.parent
|
||||
|
||||
def run_gen(secrets_dir, header_path):
|
||||
env = os.environ.copy()
|
||||
result = subprocess.run(
|
||||
[sys.executable, str(REPO_ROOT / "tools/gen_signing_key.py"),
|
||||
"--secrets-dir", str(secrets_dir),
|
||||
"--header-out", str(header_path)],
|
||||
capture_output=True, text=True, env=env
|
||||
)
|
||||
assert result.returncode == 0, result.stderr
|
||||
return result
|
||||
|
||||
def test_private_key_created():
|
||||
with tempfile.TemporaryDirectory() as d:
|
||||
header = Path(d) / "ota_pubkey.h"
|
||||
run_gen(d, header)
|
||||
pem = Path(d) / "firmware_signing_key.pem"
|
||||
assert pem.exists()
|
||||
content = pem.read_text()
|
||||
assert "BEGIN PRIVATE KEY" in content
|
||||
|
||||
def test_header_created():
|
||||
with tempfile.TemporaryDirectory() as d:
|
||||
header = Path(d) / "ota_pubkey.h"
|
||||
run_gen(d, header)
|
||||
assert header.exists()
|
||||
content = header.read_text()
|
||||
assert "kOtaPublicKey" in content
|
||||
assert "0x04" in content # uncompressed point prefix
|
||||
assert "[65]" in content
|
||||
|
||||
def test_public_key_is_valid_p256_point():
|
||||
from cryptography.hazmat.primitives.asymmetric import ec
|
||||
from cryptography.hazmat.primitives import serialization
|
||||
with tempfile.TemporaryDirectory() as d:
|
||||
header = Path(d) / "ota_pubkey.h"
|
||||
run_gen(d, header)
|
||||
pem = (Path(d) / "firmware_signing_key.pem").read_bytes()
|
||||
priv = serialization.load_pem_private_key(pem, password=None)
|
||||
pub_bytes = priv.public_key().public_bytes(
|
||||
serialization.Encoding.X962,
|
||||
serialization.PublicFormat.UncompressedPoint,
|
||||
)
|
||||
assert len(pub_bytes) == 65
|
||||
assert pub_bytes[0] == 0x04
|
||||
64
tools/test_sign_firmware.py
Normal file
64
tools/test_sign_firmware.py
Normal file
@@ -0,0 +1,64 @@
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
from cryptography.exceptions import InvalidSignature
|
||||
from cryptography.hazmat.primitives.asymmetric import ec
|
||||
from cryptography.hazmat.primitives import hashes, serialization
|
||||
from cryptography.hazmat.primitives.asymmetric.utils import decode_dss_signature
|
||||
|
||||
REPO_ROOT = Path(__file__).parent.parent
|
||||
sys.path.insert(0, str(REPO_ROOT / "tools"))
|
||||
from sign_firmware import sign_firmware, load_private_key
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def keypair(tmp_path):
|
||||
key = ec.generate_private_key(ec.SECP256R1())
|
||||
pem_path = tmp_path / "key.pem"
|
||||
pem_path.write_bytes(key.private_bytes(
|
||||
serialization.Encoding.PEM,
|
||||
serialization.PrivateFormat.PKCS8,
|
||||
serialization.NoEncryption(),
|
||||
))
|
||||
return key, pem_path
|
||||
|
||||
|
||||
def test_signature_is_64_bytes(keypair, tmp_path):
|
||||
key, key_path = keypair
|
||||
firmware = tmp_path / "fw.bin"
|
||||
firmware.write_bytes(b"fake firmware data" * 100)
|
||||
sig = sign_firmware(firmware, key_path)
|
||||
assert len(sig) == 64
|
||||
|
||||
|
||||
def test_signature_verifies(keypair, tmp_path):
|
||||
key, key_path = keypair
|
||||
data = b"test firmware payload"
|
||||
firmware = tmp_path / "fw.bin"
|
||||
firmware.write_bytes(data)
|
||||
sig_raw = sign_firmware(firmware, key_path)
|
||||
|
||||
# Convert raw r||s back to DER for cryptography lib verify
|
||||
r = int.from_bytes(sig_raw[:32], 'big')
|
||||
s = int.from_bytes(sig_raw[32:], 'big')
|
||||
from cryptography.hazmat.primitives.asymmetric.utils import encode_dss_signature
|
||||
sig_der = encode_dss_signature(r, s)
|
||||
|
||||
key.public_key().verify(sig_der, data, ec.ECDSA(hashes.SHA256()))
|
||||
|
||||
|
||||
def test_wrong_key_fails_verification(keypair, tmp_path):
|
||||
key, key_path = keypair
|
||||
firmware = tmp_path / "fw.bin"
|
||||
firmware.write_bytes(b"firmware")
|
||||
sig_raw = sign_firmware(firmware, key_path)
|
||||
|
||||
other_key = ec.generate_private_key(ec.SECP256R1())
|
||||
r = int.from_bytes(sig_raw[:32], 'big')
|
||||
s = int.from_bytes(sig_raw[32:], 'big')
|
||||
from cryptography.hazmat.primitives.asymmetric.utils import encode_dss_signature
|
||||
sig_der = encode_dss_signature(r, s)
|
||||
|
||||
with pytest.raises(InvalidSignature):
|
||||
other_key.public_key().verify(sig_der, b"firmware", ec.ECDSA(hashes.SHA256()))
|
||||
Reference in New Issue
Block a user