check etag header if present, give a single retry on body validation error

This commit is contained in:
Emil Lerch 2026-05-08 12:21:31 -07:00
parent 42b8ff1b38
commit 896347692c
Signed by: lobo
GPG key ID: A7B62D657EF764F8
3 changed files with 334 additions and 33 deletions

51
src/cache/store.zig vendored
View file

@ -396,23 +396,6 @@ pub const Store = struct {
server_etag: ?[]const u8 = null,
};
/// Archive a torn cache body for post-mortem diagnosis.
///
/// Writes two sibling files under `{cache_dir}/_torn/`:
/// `{symbol}_{data_type}_{unix_ts}.bin` — the raw bytes as received
/// `{symbol}_{data_type}_{unix_ts}.meta` — SRF-formatted context
///
/// The `.meta` file mirrors the rest of the cache's on-disk format
/// so tooling can iterate it with the standard SRF reader. Content:
/// type::tear_metadata
/// symbol::<SYM>
/// data_type::<tag>
/// unix_ts:num:<N>
/// iso_ts::<YYYY-MM-DDTHH:MM:SSZ>
/// body_length:num:<N>
/// body_sha256::<64-hex>
/// failure_reason::<tag>
/// zfin_commit::<version_string>
/// Schema for the SRF `.meta` sidecar emitted by `archiveTornBody`.
/// Each field becomes a `key:type:value` entry in a single record
/// under a `#!srfv1` header. Optional fields with `null` defaults
@ -439,6 +422,29 @@ pub const Store = struct {
last_200_bytes_hex: []const u8,
};
/// Archive a torn cache body for post-mortem diagnosis.
///
/// Writes two sibling files under `{cache_dir}/_torn/`:
/// `{symbol}_{data_type}_{unix_ms}.bin` — the raw bytes as received
/// `{symbol}_{data_type}_{unix_ms}.meta` — SRF-formatted context
///
/// Filenames carry a millisecond-resolution timestamp so a retry
/// loop that tears twice within the same wall-clock second
produces distinct archive pairs — two back-to-back captures are
/// the most valuable forensic signal we can produce (byte offsets,
/// tail shapes, and time deltas are all impossible to infer from a
/// single failure).
///
/// The `.meta` file mirrors the rest of the cache's on-disk format
/// so tooling can iterate it with the standard SRF reader. See
/// `TearRecord` above for the schema. Required fields are always
/// emitted; `http_status`, `http_content_length`, `server_url`,
/// and `server_etag` appear only when the caller populated them
/// in `TornMeta`.
///
/// Best-effort: a failure here should never mask the original
/// detection path's return value. Callers log.debug the outcome and
/// move on.
pub fn archiveTornBody(
allocator: std.mem.Allocator,
cache_dir: []const u8,
@ -455,12 +461,19 @@ pub const Store = struct {
else => return err,
};
// Second-resolution timestamp for the SRF `unix_ts` field (plus
// ISO rendering). Millisecond-resolution timestamp is used in
// the filename so retries within the same wall-clock second
// produce distinct archive entries rather than overwriting each
// other — two back-to-back tears from a refresh retry are the
// most valuable forensic signal we can capture.
const ts = std.time.timestamp();
const ts_ms = std.time.milliTimestamp();
const bin_name = try std.fmt.allocPrint(
allocator,
"{s}_{s}_{d}.bin",
.{ symbol, @tagName(data_type), ts },
.{ symbol, @tagName(data_type), ts_ms },
);
defer allocator.free(bin_name);
const bin_path = try std.fs.path.join(allocator, &.{ torn_dir, bin_name });
@ -469,7 +482,7 @@ pub const Store = struct {
const meta_name = try std.fmt.allocPrint(
allocator,
"{s}_{s}_{d}.meta",
.{ symbol, @tagName(data_type), ts },
.{ symbol, @tagName(data_type), ts_ms },
);
defer allocator.free(meta_name);
const meta_path = try std.fs.path.join(allocator, &.{ torn_dir, meta_name });

View file

@ -18,13 +18,84 @@ pub const HttpError = error{
pub const Response = struct {
    status: std.http.Status,
    body: []const u8,
    /// Verbatim `ETag` header value from the server, when one was sent.
    /// Kept exactly as received — surrounding quotes and any `sha256:`
    /// scheme prefix included — so diagnostic archival can record it
    /// unmodified. Owned by the same allocator as `body`.
    etag: ?[]const u8,
    allocator: std.mem.Allocator,

    /// Release the body and, if present, the captured ETag.
    pub fn deinit(self: *Response) void {
        if (self.etag) |tag| self.allocator.free(tag);
        self.allocator.free(self.body);
    }

    /// Outcome of `verifyIntegrity`.
    pub const IntegrityResult = union(enum) {
        /// No ETag was present, or its shape was not a recognized
        /// sha256 form. Verification was skipped — callers should
        /// treat this the same as a successful verification.
        not_applicable,
        /// The body's actual sha256 equals the digest the server
        /// advertised: the bytes are exactly what the server sent.
        ok,
        /// The advertised and computed digests differ — the body was
        /// truncated or corrupted in transit. Both fields hold 64
        /// lowercase hex characters stored inline in this result and
        /// are valid for the lifetime of this struct.
        mismatch: struct {
            expected_hex: [64]u8,
            actual_hex: [64]u8,
        },
    };

    /// Check the body's sha256 against the server-advertised `ETag`.
    ///
    /// Only `ETag: "sha256:<64-hex>"` is recognized (quoted or
    /// unquoted; the prefix is matched case-insensitively). Any other
    /// ETag shape — weak etags, md5, and so on — yields
    /// `.not_applicable` so deployments with non-sha256 etags do not
    /// get their requests rejected.
    pub fn verifyIntegrity(self: *const Response) IntegrityResult {
        const Sha256 = std.crypto.hash.sha2.Sha256;
        const raw_etag = self.etag orelse return .not_applicable;
        const advertised = parseSha256Etag(raw_etag) orelse return .not_applicable;

        var digest: [Sha256.digest_length]u8 = undefined;
        Sha256.hash(self.body, &digest, .{});
        var computed_hex: [Sha256.digest_length * 2]u8 = undefined;
        _ = std.fmt.bufPrint(&computed_hex, "{x}", .{&digest}) catch unreachable;

        if (std.ascii.eqlIgnoreCase(&computed_hex, advertised)) return .ok;

        var out: IntegrityResult = .{ .mismatch = .{
            .expected_hex = undefined,
            .actual_hex = undefined,
        } };
        // Servers may advertise uppercase hex — normalize to
        // lowercase so downstream comparison and logging are stable.
        for (&out.mismatch.expected_hex, advertised) |*dst, src| dst.* = std.ascii.toLower(src);
        @memcpy(&out.mismatch.actual_hex, &computed_hex);
        return out;
    }
};
/// Extract the hex portion of a `"sha256:<hex>"` ETag. Accepts both
/// quoted and unquoted forms (both are commonly written in the wild),
/// and the `sha256:` prefix is case-insensitive. Returns null for any
/// other shape callers should then skip the integrity check rather
/// than failing the request.
/// Pull the hex digest out of a `"sha256:<hex>"` ETag value. Both the
/// quoted and unquoted forms are accepted (both are commonly written in
/// the wild), and the `sha256:` scheme prefix is matched
/// case-insensitively. Any other shape yields null — callers should
/// then skip the integrity check rather than failing the request.
fn parseSha256Etag(etag: []const u8) ?[]const u8 {
    const scheme = "sha256:";
    var value = etag;
    // Strip one pair of surrounding double quotes, if present.
    if (value.len >= 2 and value[0] == '"' and value[value.len - 1] == '"') {
        value = value[1 .. value.len - 1];
    }
    if (value.len <= scheme.len) return null;
    if (!std.ascii.eqlIgnoreCase(value[0..scheme.len], scheme)) return null;
    const digest = value[scheme.len..];
    if (digest.len != std.crypto.hash.sha2.Sha256.digest_length * 2) return null;
    for (digest) |ch| {
        if (!std.ascii.isHex(ch)) return null;
    }
    return digest;
}
/// Thin HTTP client wrapper with retry and error classification.
pub const Client = struct {
allocator: std.mem.Allocator,
@ -77,27 +148,73 @@ pub const Client = struct {
}
fn doRequest(self: *Client, method: std.http.Method, url: []const u8, body: ?[]const u8, extra_headers: []const std.http.Header) HttpError!Response {
var aw: std.Io.Writer.Allocating = .init(self.allocator);
const uri = std.Uri.parse(url) catch return HttpError.RequestFailed;
const result = self.http_client.fetch(.{
.location = .{ .url = url },
.method = method,
.payload = body,
var req = self.http_client.request(method, uri, .{
.redirect_behavior = @enumFromInt(3),
.extra_headers = extra_headers,
.response_writer = &aw.writer,
}) catch {
aw.deinit();
}) catch return HttpError.RequestFailed;
defer req.deinit();
if (body) |payload| {
var send_buf: [4096]u8 = undefined;
req.transfer_encoding = .{ .content_length = payload.len };
var bw = req.sendBodyUnflushed(&send_buf) catch return HttpError.RequestFailed;
bw.writer.writeAll(payload) catch return HttpError.RequestFailed;
bw.end() catch return HttpError.RequestFailed;
req.connection.?.flush() catch return HttpError.RequestFailed;
} else {
req.sendBodiless() catch return HttpError.RequestFailed;
}
// Matches the default redirect capacity in std.http.Client.fetch.
var redirect_buffer: [8 * 1024]u8 = undefined;
var response = req.receiveHead(&redirect_buffer) catch return HttpError.RequestFailed;
// Capture the ETag (if any) from the response head BEFORE
// draining the body. `Response.reader()` invalidates
// `head.bytes`, and `iterateHeaders` reads from that slice, so
// anything we want must be duplicated now.
const etag_owned: ?[]const u8 = blk: {
var it = response.head.iterateHeaders();
while (it.next()) |h| {
if (std.ascii.eqlIgnoreCase(h.name, "etag")) {
const dup = self.allocator.dupe(u8, h.value) catch
return HttpError.OutOfMemory;
break :blk dup;
}
}
break :blk null;
};
errdefer if (etag_owned) |e| self.allocator.free(e);
// Drain the body. `readerDecompressing` is adaptive: for
// identity-encoded responses (the zfin server's default) it
// hands back the transfer reader unchanged — zero-cost. For
// gzip/deflate/zstd it wraps the transfer reader with the
// appropriate decompressor. The decompress buffer is only
// touched on the compressed paths; sized at 64 KiB as a
// reasonable default for the unlikely case a provider endpoint
// starts sending compressed SRF/JSON.
var aw: std.Io.Writer.Allocating = .init(self.allocator);
errdefer aw.deinit();
var transfer_buffer: [4096]u8 = undefined;
var decompress: std.http.Decompress = undefined;
var decompress_buffer: [64 * 1024]u8 = undefined;
const reader = response.readerDecompressing(&transfer_buffer, &decompress, &decompress_buffer);
_ = reader.streamRemaining(&aw.writer) catch {
return HttpError.RequestFailed;
};
const resp_body = aw.toOwnedSlice() catch {
aw.deinit();
return HttpError.OutOfMemory;
};
return .{
.status = result.status,
.status = response.head.status,
.body = resp_body,
.etag = etag_owned,
.allocator = self.allocator,
};
}
@ -107,6 +224,7 @@ pub const Client = struct {
.ok => return response,
else => {
response.allocator.free(response.body);
if (response.etag) |e| response.allocator.free(e);
return switch (response.status) {
.too_many_requests => HttpError.RateLimited,
.unauthorized, .forbidden => HttpError.Unauthorized,
@ -163,3 +281,95 @@ test "buildUrl" {
defer allocator.free(url);
try std.testing.expectEqualStrings("https://api.example.com/v1/data?symbol=AAPL&apikey=test123", url);
}
test "parseSha256Etag: quoted form" {
    const got = parseSha256Etag("\"sha256:0402d084abcbd4e40993ebe1e55e0beb400ad77c8c5354a46b047c821e36d3b9\"") orelse unreachable;
    try std.testing.expectEqualStrings("0402d084abcbd4e40993ebe1e55e0beb400ad77c8c5354a46b047c821e36d3b9", got);
}

test "parseSha256Etag: unquoted form" {
    const got = parseSha256Etag("sha256:deadbeefcafedeadbeefcafedeadbeefcafedeadbeefcafedeadbeefcafe1234") orelse unreachable;
    try std.testing.expectEqualStrings("deadbeefcafedeadbeefcafedeadbeefcafedeadbeefcafedeadbeefcafe1234", got);
}

test "parseSha256Etag: case-insensitive prefix" {
    const got = parseSha256Etag("\"SHA256:0402d084abcbd4e40993ebe1e55e0beb400ad77c8c5354a46b047c821e36d3b9\"") orelse unreachable;
    try std.testing.expectEqualStrings("0402d084abcbd4e40993ebe1e55e0beb400ad77c8c5354a46b047c821e36d3b9", got);
}

test "parseSha256Etag: wrong scheme returns null" {
    const none: ?[]const u8 = null;
    try std.testing.expectEqual(none, parseSha256Etag("\"md5:deadbeef\""));
    try std.testing.expectEqual(none, parseSha256Etag("W/\"weak-etag\""));
    try std.testing.expectEqual(none, parseSha256Etag(""));
}

test "parseSha256Etag: wrong hex length returns null" {
    const none: ?[]const u8 = null;
    // Too short, then two chars too long.
    try std.testing.expectEqual(none, parseSha256Etag("\"sha256:deadbeef\""));
    try std.testing.expectEqual(none, parseSha256Etag("\"sha256:0402d084abcbd4e40993ebe1e55e0beb400ad77c8c5354a46b047c821e36d3b9aa\""));
}

test "parseSha256Etag: non-hex character returns null" {
    const none: ?[]const u8 = null;
    try std.testing.expectEqual(none, parseSha256Etag("\"sha256:ZZ02d084abcbd4e40993ebe1e55e0beb400ad77c8c5354a46b047c821e36d3b9\""));
}
test "Response.verifyIntegrity: no etag returns not_applicable" {
    var resp = Response{
        .status = .ok,
        .body = "hi",
        .etag = null,
        .allocator = std.testing.allocator,
    };
    try std.testing.expect(resp.verifyIntegrity() == .not_applicable);
}

test "Response.verifyIntegrity: non-sha256 etag returns not_applicable" {
    var resp = Response{
        .status = .ok,
        .body = "hi",
        .etag = "W/\"weak-etag\"",
        .allocator = std.testing.allocator,
    };
    try std.testing.expect(resp.verifyIntegrity() == .not_applicable);
}

test "Response.verifyIntegrity: matching sha256 returns ok" {
    // sha256("hello world") = b94d27b9934d3e08a52e52d7da7dabfac484efe37a5380ee9088f7ace2efcde9
    var resp = Response{
        .status = .ok,
        .body = "hello world",
        .etag = "\"sha256:b94d27b9934d3e08a52e52d7da7dabfac484efe37a5380ee9088f7ace2efcde9\"",
        .allocator = std.testing.allocator,
    };
    try std.testing.expect(resp.verifyIntegrity() == .ok);
}

test "Response.verifyIntegrity: mismatched sha256 returns mismatch" {
    // Advertise an all-zero digest that cannot match the real one.
    var resp = Response{
        .status = .ok,
        .body = "hello world",
        .etag = "\"sha256:0000000000000000000000000000000000000000000000000000000000000000\"",
        .allocator = std.testing.allocator,
    };
    switch (resp.verifyIntegrity()) {
        .mismatch => |m| {
            try std.testing.expectEqualStrings(
                "0000000000000000000000000000000000000000000000000000000000000000",
                &m.expected_hex,
            );
            try std.testing.expectEqualStrings(
                "b94d27b9934d3e08a52e52d7da7dabfac484efe37a5380ee9088f7ace2efcde9",
                &m.actual_hex,
            );
        },
        else => try std.testing.expect(false),
    }
}

View file

@ -1390,6 +1390,18 @@ pub const DataService = struct {
/// Try to sync a cache file from the configured zfin-server.
/// Returns true if the file was successfully synced, false on any error.
/// Silently returns false if no server is configured.
///
/// Applies a single retry with a short delay when the first attempt
/// fails at the HTTP layer OR produces a torn body (integrity
/// mismatch / `looksCompleteSrf` rejection). Motivation: refreshes
/// fan out 20+ symbols across 8 parallel threads, and the tear
/// pattern we've observed so far looks transient per-connection.
/// One retry papers over single-packet hiccups without dramatically
/// extending refresh wall time. If the retry also fails the
/// archive grows by one more `.bin`/`.meta` pair — two captures
/// from the same refresh are the most valuable diagnostic signal
/// we can produce (same body shape? same byte offset? same time
/// delta? all answers we can't get from a single failure).
fn syncFromServer(self: *DataService, symbol: []const u8, data_type: cache.DataType) bool {
const server_url = self.config.server_url orelse return false;
const endpoint = switch (data_type) {
@ -1406,6 +1418,32 @@ pub const DataService = struct {
const full_url = std.fmt.allocPrint(self.allocator(), "{s}/{s}{s}", .{ server_url, symbol, endpoint }) catch return false;
defer self.allocator().free(full_url);
const max_attempts: u8 = 2;
const retry_delay_ms: u64 = 250;
var attempt: u8 = 0;
while (attempt < max_attempts) : (attempt += 1) {
if (attempt > 0) {
log.debug(
"{s}: retrying {s} server sync (attempt {d}/{d}) after {d}ms delay",
.{ symbol, @tagName(data_type), attempt + 1, max_attempts, retry_delay_ms },
);
std.Thread.sleep(retry_delay_ms * std.time.ns_per_ms);
}
switch (self.tryOneSync(symbol, data_type, full_url)) {
.ok => return true,
// Torn or network error retry if attempts remain.
.torn, .net_err => {},
}
}
return false;
}
const SyncAttempt = enum { ok, torn, net_err };
/// One attempt at syncing a file from the server. Archives a torn
/// body when detected but does NOT retry — the caller decides that.
fn tryOneSync(self: *DataService, symbol: []const u8, data_type: cache.DataType, full_url: []const u8) SyncAttempt {
log.debug("{s}: syncing {s} from server", .{ symbol, @tagName(data_type) });
var client = http.Client.init(self.allocator());
@ -1413,10 +1451,49 @@ pub const DataService = struct {
var response = client.get(full_url) catch |err| {
log.debug("{s}: server sync failed for {s}: {s}", .{ symbol, @tagName(data_type), @errorName(err) });
return false;
return .net_err;
};
defer response.deinit();
// Integrity check: if the server advertised an ETag in
// `"sha256:<hex>"` form, compare the body's actual sha256
// against it. Catches mid-stream truncation that Zig's
// std.http.Client.fetch silently accepts on the Content-Length
// path (EndOfStream from a cut transport is swallowed as a
// normal termination). Archive the mismatching body with the
// advertised etag so post-mortem can see exactly what was
// promised vs what arrived. Deployments with no ETag or a
// non-sha256 etag fall through to `looksCompleteSrf` below
// (backward-compatible with pre-fix servers).
switch (response.verifyIntegrity()) {
.mismatch => |m| {
cache.Store.archiveTornBody(
self.allocator(),
self.config.cache_dir,
symbol,
data_type,
response.body,
.{
.failure_reason = .etag_mismatch,
.http_status = @intFromEnum(response.status),
.server_url = full_url,
.server_etag = response.etag,
},
) catch |err| {
log.debug(
"{s}: failed to archive etag-mismatch {s} body: {s}",
.{ symbol, @tagName(data_type), @errorName(err) },
);
};
log.debug(
"{s}: {s} server response failed integrity check ({d} bytes, expected sha256={s}, actual={s}) — archived under _torn/, not writing to cache",
.{ symbol, @tagName(data_type), response.body.len, m.expected_hex, m.actual_hex },
);
return .torn;
},
.ok, .not_applicable => {},
}
// Validate the response body looks like a complete SRF file before
// writing it to cache. This guards against HTTP body truncation
// (TCP reset, Content-Length mismatch, proxy that flushed a
@ -1442,6 +1519,7 @@ pub const DataService = struct {
.failure_reason = .looks_complete_srf_failed,
.http_status = @intFromEnum(response.status),
.server_url = full_url,
.server_etag = response.etag,
},
) catch |err| {
log.debug(
@ -1453,17 +1531,17 @@ pub const DataService = struct {
"{s}: rejecting torn {s} server response ({d} bytes) — archived under _torn/, not writing to cache",
.{ symbol, @tagName(data_type), response.body.len },
);
return false;
return .torn;
}
// Write to local cache
var s = self.store();
s.writeRaw(symbol, data_type, response.body) catch |err| {
log.debug("{s}: failed to write synced {s} to cache: {s}", .{ symbol, @tagName(data_type), @errorName(err) });
return false;
return .net_err;
};
log.debug("{s}: synced {s} from server ({d} bytes)", .{ symbol, @tagName(data_type), response.body.len });
return true;
return .ok;
}
/// Sync candle data (both daily and meta) from the server.