From 896347692c3d84d19c6b4f7e8e46ef9ac2f84a3f Mon Sep 17 00:00:00 2001 From: Emil Lerch Date: Fri, 8 May 2026 12:21:31 -0700 Subject: [PATCH] check etag header if present, give a single retry on body validation error --- src/cache/store.zig | 51 ++++++---- src/net/http.zig | 230 ++++++++++++++++++++++++++++++++++++++++++-- src/service.zig | 86 ++++++++++++++++- 3 files changed, 334 insertions(+), 33 deletions(-) diff --git a/src/cache/store.zig b/src/cache/store.zig index 215dbde..420812d 100644 --- a/src/cache/store.zig +++ b/src/cache/store.zig @@ -396,23 +396,6 @@ pub const Store = struct { server_etag: ?[]const u8 = null, }; - /// Archive a torn cache body for post-mortem diagnosis. - /// - /// Writes two sibling files under `{cache_dir}/_torn/`: - /// `{symbol}_{data_type}_{unix_ts}.bin` — the raw bytes as received - /// `{symbol}_{data_type}_{unix_ts}.meta` — SRF-formatted context - /// - /// The `.meta` file mirrors the rest of the cache's on-disk format - /// so tooling can iterate it with the standard SRF reader. Content: - /// type::tear_metadata - /// symbol:: - /// data_type:: - /// unix_ts:num: - /// iso_ts:: - /// body_length:num: - /// body_sha256::<64-hex> - /// failure_reason:: - /// zfin_commit:: /// Schema for the SRF `.meta` sidecar emitted by `archiveTornBody`. /// Each field becomes a `key:type:value` entry in a single record /// under a `#!srfv1` header. Optional fields with `null` defaults @@ -439,6 +422,29 @@ pub const Store = struct { last_200_bytes_hex: []const u8, }; + /// Archive a torn cache body for post-mortem diagnosis. 
+ /// + /// Writes two sibling files under `{cache_dir}/_torn/`: + /// `{symbol}_{data_type}_{unix_ms}.bin` — the raw bytes as received + /// `{symbol}_{data_type}_{unix_ms}.meta` — SRF-formatted context + /// + /// Filenames carry a millisecond-resolution timestamp so a retry + /// loop that tears twice within the same wall-clock second + /// produces distinct archive pairs — two back-to-back captures are + /// the most valuable forensic signal we can produce (byte offsets, + /// tail shapes, and time deltas are all impossible to infer from a + /// single failure). + /// + /// The `.meta` file mirrors the rest of the cache's on-disk format + /// so tooling can iterate it with the standard SRF reader. See + /// `TearRecord` above for the schema. Required fields are always + /// emitted; `http_status`, `http_content_length`, `server_url`, + /// and `server_etag` appear only when the caller populated them + /// in `TornMeta`. + /// + /// Best-effort: a failure here should never mask the original + /// detection path's return value. Callers log.debug the outcome and + /// move on. pub fn archiveTornBody( allocator: std.mem.Allocator, cache_dir: []const u8, @@ -455,12 +461,19 @@ pub const Store = struct { else => return err, }; + // Second-resolution timestamp for the SRF `unix_ts` field (plus + // ISO rendering). Millisecond-resolution timestamp is used in + // the filename so retries within the same wall-clock second + // produce distinct archive entries rather than overwriting each + // other — two back-to-back tears from a refresh retry are the + // most valuable forensic signal we can capture. 
const ts = std.time.timestamp(); + const ts_ms = std.time.milliTimestamp(); const bin_name = try std.fmt.allocPrint( allocator, "{s}_{s}_{d}.bin", - .{ symbol, @tagName(data_type), ts }, + .{ symbol, @tagName(data_type), ts_ms }, ); defer allocator.free(bin_name); const bin_path = try std.fs.path.join(allocator, &.{ torn_dir, bin_name }); @@ -469,7 +482,7 @@ pub const Store = struct { const meta_name = try std.fmt.allocPrint( allocator, "{s}_{s}_{d}.meta", - .{ symbol, @tagName(data_type), ts }, + .{ symbol, @tagName(data_type), ts_ms }, ); defer allocator.free(meta_name); const meta_path = try std.fs.path.join(allocator, &.{ torn_dir, meta_name }); diff --git a/src/net/http.zig b/src/net/http.zig index 8e103e3..7c8f46f 100644 --- a/src/net/http.zig +++ b/src/net/http.zig @@ -18,13 +18,84 @@ pub const HttpError = error{ pub const Response = struct { status: std.http.Status, body: []const u8, + /// Raw `ETag` header value from the server, if present. Owned by the + /// same allocator as `body`. Captured verbatim (including quotes and + /// any `sha256:` scheme prefix) so diagnostic archival can record + /// it as-is. + etag: ?[]const u8, allocator: std.mem.Allocator, pub fn deinit(self: *Response) void { self.allocator.free(self.body); + if (self.etag) |e| self.allocator.free(e); + } + + /// Integrity check outcome. + pub const IntegrityResult = union(enum) { + /// No ETag present, or the ETag wasn't a recognized sha256 + /// shape. Verification is skipped — the caller should treat + /// this the same as a successful verification. + not_applicable, + /// Server's advertised sha256 matches the body's actual + /// sha256. The body is byte-exact with what the server sent. + ok, + /// Mismatch between server's advertised sha256 and the body's + /// actual sha256. Indicates truncation or corruption in + /// transit. 
`expected_hex` and `actual_hex` are each 64 chars + /// of lowercase hex; they reference internal buffers of the + /// result and are valid for the lifetime of this struct. + mismatch: struct { + expected_hex: [64]u8, + actual_hex: [64]u8, + }, + }; + + /// Verify the body's sha256 against the server's `ETag` header. + /// + /// Recognizes `ETag: "sha256:<64-hex>"` (quoted or unquoted, prefix + /// is case-insensitive). Other ETag shapes — weak etags, md5, etc. + /// — return `.not_applicable` so deployments with non-sha256 etags + /// don't get their requests rejected. + pub fn verifyIntegrity(self: *const Response) IntegrityResult { + const etag = self.etag orelse return .not_applicable; + const expected_hex = parseSha256Etag(etag) orelse return .not_applicable; + + var actual: [std.crypto.hash.sha2.Sha256.digest_length]u8 = undefined; + std.crypto.hash.sha2.Sha256.hash(self.body, &actual, .{}); + var actual_hex: [std.crypto.hash.sha2.Sha256.digest_length * 2]u8 = undefined; + _ = std.fmt.bufPrint(&actual_hex, "{x}", .{&actual}) catch unreachable; + + if (std.ascii.eqlIgnoreCase(&actual_hex, expected_hex)) return .ok; + + var result: IntegrityResult = .{ .mismatch = .{ + .expected_hex = undefined, + .actual_hex = undefined, + } }; + // expected_hex may be uppercase depending on server — copy as + // lowercase for stable comparison downstream. + for (expected_hex, 0..) |c, i| result.mismatch.expected_hex[i] = std.ascii.toLower(c); + @memcpy(&result.mismatch.actual_hex, &actual_hex); + return result; } }; +/// Extract the hex portion of a `"sha256:<hex>"` ETag. Accepts both +/// quoted and unquoted forms (both are commonly written in the wild), +/// and the `sha256:` prefix is case-insensitive. Returns null for any +/// other shape — callers should then skip the integrity check rather +/// than failing the request. +fn parseSha256Etag(etag: []const u8) ?[]const u8 { + var v = etag; + if (v.len >= 2 and v[0] == '"' and v[v.len - 1] == '"') v = v[1 .. 
v.len - 1]; + const prefix = "sha256:"; + if (v.len <= prefix.len) return null; + if (!std.ascii.eqlIgnoreCase(v[0..prefix.len], prefix)) return null; + const hex = v[prefix.len..]; + if (hex.len != std.crypto.hash.sha2.Sha256.digest_length * 2) return null; + for (hex) |c| if (!std.ascii.isHex(c)) return null; + return hex; +} + /// Thin HTTP client wrapper with retry and error classification. pub const Client = struct { allocator: std.mem.Allocator, @@ -77,27 +148,73 @@ pub const Client = struct { } fn doRequest(self: *Client, method: std.http.Method, url: []const u8, body: ?[]const u8, extra_headers: []const std.http.Header) HttpError!Response { - var aw: std.Io.Writer.Allocating = .init(self.allocator); + const uri = std.Uri.parse(url) catch return HttpError.RequestFailed; - const result = self.http_client.fetch(.{ - .location = .{ .url = url }, - .method = method, - .payload = body, + var req = self.http_client.request(method, uri, .{ + .redirect_behavior = @enumFromInt(3), .extra_headers = extra_headers, - .response_writer = &aw.writer, - }) catch { - aw.deinit(); + }) catch return HttpError.RequestFailed; + defer req.deinit(); + + if (body) |payload| { + var send_buf: [4096]u8 = undefined; + req.transfer_encoding = .{ .content_length = payload.len }; + var bw = req.sendBodyUnflushed(&send_buf) catch return HttpError.RequestFailed; + bw.writer.writeAll(payload) catch return HttpError.RequestFailed; + bw.end() catch return HttpError.RequestFailed; + req.connection.?.flush() catch return HttpError.RequestFailed; + } else { + req.sendBodiless() catch return HttpError.RequestFailed; + } + + // Matches the default redirect capacity in std.http.Client.fetch. + var redirect_buffer: [8 * 1024]u8 = undefined; + var response = req.receiveHead(&redirect_buffer) catch return HttpError.RequestFailed; + + // Capture the ETag (if any) from the response head BEFORE + // draining the body. 
`Response.reader()` invalidates + // `head.bytes`, and `iterateHeaders` reads from that slice, so + // anything we want must be duplicated now. + const etag_owned: ?[]const u8 = blk: { + var it = response.head.iterateHeaders(); + while (it.next()) |h| { + if (std.ascii.eqlIgnoreCase(h.name, "etag")) { + const dup = self.allocator.dupe(u8, h.value) catch + return HttpError.OutOfMemory; + break :blk dup; + } + } + break :blk null; + }; + errdefer if (etag_owned) |e| self.allocator.free(e); + + // Drain the body. `readerDecompressing` is adaptive: for + // identity-encoded responses (the zfin server's default) it + // hands back the transfer reader unchanged — zero-cost. For + // gzip/deflate/zstd it wraps the transfer reader with the + // appropriate decompressor. The decompress buffer is only + // touched on the compressed paths; sized at 64 KiB as a + // reasonable default for the unlikely case a provider endpoint + // starts sending compressed SRF/JSON. + var aw: std.Io.Writer.Allocating = .init(self.allocator); + errdefer aw.deinit(); + + var transfer_buffer: [4096]u8 = undefined; + var decompress: std.http.Decompress = undefined; + var decompress_buffer: [64 * 1024]u8 = undefined; + const reader = response.readerDecompressing(&transfer_buffer, &decompress, &decompress_buffer); + _ = reader.streamRemaining(&aw.writer) catch { return HttpError.RequestFailed; }; const resp_body = aw.toOwnedSlice() catch { - aw.deinit(); return HttpError.OutOfMemory; }; return .{ - .status = result.status, + .status = response.head.status, .body = resp_body, + .etag = etag_owned, .allocator = self.allocator, }; } @@ -107,6 +224,7 @@ pub const Client = struct { .ok => return response, else => { response.allocator.free(response.body); + if (response.etag) |e| response.allocator.free(e); return switch (response.status) { .too_many_requests => HttpError.RateLimited, .unauthorized, .forbidden => HttpError.Unauthorized, @@ -163,3 +281,95 @@ test "buildUrl" { defer allocator.free(url); try 
std.testing.expectEqualStrings("https://api.example.com/v1/data?symbol=AAPL&apikey=test123", url); } + +test "parseSha256Etag: quoted form" { + const hex = parseSha256Etag("\"sha256:0402d084abcbd4e40993ebe1e55e0beb400ad77c8c5354a46b047c821e36d3b9\"") orelse unreachable; + try std.testing.expectEqualStrings("0402d084abcbd4e40993ebe1e55e0beb400ad77c8c5354a46b047c821e36d3b9", hex); +} + +test "parseSha256Etag: unquoted form" { + const hex = parseSha256Etag("sha256:deadbeefcafedeadbeefcafedeadbeefcafedeadbeefcafedeadbeefcafe1234") orelse unreachable; + try std.testing.expectEqualStrings("deadbeefcafedeadbeefcafedeadbeefcafedeadbeefcafedeadbeefcafe1234", hex); +} + +test "parseSha256Etag: case-insensitive prefix" { + const hex = parseSha256Etag("\"SHA256:0402d084abcbd4e40993ebe1e55e0beb400ad77c8c5354a46b047c821e36d3b9\"") orelse unreachable; + try std.testing.expectEqualStrings("0402d084abcbd4e40993ebe1e55e0beb400ad77c8c5354a46b047c821e36d3b9", hex); +} + +test "parseSha256Etag: wrong scheme returns null" { + try std.testing.expectEqual(@as(?[]const u8, null), parseSha256Etag("\"md5:deadbeef\"")); + try std.testing.expectEqual(@as(?[]const u8, null), parseSha256Etag("W/\"weak-etag\"")); + try std.testing.expectEqual(@as(?[]const u8, null), parseSha256Etag("")); +} + +test "parseSha256Etag: wrong hex length returns null" { + try std.testing.expectEqual(@as(?[]const u8, null), parseSha256Etag("\"sha256:deadbeef\"")); + try std.testing.expectEqual(@as(?[]const u8, null), parseSha256Etag("\"sha256:0402d084abcbd4e40993ebe1e55e0beb400ad77c8c5354a46b047c821e36d3b9aa\"")); +} + +test "parseSha256Etag: non-hex character returns null" { + try std.testing.expectEqual(@as(?[]const u8, null), parseSha256Etag("\"sha256:ZZ02d084abcbd4e40993ebe1e55e0beb400ad77c8c5354a46b047c821e36d3b9\"")); +} + +test "Response.verifyIntegrity: no etag returns not_applicable" { + var body_buf = [_]u8{ 'h', 'i' }; + var response = Response{ + .status = .ok, + .body = &body_buf, + .etag = null, + 
.allocator = std.testing.allocator, + }; + const result = response.verifyIntegrity(); + try std.testing.expect(result == .not_applicable); +} + +test "Response.verifyIntegrity: non-sha256 etag returns not_applicable" { + var body_buf = [_]u8{ 'h', 'i' }; + var response = Response{ + .status = .ok, + .body = &body_buf, + .etag = "W/\"weak-etag\"", + .allocator = std.testing.allocator, + }; + const result = response.verifyIntegrity(); + try std.testing.expect(result == .not_applicable); +} + +test "Response.verifyIntegrity: matching sha256 returns ok" { + // sha256("hello world") = b94d27b9934d3e08a52e52d7da7dabfac484efe37a5380ee9088f7ace2efcde9 + const body = "hello world"; + var response = Response{ + .status = .ok, + .body = body, + .etag = "\"sha256:b94d27b9934d3e08a52e52d7da7dabfac484efe37a5380ee9088f7ace2efcde9\"", + .allocator = std.testing.allocator, + }; + const result = response.verifyIntegrity(); + try std.testing.expect(result == .ok); +} + +test "Response.verifyIntegrity: mismatched sha256 returns mismatch" { + const body = "hello world"; + // Intentionally wrong digest. + var response = Response{ + .status = .ok, + .body = body, + .etag = "\"sha256:0000000000000000000000000000000000000000000000000000000000000000\"", + .allocator = std.testing.allocator, + }; + const result = response.verifyIntegrity(); + switch (result) { + .mismatch => |m| { + try std.testing.expectEqualStrings( + "0000000000000000000000000000000000000000000000000000000000000000", + &m.expected_hex, + ); + try std.testing.expectEqualStrings( + "b94d27b9934d3e08a52e52d7da7dabfac484efe37a5380ee9088f7ace2efcde9", + &m.actual_hex, + ); + }, + else => try std.testing.expect(false), + } +} diff --git a/src/service.zig b/src/service.zig index cff8f07..3c70e06 100644 --- a/src/service.zig +++ b/src/service.zig @@ -1390,6 +1390,18 @@ pub const DataService = struct { /// Try to sync a cache file from the configured zfin-server. 
/// Returns true if the file was successfully synced, false on any error. /// Silently returns false if no server is configured. + /// + /// Applies a single retry with a short delay when the first attempt + /// fails at the HTTP layer OR produces a torn body (integrity + /// mismatch / `looksCompleteSrf` rejection). Motivation: refreshes + /// fan out 20+ symbols across 8 parallel threads, and the tear + /// pattern we've observed so far looks transient per-connection. + /// One retry papers over single-packet hiccups without dramatically + /// extending refresh wall time. If the retry also fails the + /// archive grows by one more `.bin`/`.meta` pair — two captures + /// from the same refresh are the most valuable diagnostic signal + /// we can produce (same body shape? same byte offset? same time + /// delta? all answers we can't get from a single failure). fn syncFromServer(self: *DataService, symbol: []const u8, data_type: cache.DataType) bool { const server_url = self.config.server_url orelse return false; const endpoint = switch (data_type) { @@ -1406,6 +1418,32 @@ pub const DataService = struct { const full_url = std.fmt.allocPrint(self.allocator(), "{s}/{s}{s}", .{ server_url, symbol, endpoint }) catch return false; defer self.allocator().free(full_url); + const max_attempts: u8 = 2; + const retry_delay_ms: u64 = 250; + + var attempt: u8 = 0; + while (attempt < max_attempts) : (attempt += 1) { + if (attempt > 0) { + log.debug( + "{s}: retrying {s} server sync (attempt {d}/{d}) after {d}ms delay", + .{ symbol, @tagName(data_type), attempt + 1, max_attempts, retry_delay_ms }, + ); + std.Thread.sleep(retry_delay_ms * std.time.ns_per_ms); + } + switch (self.tryOneSync(symbol, data_type, full_url)) { + .ok => return true, + // Torn or network error — retry if attempts remain. + .torn, .net_err => {}, + } + } + return false; + } + + const SyncAttempt = enum { ok, torn, net_err }; + + /// One attempt at syncing a file from the server. 
Archives a torn + /// body when detected but does NOT retry — the caller decides that. + fn tryOneSync(self: *DataService, symbol: []const u8, data_type: cache.DataType, full_url: []const u8) SyncAttempt { log.debug("{s}: syncing {s} from server", .{ symbol, @tagName(data_type) }); var client = http.Client.init(self.allocator()); @@ -1413,10 +1451,49 @@ pub const DataService = struct { var response = client.get(full_url) catch |err| { log.debug("{s}: server sync failed for {s}: {s}", .{ symbol, @tagName(data_type), @errorName(err) }); - return false; + return .net_err; }; defer response.deinit(); + // Integrity check: if the server advertised an ETag in + // `"sha256:<hex>"` form, compare the body's actual sha256 + // against it. Catches mid-stream truncation that Zig's + // std.http.Client.fetch silently accepts on the Content-Length + // path (EndOfStream from a cut transport is swallowed as a + // normal termination). Archive the mismatching body with the + // advertised etag so post-mortem can see exactly what was + // promised vs what arrived. Deployments with no ETag or a + // non-sha256 etag fall through to `looksCompleteSrf` below + // (backward-compatible with pre-fix servers). 
+ switch (response.verifyIntegrity()) { + .mismatch => |m| { + cache.Store.archiveTornBody( + self.allocator(), + self.config.cache_dir, + symbol, + data_type, + response.body, + .{ + .failure_reason = .etag_mismatch, + .http_status = @intFromEnum(response.status), + .server_url = full_url, + .server_etag = response.etag, + }, + ) catch |err| { + log.debug( + "{s}: failed to archive etag-mismatch {s} body: {s}", + .{ symbol, @tagName(data_type), @errorName(err) }, + ); + }; + log.debug( + "{s}: {s} server response failed integrity check ({d} bytes, expected sha256={s}, actual={s}) — archived under _torn/, not writing to cache", + .{ symbol, @tagName(data_type), response.body.len, m.expected_hex, m.actual_hex }, + ); + return .torn; + }, + .ok, .not_applicable => {}, + } + // Validate the response body looks like a complete SRF file before // writing it to cache. This guards against HTTP body truncation // (TCP reset, Content-Length mismatch, proxy that flushed a @@ -1442,6 +1519,7 @@ pub const DataService = struct { .failure_reason = .looks_complete_srf_failed, .http_status = @intFromEnum(response.status), .server_url = full_url, + .server_etag = response.etag, }, ) catch |err| { log.debug( @@ -1453,17 +1531,17 @@ pub const DataService = struct { "{s}: rejecting torn {s} server response ({d} bytes) — archived under _torn/, not writing to cache", .{ symbol, @tagName(data_type), response.body.len }, ); - return false; + return .torn; } // Write to local cache var s = self.store(); s.writeRaw(symbol, data_type, response.body) catch |err| { log.debug("{s}: failed to write synced {s} to cache: {s}", .{ symbol, @tagName(data_type), @errorName(err) }); - return false; + return .net_err; }; log.debug("{s}: synced {s} from server ({d} bytes)", .{ symbol, @tagName(data_type), response.body.len }); - return true; + return .ok; } /// Sync candle data (both daily and meta) from the server.