From 609f3aacd22e956d9e428d6bc45ef491fb694a50 Mon Sep 17 00:00:00 2001 From: Emil Lerch Date: Wed, 24 Jun 2026 10:45:51 -0700 Subject: [PATCH] resolve TODO surrounding cache ttl on supplementary providers --- TODO.md | 39 ------- src/cache/store.zig | 258 +++++++++++++++++++++++++++++++++++++++----- src/service.zig | 25 +++-- 3 files changed, 246 insertions(+), 76 deletions(-) diff --git a/TODO.md b/TODO.md index d60b97a..fe01bb9 100644 --- a/TODO.md +++ b/TODO.md @@ -403,45 +403,6 @@ Eastern rather than a rolling window. This would avoid unnecessary refetches during the trading day and ensure a fetch shortly after close gets fresh data. Probably alleviated by the cron job approach. -## Cache TTL semantics on merge writes — priority LOW - -The `writeMerged` primitive in `cache/store.zig` rewrites `dividends.srf` / -`splits.srf` with `expires = now + ttl` whenever it adds a new record or -upgrades fields on an existing one. This is conceptually wrong: TTL should -reflect "when do we expect new information from the primary provider?", -which is a property of the conversation with that provider — not of the -file's last-modification time. Adding a 25-year-old historical dividend -that Tiingo just supplied tells us nothing about Polygon's freshness; we -shouldn't bump the file's expiry as a side effect. - -The cleaner design: - -- Cache file's `#!expires=` reflects "when did Polygon (the primary) last - say `here's everything I have`?" -- Tiingo merge writes preserve the existing expires, only rewriting records. -- Only `fetchCached`'s post-Polygon-fetch write bumps expires. - -In practice the current behavior caused exactly one observable problem: a -one-time TTL herd on 2026-06-04 when the new merge code's first run added -pre-2010 Tiingo backfill across 23+ symbols in a single overnight burst, -and they all inherited that day's clock for `expires = now + 14d`. We -manually re-staggered (`stagger_cache_ttls.py`) and moved on. - -Steady-state risk: minimal. 
The merge primitive's "skip if nothing changed" -branch means no-op refreshes don't bump expires. New entries from genuinely -new dividends are spread across the calendar by the dividends themselves -(quarterly cadence varies per ticker). Field upgrades stop firing once -Polygon's metadata is in place. - -When this could matter again: -- Adding a third source for div/splits (TTL semantics get murkier). -- Wiping and rebuilding the server cache (one-time herd recurs). -- A long pause in nightly refreshes followed by a backlog of merge writes. - -Fix would be small: thread `?expires_override` into `writeMerged` and have -the merge path call `serializeWithMeta` with the existing expires (from the -read) when source_hint isn't the primary. - ## On-demand server-side fetch for new symbols Currently the server's SRF endpoints (`/candles`, `/dividends`, etc.) are pure diff --git a/src/cache/store.zig b/src/cache/store.zig index dc29059..458aaa9 100644 --- a/src/cache/store.zig +++ b/src/cache/store.zig @@ -346,7 +346,16 @@ pub const Store = struct { } pub fn CacheResult(comptime T: type) type { - return struct { data: DataFor(T), timestamp: i64 }; + return struct { + data: DataFor(T), + timestamp: i64, + /// The on-disk `#!expires=` directive, or null when the + /// file carried none. Surfaced so the merge path can + /// preserve the primary provider's freshness clock when a + /// secondary source supplements the file (see + /// `writeSupplement`). Most callers ignore it. 
+ expires: ?i64 = null, + }; } pub const Freshness = enum { fresh_only, any }; @@ -404,12 +413,32 @@ pub const Store = struct { const timestamp = it.created orelse std.Io.Timestamp.now(self.io, .real).toSeconds(); const items = deserializeOptions(allocator, &it) catch return null; - return .{ .data = items, .timestamp = timestamp }; + return .{ .data = items, .timestamp = timestamp, .expires = it.expires }; } return readSlice(T, self.io, allocator, data, postProcess, freshness); } + /// Who owns the `#!expires=` freshness clock on a merge write. + /// + /// `#!expires=` answers "when do we next expect to consult the + /// primary provider for this data type?" Only a fetch from that + /// primary may move it. A secondary source that opportunistically + /// supplements the file (e.g. Tiingo dividend rows arriving as a + /// side effect of a candle fetch) must merge its records without + /// resetting the clock, otherwise it masks a due primary refresh. + const ExpiryPolicy = enum { + /// Primary fetch: stamp `expires = now + ttl` (+ jitter). The + /// authoritative freshness signal. + bump, + /// Secondary supplement: keep the existing on-disk `expires`. + /// When there is no existing file yet, fall back to `bump` so + /// a brand-new symbol still gets an initial TTL (this is also + /// what keeps Tiingo-only setups, with no primary key, serving + /// freshly-written data instead of immediately re-missing). + preserve, + }; + /// Serialize data and write to cache with the given TTL. 
/// /// For `Dividend` and `Split`, this dispatches to `writeMerged`, @@ -442,7 +471,7 @@ pub const Store = struct { source_hint: ?[]const u8, ) void { if (T == Dividend or T == Split) { - self.writeMerged(T, symbol, items, ttl, source_hint); + self.writeMerged(T, symbol, items, ttl, source_hint, .bump); return; } const expires = computeExpires(std.Io.Timestamp.now(self.io, .real).toSeconds(), ttl, symbol); @@ -468,11 +497,40 @@ pub const Store = struct { }; } + /// Merge supplementary `Dividend` / `Split` records from a + /// non-authoritative source into the cache **without** resetting + /// the `#!expires=` freshness clock. Use this for data that + /// arrives as a side effect of fetching something else (the + /// canonical case: Tiingo dividend/split rows piggybacking on a + /// candle fetch in `populateAllFromTiingo`). The records are + /// merged via the same sorted-union semantics as + /// `writeWithSource`; only the expiry handling differs — the + /// existing on-disk expires is preserved so the next primary + /// (Polygon) fetch decides freshness. If no file exists yet, an + /// initial TTL is established (see `ExpiryPolicy.preserve`). + /// + /// `source_hint` names the source in the `info(cache)` log lines + /// (e.g. `"tiingo"`). + pub fn writeSupplement( + self: *Store, + comptime T: type, + symbol: []const u8, + items: DataFor(T), + source_hint: ?[]const u8, + ) void { + comptime std.debug.assert(T == Dividend or T == Split); + // ttl is consulted only on the no-existing-file fallback + // inside `writeMerged`; derive it from the data type rather + // than asking the caller for a value that's usually ignored. + self.writeMerged(T, symbol, items, dataTypeFor(T).ttl(), source_hint, .preserve); + } + /// Sorted-union write for `Dividend` and `Split`. Reads the /// existing cache file, merges `incoming` into it, sorts the - /// union descending by date, and writes the result. 
If nothing - /// changed, the existing file is left untouched (no mtime bump, - /// no I/O). + /// union descending by date, and writes the result. The file is + /// always rewritten (even on a no-op merge) so the `#!expires=` + /// directive is handled per `expiry`; see the note at the write + /// step below. /// /// Two kinds of merge happen on each incoming entry: /// @@ -502,6 +560,11 @@ pub const Store = struct { /// `source_hint`, when present, names the source in the log /// lines (e.g. `"polygon"` / `"tiingo"`). Defaults to /// `"fetch"` when null. + /// + /// `expiry` selects whether this write owns the freshness clock + /// (`.bump`, the primary-fetch path via `writeWithSource`) or + /// preserves whatever the primary last set (`.preserve`, the + /// supplement path via `writeSupplement`). See `ExpiryPolicy`. fn writeMerged( self: *Store, comptime T: type, @@ -509,6 +572,7 @@ pub const Store = struct { incoming: []const T, ttl: TtlSpec, source_hint: ?[]const u8, + expiry: ExpiryPolicy, ) void { comptime std.debug.assert(T == Dividend or T == Split); @@ -525,6 +589,9 @@ pub const Store = struct { // the deinit handling — they're a pair. const existing_result = self.read(self.allocator, T, symbol, null, .any); const existing: []const T = if (existing_result) |r| r.data else &.{}; + // Snapshot the primary's freshness clock before we rewrite, so + // `.preserve` writes can put it back unchanged. + const existing_expires: ?i64 = if (existing_result) |r| r.expires else null; defer if (existing_result != null) { if (comptime @hasDecl(T, "deinit")) { for (existing) |item| item.deinit(self.allocator); @@ -573,14 +640,16 @@ pub const Store = struct { } // Note: even when nothing was added or upgraded, fall through - // and rewrite the file. 
The on-disk `#!expires=` directive - // needs to be refreshed every time we successfully fetched - // and merged, otherwise an aged-out file stays aged-out: - // every subsequent refresh pays the full provider rate- - // limiter cost only to discover no changes and skip the - // write, locking the cache into a permanent slow-path. The - // write itself is a sub-millisecond atomic rename of a tiny - // file, so saving it isn't worth the bookkeeping. + // and rewrite the file. On the `.bump` (primary) path the + // on-disk `#!expires=` directive needs refreshing every time + // we successfully fetched and merged, otherwise an aged-out + // file stays aged-out: every subsequent refresh pays the full + // provider rate-limiter cost only to discover no changes and + // skip the write, locking the cache into a permanent slow-path. + // The write itself is a sub-millisecond atomic rename of a + // tiny file, so saving it isn't worth the bookkeeping. On the + // `.preserve` (supplement) path we still rewrite to land merged + // records, but we put the existing expires back untouched. // Sort descending by date (newest first), matching the // existing on-disk convention. @@ -588,7 +657,16 @@ pub const Store = struct { // Serialize via the same generic path as `write` for // non-merged types, but write the union we just built. - const expires = computeExpires(std.Io.Timestamp.now(self.io, .real).toSeconds(), ttl, symbol); + // + // wall-clock required: TTL math for the `.bump` path, and the + // no-existing-file fallback for `.preserve`. + const now_s = std.Io.Timestamp.now(self.io, .real).toSeconds(); + const expires: i64 = switch (expiry) { + .bump => computeExpires(now_s, ttl, symbol), + // Keep the primary's clock. Brand-new file (no prior + // expires) → establish one like a first fetch would. 
+ .preserve => existing_expires orelse computeExpires(now_s, ttl, symbol), + }; const data_type = dataTypeFor(T); const srf_data = serializeWithMeta(T, self.io, self.allocator, merged.items, .{ .expires = expires }) catch |err| { log.warn("{s}: failed to serialize {s}: {s}", .{ symbol, @tagName(data_type), @errorName(err) }); @@ -1612,7 +1690,7 @@ pub const Store = struct { const result = items.toOwnedSlice(allocator) catch return null; items = .empty; // prevent defer from freeing the returned slice - return .{ .data = result, .timestamp = timestamp }; + return .{ .data = result, .timestamp = timestamp, .expires = it.expires }; } /// Generic SRF serializer: emit directives (including `#!created=`) then data records. @@ -2028,14 +2106,17 @@ test "diskStats: empty cache is all zeros; populated counts symbols/files/bytes" } test "writeMerged Dividend: no-change merge still rewrites to refresh expires" { - // The "nothing changed" path used to skip the rewrite as an - // optimization. Problem: once the on-disk `#!expires=` aged - // past TTL, every subsequent refresh paid the full provider - // rate-limiter cost only to discover no changes and skip the - // write again, locking the cache into a permanent slow-path. + // PRIMARY path (`writeWithSource` / .bump). The "nothing + // changed" path used to skip the rewrite as an optimization. + // Problem: once the on-disk `#!expires=` aged past TTL, every + // subsequent refresh paid the full provider rate-limiter cost + // only to discover no changes and skip the write again, locking + // the cache into a permanent slow-path. // // Now we always rewrite. The write is a sub-millisecond atomic // rename of a tiny file; saving it isn't worth the bookkeeping. + // (Contrast: the supplement / .preserve path keeps the existing + // expires — see the writeSupplement tests below.) 
const allocator = std.testing.allocator; const io = std.testing.io; var tmp = std.testing.tmpDir(.{}); @@ -2081,6 +2162,131 @@ test "writeMerged Dividend: no-change merge still rewrites to refresh expires" { try std.testing.expect(new_expires - now_s > 13 * std.time.s_per_day); } +test "writeSupplement Dividend: preserves an existing future expires" { + // SUPPLEMENT path (.preserve). A Tiingo-driven supplement must + // merge new records without moving the primary's freshness + // clock. Seed a known future expires, supplement a brand-new + // dividend, and confirm the on-disk expires is exactly the + // seeded value (not re-stamped to now+TTL). + const allocator = std.testing.allocator; + const io = std.testing.io; + var tmp = std.testing.tmpDir(.{}); + defer tmp.cleanup(); + const dir_path = try tmp.dir.realPathFileAlloc(io, ".", allocator); + defer allocator.free(dir_path); + + var s = Store.init(io, allocator, dir_path); + + const now_s = std.Io.Timestamp.now(io, .real).toSeconds(); + const seed_expires = now_s + 100 * std.time.s_per_day; + const seed_divs = [_]Dividend{ + .{ .ex_date = Date.fromYmd(2024, 5, 15), .amount = 0.50, .type = .regular }, + }; + const seed_bytes = try Store.serializeWithMeta(Dividend, io, allocator, &seed_divs, .{ .expires = seed_expires }); + defer allocator.free(seed_bytes); + try s.writeRaw("TEST", .dividends, seed_bytes); + + // Tiingo supplies a genuinely-new (months-earlier) dividend, so a + // real merge + rewrite happens. 
+ var incoming = [_]Dividend{ + .{ .ex_date = Date.fromYmd(2024, 2, 15), .amount = 0.50 }, + }; + s.writeSupplement(Dividend, "TEST", incoming[0..], "tiingo"); + + const path = try std.fs.path.join(allocator, &.{ dir_path, "TEST", "dividends.srf" }); + defer allocator.free(path); + const data = try std.Io.Dir.cwd().readFileAlloc(io, path, allocator, .limited(1024 * 1024)); + defer allocator.free(data); + var reader = std.Io.Reader.fixed(data); + const it = try srf.iterator(&reader, allocator, .{ .parse_allocator = .none }); + defer it.deinit(); + + // Expires preserved exactly — the candle-driven Tiingo write did + // not reset Polygon's clock. + try std.testing.expectEqual(seed_expires, it.expires orelse return error.ExpiresMissing); + + // And the merge actually landed (union grew to 2). + const result = s.read(s.allocator, Dividend, "TEST", null, .any) orelse return error.NoCache; + defer allocator.free(result.data); + defer for (result.data) |d| d.deinit(allocator); + try std.testing.expectEqual(@as(usize, 2), result.data.len); +} + +test "writeSupplement Dividend: preserves an aged-out expires (does not refresh)" { + // Direct contrast with the .bump no-change test above: a Tiingo + // supplement on an already-expired file leaves the past expires + // in place, so the next getDividends still treats it as stale and + // consults the primary (Polygon). Tiingo backfilling old data is + // not a reason to consider the primary's data fresh. 
+ const allocator = std.testing.allocator; + const io = std.testing.io; + var tmp = std.testing.tmpDir(.{}); + defer tmp.cleanup(); + const dir_path = try tmp.dir.realPathFileAlloc(io, ".", allocator); + defer allocator.free(dir_path); + + var s = Store.init(io, allocator, dir_path); + + const now_s = std.Io.Timestamp.now(io, .real).toSeconds(); + const seed_expires = now_s - 30 * std.time.s_per_day; + const seed_divs = [_]Dividend{ + .{ .ex_date = Date.fromYmd(2024, 5, 15), .amount = 0.50, .type = .regular }, + }; + const seed_bytes = try Store.serializeWithMeta(Dividend, io, allocator, &seed_divs, .{ .expires = seed_expires }); + defer allocator.free(seed_bytes); + try s.writeRaw("TEST", .dividends, seed_bytes); + + var incoming = [_]Dividend{ + .{ .ex_date = Date.fromYmd(2024, 2, 15), .amount = 0.50 }, + }; + s.writeSupplement(Dividend, "TEST", incoming[0..], "tiingo"); + + const path = try std.fs.path.join(allocator, &.{ dir_path, "TEST", "dividends.srf" }); + defer allocator.free(path); + const data = try std.Io.Dir.cwd().readFileAlloc(io, path, allocator, .limited(1024 * 1024)); + defer allocator.free(data); + var reader = std.Io.Reader.fixed(data); + const it = try srf.iterator(&reader, allocator, .{ .parse_allocator = .none }); + defer it.deinit(); + + // Still the seeded past value — NOT bumped to now+TTL. + try std.testing.expectEqual(seed_expires, it.expires orelse return error.ExpiresMissing); +} + +test "writeSupplement Dividend: no existing file establishes an initial TTL" { + // No primary clock exists yet, so there's nothing to preserve. + // A supplement to a brand-new symbol behaves like a first fetch: + // it stamps a normal jittered TTL so the freshly-written data is + // actually served (important for setups with no primary key). 
+ const allocator = std.testing.allocator; + const io = std.testing.io; + var tmp = std.testing.tmpDir(.{}); + defer tmp.cleanup(); + const dir_path = try tmp.dir.realPathFileAlloc(io, ".", allocator); + defer allocator.free(dir_path); + + var s = Store.init(io, allocator, dir_path); + + const now_s = std.Io.Timestamp.now(io, .real).toSeconds(); + var incoming = [_]Dividend{ + .{ .ex_date = Date.fromYmd(2024, 5, 15), .amount = 0.50 }, + }; + s.writeSupplement(Dividend, "TEST", incoming[0..], "tiingo"); + + const path = try std.fs.path.join(allocator, &.{ dir_path, "TEST", "dividends.srf" }); + defer allocator.free(path); + const data = try std.Io.Dir.cwd().readFileAlloc(io, path, allocator, .limited(1024 * 1024)); + defer allocator.free(data); + var reader = std.Io.Reader.fixed(data); + const it = try srf.iterator(&reader, allocator, .{ .parse_allocator = .none }); + defer it.deinit(); + + // ~14d TTL with ±11% jitter, i.e. roughly 12.5d–15.5d. NOTE(review): the 13d floor below sits inside that band — confirm the jitter for "TEST" stays above it. + const new_expires = it.expires orelse return error.ExpiresMissing; + try std.testing.expect(new_expires - now_s > 13 * std.time.s_per_day); + try std.testing.expect(new_expires - now_s < 16 * std.time.s_per_day); +} + test "writeMerged Dividend: field-level upgrade fills nulls (Tiingo-then-Polygon)" { // Simulates the order that happens during a server refresh: // Tiingo writes its sparse view first (via populateAllFromTiingo @@ -2101,7 +2307,7 @@ test "writeMerged Dividend: field-level upgrade fills nulls (Tiingo-then-Polygon var tiingo_view = [_]Dividend{ .{ .ex_date = Date.fromYmd(2024, 5, 15), .amount = 0.50 }, }; - s.writeWithSource(Dividend, "TEST", tiingo_view[0..], .{ .seconds = Ttl.dividends }, "tiingo"); + s.writeSupplement(Dividend, "TEST", tiingo_view[0..], "tiingo"); // Polygon second: rich — pay_date, record_date, type, currency.
var polygon_view = [_]Dividend{ @@ -2376,7 +2582,7 @@ test "writeMerged Dividend: near-match dedup catches last-biz-day vs calendar-en .amount = 0.003422654, }, }; - s.writeWithSource(Dividend, "FDRXX", tiingo_view[0..], .{ .seconds = Ttl.dividends }, "tiingo"); + s.writeSupplement(Dividend, "FDRXX", tiingo_view[0..], "tiingo"); const result = s.read(s.allocator, Dividend, "FDRXX", null, .any) orelse return error.NoCache; defer allocator.free(result.data); @@ -2482,7 +2688,7 @@ test "writeMerged Dividend: near-match dedup tolerates Tiingo amount rounding" { var tiingo_view = [_]Dividend{ .{ .ex_date = Date.fromYmd(2024, 8, 30), .amount = 0.04 }, }; - s.writeWithSource(Dividend, "FAGIX", tiingo_view[0..], .{ .seconds = Ttl.dividends }, "tiingo"); + s.writeSupplement(Dividend, "FAGIX", tiingo_view[0..], "tiingo"); const result = s.read(s.allocator, Dividend, "FAGIX", null, .any) orelse return error.NoCache; defer allocator.free(result.data); @@ -2544,7 +2750,7 @@ test "writeMerged Split: SPYM-style supplementary entry added" { var tiingo_view = [_]Split{ .{ .date = Date.fromYmd(2017, 10, 16), .numerator = 4, .denominator = 1 }, }; - s.write(Split, "SPYM", tiingo_view[0..], .{ .seconds = Ttl.splits }); + s.writeSupplement(Split, "SPYM", tiingo_view[0..], "tiingo"); const result = s.read(s.allocator, Split, "SPYM", null, .any) orelse return error.NoCache; defer allocator.free(result.data); @@ -2577,7 +2783,7 @@ test "writeMerged Split: forward-looking Polygon entry preserved across Tiingo r var tiingo_view = [_]Split{ .{ .date = Date.fromYmd(2020, 8, 31), .numerator = 4, .denominator = 1 }, }; - s.write(Split, "TEST", tiingo_view[0..], .{ .seconds = Ttl.splits }); + s.writeSupplement(Split, "TEST", tiingo_view[0..], "tiingo"); const result = s.read(s.allocator, Split, "TEST", null, .any) orelse return error.NoCache; defer allocator.free(result.data); diff --git a/src/service.zig b/src/service.zig index 6b83d9d..f7422f5 100644 --- a/src/service.zig +++ b/src/service.zig 
@@ -532,11 +532,12 @@ pub const DataService = struct { // 2026-06-15 ex_date), which Tiingo's price-series // response does not. Tiingo opportunistically // supplements the cache via `populateAllFromTiingo` - // when candle fetches happen — that path uses the - // sorted-union write semantics in - // `cache.Store.writeMerged`, so Polygon's entries - // and Tiingo's entries coexist in `dividends.srf` - // without overwriting each other. + // when candle fetches happen — that path uses + // `cache.Store.writeSupplement`, whose sorted-union + // merge lets Polygon's and Tiingo's entries coexist in + // `dividends.srf` without overwriting each other, and + // which preserves the `#!expires=` clock so a Tiingo + // candle fetch never masks a due Polygon refresh. var pg = try self.getProvider(Polygon); return pg.fetchDividends(self.allocator, symbol, null, null); }, @@ -596,12 +597,14 @@ pub const DataService = struct { if (triple.candles.len > 0) { s.cacheCandles(symbol, triple.candles, .tiingo, 0); } - // Dividends and splits use the merge write path so Tiingo's - // view supplements rather than replaces existing (typically - // Polygon-sourced) records. New entries are logged with - // "tiingo" attribution. - s.writeWithSource(Dividend, symbol, triple.dividends, cache.DataType.dividends.ttl(), "tiingo"); - s.writeWithSource(Split, symbol, triple.splits, cache.DataType.splits.ttl(), "tiingo"); + // Dividends and splits use the supplement write path: Tiingo's + // view merges into existing (typically Polygon-sourced) records + // without resetting the `#!expires=` freshness clock, since + // Polygon — not this opportunistic candle-driven write — owns + // when div/split data is next due for a primary refresh. New + // entries are logged with "tiingo" attribution. + s.writeSupplement(Dividend, symbol, triple.dividends, "tiingo"); + s.writeSupplement(Split, symbol, triple.splits, "tiingo"); return triple; }