From 42b8ff1b38914c4131d4f2a692678195ddde8987 Mon Sep 17 00:00:00 2001 From: Emil Lerch Date: Fri, 8 May 2026 11:50:50 -0700 Subject: [PATCH] more robuster error capturing - looks like torn srf is in http transit --- src/cache/store.zig | 410 ++++++++++++++++++++++++++++++++++++++++++++ src/service.zig | 28 ++- 2 files changed, 436 insertions(+), 2 deletions(-) diff --git a/src/cache/store.zig b/src/cache/store.zig index c531e87..215dbde 100644 --- a/src/cache/store.zig +++ b/src/cache/store.zig @@ -2,6 +2,7 @@ const std = @import("std"); const log = std.log.scoped(.cache); const srf = @import("srf"); const atomic = @import("../atomic.zig"); +const version = @import("../version.zig"); const Date = @import("../models/date.zig").Date; const Candle = @import("../models/candle.zig").Candle; const Dividend = @import("../models/dividend.zig").Dividend; @@ -133,6 +134,18 @@ pub const Store = struct { const data = raw orelse return null; defer self.allocator.free(data); + // Read-path self-heal: if a cached candles_daily file doesn't + // look like a complete SRF doc, archive the torn body for + // post-mortem and wipe the candle pair (daily + meta) so the + // next call re-fetches from scratch. Limited to Candle on + // purpose — it's the only data type with a recurring tear + // history + sibling meta to keep in sync, and the only one + // where a silent re-parse-miss every run would be wasted work. + if (T == Candle and !looksCompleteSrf(data)) { + self.selfHealTornCandles(symbol, data); + return null; + } + if (T == EtfProfile or T == OptionsChain) { const is_negative = std.mem.eql(u8, data, negative_cache_content); if (is_negative) { @@ -343,6 +356,217 @@ pub const Store = struct { return true; } + /// Why a body was classified as torn. Surfaced in the archived + /// `.meta` sidecar so post-mortems can distinguish between detection + /// paths. + pub const FailureReason = enum { + /// `looksCompleteSrf` rejected the body bytes after an HTTP + /// response. 
The data reached us from the network but didn't + /// satisfy the structural sanity check. + looks_complete_srf_failed, + /// A cached file on disk failed to parse when read. Indicates + /// either an earlier torn write that slipped past the guard, or + /// local filesystem corruption. + srf_parse_failed, + /// The integrity digest advertised by the server (e.g. an + /// `ETag: "sha256:..."`) did not match the body bytes we + /// received. Reserved for the HTTP integrity work in Milestone 2. + etag_mismatch, + }; + + /// Context captured alongside a torn body. All fields are optional + /// because the exact detection site determines which signals are + /// available. Consumed by `archiveTornBody` to produce the SRF + /// `.meta` sidecar. + pub const TornMeta = struct { + failure_reason: FailureReason, + /// HTTP status code when the body originated from an HTTP + /// response. Null for parse failures discovered on local read. + http_status: ?u16 = null, + /// Raw `Content-Length` header value from the HTTP response, + /// when the detection path has access to it. + http_content_length: ?u64 = null, + /// Source URL when the body came from an HTTP response. + server_url: ?[]const u8 = null, + /// Raw value of the server's `ETag` header, if present. Captured + /// verbatim (including surrounding quotes and any `sha256:` or + /// other scheme prefix) so post-mortem can see exactly what the + /// server advertised. Paired with `body_sha256` in the sidecar, + /// an `etag_mismatch` failure is trivially explained. + server_etag: ?[]const u8 = null, + }; + + /// Archive a torn cache body for post-mortem diagnosis. + /// + /// Writes two sibling files under `{cache_dir}/_torn/`: + /// `{symbol}_{data_type}_{unix_ts}.bin` — the raw bytes as received + /// `{symbol}_{data_type}_{unix_ts}.meta` — SRF-formatted context + /// + /// The `.meta` file mirrors the rest of the cache's on-disk format + /// so tooling can iterate it with the standard SRF reader. 
    /// Schema for the SRF `.meta` sidecar emitted by `archiveTornBody`.
    /// Each field becomes a `key:type:value` entry in a single record
    /// under a `#!srfv1` header. Optional fields with `null` defaults
    /// are silently skipped by `srf.fmtFrom` when unset — which is the
    /// behavior we want for the http_* and server_* fields that only
    /// some detection paths populate.
    const TearRecord = struct {
        /// Fixed marker ("tear_metadata") so `_torn/*.meta` is
        /// distinguishable at a glance from any other SRF record a
        /// future reader might encounter in the cache tree.
        type: []const u8,
        symbol: []const u8,
        data_type: DataType,
        unix_ts: i64,
        iso_ts: []const u8,
        body_length: u64,
        body_sha256: []const u8,
        failure_reason: FailureReason,
        zfin_commit: []const u8,
        http_status: ?u16 = null,
        http_content_length: ?u64 = null,
        server_url: ?[]const u8 = null,
        server_etag: ?[]const u8 = null,
        last_200_bytes_hex: []const u8,
    };

    /// Archive a torn cache body for post-mortem diagnosis.
    ///
    /// Writes two sibling files under `{cache_dir}/_torn/`:
    ///   `{symbol}_{data_type}_{unix_ts}.bin`  — the raw bytes as received
    ///   `{symbol}_{data_type}_{unix_ts}.meta` — SRF-formatted context
    ///
    /// The `.meta` file mirrors the rest of the cache's on-disk format
    /// so tooling can iterate it with the standard SRF reader. Content:
    ///   type::tear_metadata
    ///   symbol::<symbol>
    ///   data_type::<data type tag>
    ///   unix_ts:num:<seconds since epoch>
    ///   iso_ts::<YYYY-MM-DDTHH:MM:SSZ>
    ///   body_length:num:<byte count>
    ///   body_sha256::<64-hex>
    ///   failure_reason::<FailureReason tag>
    ///   zfin_commit::<version string>
    /// plus the optional http_*/server_* fields from `meta` and
    /// `last_200_bytes_hex`.
    ///
    /// NOTE(review): two tears for the same symbol + data_type within
    /// the same wall-clock second produce identical file names, so the
    /// later pair silently overwrites the earlier one — confirm that
    /// losing the earlier capture is acceptable.
    pub fn archiveTornBody(
        allocator: std.mem.Allocator,
        cache_dir: []const u8,
        symbol: []const u8,
        data_type: DataType,
        bytes: []const u8,
        meta: TornMeta,
    ) !void {
        // Ensure the _torn/ directory exists.
        const torn_dir = try std.fs.path.join(allocator, &.{ cache_dir, "_torn" });
        defer allocator.free(torn_dir);
        std.fs.cwd().makePath(torn_dir) catch |err| switch (err) {
            error.PathAlreadyExists => {},
            else => return err,
        };

        // One timestamp for both file names so the .bin/.meta pair is
        // unambiguously linked by a shared prefix.
        const ts = std.time.timestamp();

        const bin_name = try std.fmt.allocPrint(
            allocator,
            "{s}_{s}_{d}.bin",
            .{ symbol, @tagName(data_type), ts },
        );
        defer allocator.free(bin_name);
        const bin_path = try std.fs.path.join(allocator, &.{ torn_dir, bin_name });
        defer allocator.free(bin_path);

        const meta_name = try std.fmt.allocPrint(
            allocator,
            "{s}_{s}_{d}.meta",
            .{ symbol, @tagName(data_type), ts },
        );
        defer allocator.free(meta_name);
        const meta_path = try std.fs.path.join(allocator, &.{ torn_dir, meta_name });
        defer allocator.free(meta_path);

        // Write the raw body first — if this fails we don't bother with
        // the sidecar, since the sidecar is only useful paired with bytes.
        try atomic.writeFileAtomic(allocator, bin_path, bytes);

        // Compute sha256 of the body for the sidecar. This relies on the
        // in-use std.fmt rendering `{x}` of a byte array as lowercase hex
        // (the archive test asserts hex output downstream).
        var hash: [std.crypto.hash.sha2.Sha256.digest_length]u8 = undefined;
        std.crypto.hash.sha2.Sha256.hash(bytes, &hash, .{});
        var hash_hex: [std.crypto.hash.sha2.Sha256.digest_length * 2]u8 = undefined;
        _ = std.fmt.bufPrint(&hash_hex, "{x}", .{&hash}) catch unreachable;

        // ISO-8601 UTC timestamp — computed by hand to avoid pulling in
        // a dependency. Format: YYYY-MM-DDTHH:MM:SSZ. The @intCast
        // assumes ts >= 0, i.e. a post-1970 system clock.
        const epoch_seconds = std.time.epoch.EpochSeconds{ .secs = @intCast(ts) };
        const day_seconds = epoch_seconds.getDaySeconds();
        const epoch_day = epoch_seconds.getEpochDay();
        const year_day = epoch_day.calculateYearDay();
        const month_day = year_day.calculateMonthDay();
        var iso_buf: [32]u8 = undefined;
        const iso_ts = std.fmt.bufPrint(&iso_buf, "{d:0>4}-{d:0>2}-{d:0>2}T{d:0>2}:{d:0>2}:{d:0>2}Z", .{
            year_day.year,
            @intFromEnum(month_day.month),
            month_day.day_index + 1,
            day_seconds.getHoursIntoDay(),
            day_seconds.getMinutesIntoHour(),
            day_seconds.getSecondsIntoMinute(),
        }) catch unreachable;

        // Last 200 bytes as lowercase hex so the sidecar is
        // grep-friendly regardless of what binary garbage sits in the
        // torn tail.
        const tail_len: usize = @min(bytes.len, 200);
        const tail = bytes[bytes.len - tail_len ..];
        const tail_hex = try allocator.alloc(u8, tail_len * 2);
        defer allocator.free(tail_hex);
        _ = std.fmt.bufPrint(tail_hex, "{x}", .{tail}) catch unreachable;

        const record = TearRecord{
            .type = "tear_metadata",
            .symbol = symbol,
            .data_type = data_type,
            .unix_ts = ts,
            .iso_ts = iso_ts,
            .body_length = bytes.len,
            .body_sha256 = &hash_hex,
            .failure_reason = meta.failure_reason,
            .zfin_commit = version.version_string,
            .http_status = meta.http_status,
            .http_content_length = meta.http_content_length,
            .server_url = meta.server_url,
            .server_etag = meta.server_etag,
            .last_200_bytes_hex = tail_hex,
        };

        // Serialize the single-record SRF document into memory, then
        // land it atomically next to the .bin.
        var aw: std.Io.Writer.Allocating = .init(allocator);
        defer aw.deinit();
        const records = [_]TearRecord{record};
        try aw.writer.print("{f}", .{srf.fmtFrom(TearRecord, allocator, &records, .{})});

        try atomic.writeFileAtomic(allocator, meta_path, aw.writer.buffered());
    }

    /// Read-path self-heal for candle data. On detecting a torn
    /// `candles_daily.srf` at read time, archive the bytes with full
    /// diagnostic context, then wipe both `candles_daily.srf` and its
    /// companion `candles_meta.srf` so the next fetch cycle pulls a
    /// clean pair.
    ///
    /// Best-effort throughout — archive failures are logged at debug
    /// and do NOT block the cache invalidation. The goal is to keep
    /// the read path recoverable; diagnostics are a bonus.
    fn selfHealTornCandles(self: *Store, symbol: []const u8, data: []const u8) void {
        // Recorded as srf_parse_failed (not looks_complete_srf_failed)
        // because this path fires on bytes read back from local disk,
        // not on an HTTP response — see the FailureReason docs.
        archiveTornBody(
            self.allocator,
            self.cache_dir,
            symbol,
            .candles_daily,
            data,
            .{ .failure_reason = .srf_parse_failed },
        ) catch |err| {
            log.debug(
                "{s}: failed to archive torn candles_daily body ({d} bytes): {s}",
                .{ symbol, data.len, @errorName(err) },
            );
        };
        log.debug(
            "{s}: self-heal wiping torn candles_daily+candles_meta pair ({d} bytes)",
            .{ symbol, data.len },
        );
        self.clearData(symbol, .candles_daily);
        self.clearData(symbol, .candles_meta);
    }

test "looksCompleteSrf: well-formed body accepted" {
    try std.testing.expect(Store.looksCompleteSrf("#!srfv1\n"));
}

test "archiveTornBody writes .bin + .meta pair with expected SRF content" {
    var tmp = std.testing.tmpDir(.{});
    defer tmp.cleanup();

    const dir_path = try tmp.dir.realpathAlloc(testing.allocator, ".");
    defer testing.allocator.free(dir_path);

    // Shape mirrors the canonical FRDM torn body: a header plus a
    // complete record, then a record that was cut mid-date-field with
    // no terminating newline.
    const torn_bytes = "#!srfv1\ndate::2026-04-22,open:num:62.82,close:num:63.23\ndate::2026-04";

    try Store.archiveTornBody(
        testing.allocator,
        dir_path,
        "FRDM",
        .candles_daily,
        torn_bytes,
        .{
            .failure_reason = .looks_complete_srf_failed,
            .http_status = 200,
            .http_content_length = 214637,
            .server_url = "https://example.test/FRDM/candles",
            .server_etag = "\"sha256:deadbeefcafe\"",
        },
    );

    // Find the produced files. The timestamp in the name is produced
    // at write time so we can't predict it — list the _torn dir and
    // assert exactly one `.bin` + one `.meta`, and that they share a
    // prefix (so they're unambiguously paired).
    const torn_dir_path = try std.fs.path.join(testing.allocator, &.{ dir_path, "_torn" });
    defer testing.allocator.free(torn_dir_path);

    var torn_dir = try std.fs.cwd().openDir(torn_dir_path, .{ .iterate = true });
    defer torn_dir.close();

    var bin_name_buf: [128]u8 = undefined;
    var meta_name_buf: [128]u8 = undefined;
    var bin_len: usize = 0;
    var meta_len: usize = 0;

    var it = torn_dir.iterate();
    while (try it.next()) |entry| {
        if (std.mem.endsWith(u8, entry.name, ".bin")) {
            @memcpy(bin_name_buf[0..entry.name.len], entry.name);
            bin_len = entry.name.len;
        } else if (std.mem.endsWith(u8, entry.name, ".meta")) {
            @memcpy(meta_name_buf[0..entry.name.len], entry.name);
            meta_len = entry.name.len;
        }
    }
    try std.testing.expect(bin_len > 0);
    try std.testing.expect(meta_len > 0);

    // Prefix without extension should match (same symbol + data_type
    // + timestamp).
    const bin_prefix = bin_name_buf[0 .. bin_len - ".bin".len];
    const meta_prefix = meta_name_buf[0 .. meta_len - ".meta".len];
    try std.testing.expectEqualStrings(bin_prefix, meta_prefix);
    try std.testing.expect(std.mem.startsWith(u8, bin_prefix, "FRDM_candles_daily_"));

    // .bin round-trips verbatim.
    const bin_path = try std.fs.path.join(testing.allocator, &.{ torn_dir_path, bin_name_buf[0..bin_len] });
    defer testing.allocator.free(bin_path);
    const bin_contents = try std.fs.cwd().readFileAlloc(testing.allocator, bin_path, 1024 * 1024);
    defer testing.allocator.free(bin_contents);
    try std.testing.expectEqualStrings(torn_bytes, bin_contents);

    // .meta is valid SRF and carries the fields we care about.
    const meta_path = try std.fs.path.join(testing.allocator, &.{ torn_dir_path, meta_name_buf[0..meta_len] });
    defer testing.allocator.free(meta_path);
    const meta_contents = try std.fs.cwd().readFileAlloc(testing.allocator, meta_path, 1024 * 1024);
    defer testing.allocator.free(meta_contents);

    try std.testing.expect(std.mem.startsWith(u8, meta_contents, "#!srfv1\n"));
    try std.testing.expect(std.mem.indexOf(u8, meta_contents, "symbol::FRDM") != null);
    try std.testing.expect(std.mem.indexOf(u8, meta_contents, "data_type::candles_daily") != null);
    try std.testing.expect(std.mem.indexOf(u8, meta_contents, "failure_reason::looks_complete_srf_failed") != null);
    try std.testing.expect(std.mem.indexOf(u8, meta_contents, "http_status:num:200") != null);
    try std.testing.expect(std.mem.indexOf(u8, meta_contents, "http_content_length:num:214637") != null);
    try std.testing.expect(std.mem.indexOf(u8, meta_contents, "server_url::https://example.test/FRDM/candles") != null);
    try std.testing.expect(std.mem.indexOf(u8, meta_contents, "server_etag::\"sha256:deadbeefcafe\"") != null);
    // Body length matches the torn slice exactly.
    var len_buf: [32]u8 = undefined;
    const len_str = try std.fmt.bufPrint(&len_buf, "body_length:num:{d}", .{torn_bytes.len});
    try std.testing.expect(std.mem.indexOf(u8, meta_contents, len_str) != null);
    // Hex-encoded sha256 — length check (64 hex chars).
    const sha_idx = std.mem.indexOf(u8, meta_contents, "body_sha256::").?;
    const sha_start = sha_idx + "body_sha256::".len;
    try std.testing.expect(meta_contents.len - sha_start >= 64);
    for (meta_contents[sha_start .. sha_start + 64]) |c| {
        try std.testing.expect(std.ascii.isHex(c));
    }
    // last_200_bytes_hex is exactly 2 * min(body_len, 200) chars.
    const tail_idx = std.mem.indexOf(u8, meta_contents, "last_200_bytes_hex::").?;
    const tail_start = tail_idx + "last_200_bytes_hex::".len;
    const tail_end = std.mem.indexOfScalarPos(u8, meta_contents, tail_start, '\n').?;
    try std.testing.expectEqual(@as(usize, torn_bytes.len * 2), tail_end - tail_start);
}

test "Store.read self-heals torn candles_daily and wipes the pair" {
    var tmp = std.testing.tmpDir(.{});
    defer tmp.cleanup();

    const dir_path = try tmp.dir.realpathAlloc(testing.allocator, ".");
    defer testing.allocator.free(dir_path);

    var store = Store.init(testing.allocator, dir_path);
    try store.ensureSymbolDir("FRDM");

    // Seed a torn daily and an intact meta — the exact state we saw
    // on disk for FRDM in the 2026-05-08 incident.
    const torn_daily = "#!srfv1\ndate::2026-04-22,open:num:62.82,close:num:63.23\ndate::2026-04";
    const intact_meta = "#!srfv1\n#!expires=9999999999\n#!created=1777000000\nlast_close:num:67.11,last_date::2026-05-07\n";
    try store.writeRaw("FRDM", .candles_daily, torn_daily);
    try store.writeRaw("FRDM", .candles_meta, intact_meta);

    // Reading candles MUST signal cache miss.
    const result = store.read(Candle, "FRDM", null, .any);
    try std.testing.expect(result == null);

    // Both files are wiped.
    const daily_path = try std.fs.path.join(testing.allocator, &.{ dir_path, "FRDM", "candles_daily.srf" });
    defer testing.allocator.free(daily_path);
    const meta_path = try std.fs.path.join(testing.allocator, &.{ dir_path, "FRDM", "candles_meta.srf" });
    defer testing.allocator.free(meta_path);
    try std.testing.expectError(error.FileNotFound, std.fs.cwd().access(daily_path, .{}));
    try std.testing.expectError(error.FileNotFound, std.fs.cwd().access(meta_path, .{}));

    // And the torn body was archived under _torn/.
    const torn_dir_path = try std.fs.path.join(testing.allocator, &.{ dir_path, "_torn" });
    defer testing.allocator.free(torn_dir_path);
    var torn_dir = try std.fs.cwd().openDir(torn_dir_path, .{ .iterate = true });
    defer torn_dir.close();
    var found_bin = false;
    var found_meta = false;
    var it = torn_dir.iterate();
    while (try it.next()) |entry| {
        if (std.mem.startsWith(u8, entry.name, "FRDM_candles_daily_")) {
            if (std.mem.endsWith(u8, entry.name, ".bin")) found_bin = true;
            if (std.mem.endsWith(u8, entry.name, ".meta")) found_meta = true;
        }
    }
    try std.testing.expect(found_bin);
    try std.testing.expect(found_meta);
}

test "Store.read does not self-heal an intact candles_daily" {
    var tmp = std.testing.tmpDir(.{});
    defer tmp.cleanup();

    const dir_path = try tmp.dir.realpathAlloc(testing.allocator, ".");
    defer testing.allocator.free(dir_path);

    var store = Store.init(testing.allocator, dir_path);
    try store.ensureSymbolDir("OK");

    // Seed a complete (well-formed) candles_daily.srf — a single
    // record with the full OHLCV field set the Candle type expects.
    const good_daily =
        "#!srfv1\n" ++
        "date::2026-04-22,open:num:62.82,high:num:63.24,low:num:62.6,close:num:63.23,adj_close:num:63.23,volume:num:189473\n";
    const good_meta = "#!srfv1\n#!expires=9999999999\n#!created=1777000000\nlast_close:num:63.23,last_date::2026-04-22\n";
    try store.writeRaw("OK", .candles_daily, good_daily);
    try store.writeRaw("OK", .candles_meta, good_meta);

    // Reading should succeed, and the cache files must still be there.
    const result = store.read(Candle, "OK", null, .any);
    try std.testing.expect(result != null);
    if (result) |r| {
        defer testing.allocator.free(r.data);
        try std.testing.expectEqual(@as(usize, 1), r.data.len);
    }

    const daily_path = try std.fs.path.join(testing.allocator, &.{ dir_path, "OK", "candles_daily.srf" });
    defer testing.allocator.free(daily_path);
    const daily_stat = try std.fs.cwd().statFile(daily_path);
    try std.testing.expect(daily_stat.size > 0);

    // And no _torn/ directory was created.
    const torn_dir_path = try std.fs.path.join(testing.allocator, &.{ dir_path, "_torn" });
    defer testing.allocator.free(torn_dir_path);
    try std.testing.expectError(error.FileNotFound, std.fs.cwd().access(torn_dir_path, .{}));
}
The log line is kept + // at debug level on purpose — user explicitly asked that routine + // rejections not be noisy in production runs. The `.meta` + // sidecar on disk is the durable signal. if (!cache.Store.looksCompleteSrf(response.body)) { - log.warn( - "{s}: rejecting torn {s} server response ({d} bytes) — not writing to cache", + cache.Store.archiveTornBody( + self.allocator(), + self.config.cache_dir, + symbol, + data_type, + response.body, + .{ + .failure_reason = .looks_complete_srf_failed, + .http_status = @intFromEnum(response.status), + .server_url = full_url, + }, + ) catch |err| { + log.debug( + "{s}: failed to archive torn {s} body: {s}", + .{ symbol, @tagName(data_type), @errorName(err) }, + ); + }; + log.debug( + "{s}: rejecting torn {s} server response ({d} bytes) — archived under _torn/, not writing to cache", .{ symbol, @tagName(data_type), response.body.len }, ); return false;