resolve TODO surrounding cache ttl on supplementary providers

This commit is contained in:
Emil Lerch 2026-06-24 10:45:51 -07:00
parent 7e6102cc5f
commit 609f3aacd2
Signed by: lobo
GPG key ID: A7B62D657EF764F8
3 changed files with 246 additions and 76 deletions

39
TODO.md
View file

@ -403,45 +403,6 @@ Eastern rather than a rolling window. This would avoid unnecessary refetches
during the trading day and ensure a fetch shortly after close gets fresh data.
Probably alleviated by the cron job approach.
## Cache TTL semantics on merge writes — priority LOW
The `writeMerged` primitive in `cache/store.zig` rewrites `dividends.srf` /
`splits.srf` with `expires = now + ttl` whenever it adds a new record or
upgrades fields on an existing one. This is conceptually wrong: TTL should
reflect "when do we expect new information from the primary provider?",
which is a property of the conversation with that provider — not of the
file's last-modification time. Adding a 25-year-old historical dividend
that Tiingo just supplied tells us nothing about Polygon's freshness; we
shouldn't bump the file's expiry as a side effect.
The cleaner design:
- Cache file's `#!expires=` reflects "when did Polygon (the primary) last
say `here's everything I have`?"
- Tiingo merge writes preserve the existing expires, only rewriting records.
- Only `fetchCached`'s post-Polygon-fetch write bumps expires.
In practice the current behavior caused exactly one observable problem: a
one-time TTL herd on 2026-06-04 when the new merge code's first run added
pre-2010 Tiingo backfill across 23+ symbols in a single overnight burst,
and they all inherited that day's clock for `expires = now + 14d`. We
manually re-staggered (`stagger_cache_ttls.py`) and moved on.
Steady-state risk: minimal. The merge primitive's "skip if nothing changed"
branch means no-op refreshes don't bump expires. New entries from genuinely
new dividends are spread across the calendar by the dividends themselves
(quarterly cadence varies per ticker). Field upgrades stop firing once
Polygon's metadata is in place.
When this could matter again:
- Adding a third source for div/splits (TTL semantics get murkier).
- Wiping and rebuilding the server cache (one-time herd recurs).
- A long pause in nightly refreshes followed by a backlog of merge writes.
Fix would be small: thread `?expires_override` into `writeMerged` and have
the merge path call `serializeWithMeta` with the existing expires (from the
read) when source_hint isn't the primary.
## On-demand server-side fetch for new symbols
Currently the server's SRF endpoints (`/candles`, `/dividends`, etc.) are pure

258
src/cache/store.zig vendored
View file

@ -346,7 +346,16 @@ pub const Store = struct {
}
pub fn CacheResult(comptime T: type) type {
return struct { data: DataFor(T), timestamp: i64 };
return struct {
data: DataFor(T),
timestamp: i64,
/// The on-disk `#!expires=` directive, or null when the
/// file carried none. Surfaced so the merge path can
/// preserve the primary provider's freshness clock when a
/// secondary source supplements the file (see
/// `writeSupplement`). Most callers ignore it.
expires: ?i64 = null,
};
}
pub const Freshness = enum { fresh_only, any };
@ -404,12 +413,32 @@ pub const Store = struct {
const timestamp = it.created orelse std.Io.Timestamp.now(self.io, .real).toSeconds();
const items = deserializeOptions(allocator, &it) catch return null;
return .{ .data = items, .timestamp = timestamp };
return .{ .data = items, .timestamp = timestamp, .expires = it.expires };
}
return readSlice(T, self.io, allocator, data, postProcess, freshness);
}
/// Who owns the `#!expires=` freshness clock on a merge write.
///
/// `#!expires=` answers "when do we next expect to consult the
/// primary provider for this data type?" Only a fetch from that
/// primary may move it. A secondary source that opportunistically
/// supplements the file (e.g. Tiingo dividend rows arriving as a
/// side effect of a candle fetch) must merge its records without
/// resetting the clock; otherwise it masks a due primary refresh.
///
/// Passed as the last argument of `writeMerged`: `writeWithSource`
/// passes `.bump`, `writeSupplement` passes `.preserve`.
const ExpiryPolicy = enum {
/// Primary fetch: stamp `expires = now + ttl` (+ jitter). The
/// authoritative freshness signal.
bump,
/// Secondary supplement: keep the existing on-disk `expires`.
/// When there is no existing expires to keep, fall back to `bump`
/// so a brand-new symbol still gets an initial TTL (this is also
/// what keeps Tiingo-only setups, with no primary key, serving
/// freshly-written data instead of immediately re-missing). The
/// fallback covers both "no file yet" and "file present but
/// carrying no `#!expires=` directive": in either case the read
/// surfaces a null expires.
preserve,
};
/// Serialize data and write to cache with the given TTL.
///
/// For `Dividend` and `Split`, this dispatches to `writeMerged`,
@ -442,7 +471,7 @@ pub const Store = struct {
source_hint: ?[]const u8,
) void {
if (T == Dividend or T == Split) {
self.writeMerged(T, symbol, items, ttl, source_hint);
self.writeMerged(T, symbol, items, ttl, source_hint, .bump);
return;
}
const expires = computeExpires(std.Io.Timestamp.now(self.io, .real).toSeconds(), ttl, symbol);
@ -468,11 +497,40 @@ pub const Store = struct {
};
}
/// Fold supplementary `Dividend` / `Split` records from a
/// non-authoritative source into the cache, leaving the
/// `#!expires=` freshness clock where the primary left it.
///
/// Intended for records that show up as a by-product of some other
/// fetch; the canonical case is Tiingo dividend/split rows riding
/// along with a candle fetch in `populateAllFromTiingo`. Records go
/// through the same sorted-union merge as `writeWithSource`. The
/// only difference is expiry handling: the existing on-disk expires
/// is written back unchanged, so the next primary (Polygon) fetch
/// still decides freshness. When there is no existing expires to
/// keep (e.g. no file yet), an initial TTL is stamped instead (see
/// `ExpiryPolicy.preserve`).
///
/// `T` must be `Dividend` or `Split` (checked at comptime).
/// `source_hint` names the source in the `info(cache)` log lines
/// (e.g. `"tiingo"`).
pub fn writeSupplement(
    self: *Store,
    comptime T: type,
    symbol: []const u8,
    items: DataFor(T),
    source_hint: ?[]const u8,
) void {
    comptime std.debug.assert(T == Dividend or T == Split);

    // `writeMerged` reads the TTL only when it has no prior expires
    // to preserve, so look it up from the data type here instead of
    // making every caller supply a value that is usually ignored.
    const fallback_ttl = dataTypeFor(T).ttl();
    self.writeMerged(T, symbol, items, fallback_ttl, source_hint, .preserve);
}
/// Sorted-union write for `Dividend` and `Split`. Reads the
/// existing cache file, merges `incoming` into it, sorts the
/// union descending by date, and writes the result. If nothing
/// changed, the existing file is left untouched (no mtime bump,
/// no I/O).
/// union descending by date, and writes the result. The file is
/// always rewritten (even on a no-op merge) so the `#!expires=`
/// directive is handled per `expiry`; see the note at the write
/// step below.
///
/// Two kinds of merge happen on each incoming entry:
///
@ -502,6 +560,11 @@ pub const Store = struct {
/// `source_hint`, when present, names the source in the log
/// lines (e.g. `"polygon"` / `"tiingo"`). Defaults to
/// `"fetch"` when null.
///
/// `expiry` selects whether this write owns the freshness clock
/// (`.bump`, the primary-fetch path via `writeWithSource`) or
/// preserves whatever the primary last set (`.preserve`, the
/// supplement path via `writeSupplement`). See `ExpiryPolicy`.
fn writeMerged(
self: *Store,
comptime T: type,
@ -509,6 +572,7 @@ pub const Store = struct {
incoming: []const T,
ttl: TtlSpec,
source_hint: ?[]const u8,
expiry: ExpiryPolicy,
) void {
comptime std.debug.assert(T == Dividend or T == Split);
@ -525,6 +589,9 @@ pub const Store = struct {
// the deinit handling — they're a pair.
const existing_result = self.read(self.allocator, T, symbol, null, .any);
const existing: []const T = if (existing_result) |r| r.data else &.{};
// Snapshot the primary's freshness clock before we rewrite, so
// `.preserve` writes can put it back unchanged.
const existing_expires: ?i64 = if (existing_result) |r| r.expires else null;
defer if (existing_result != null) {
if (comptime @hasDecl(T, "deinit")) {
for (existing) |item| item.deinit(self.allocator);
@ -573,14 +640,16 @@ pub const Store = struct {
}
// Note: even when nothing was added or upgraded, fall through
// and rewrite the file. The on-disk `#!expires=` directive
// needs to be refreshed every time we successfully fetched
// and merged, otherwise an aged-out file stays aged-out:
// every subsequent refresh pays the full provider rate-
// limiter cost only to discover no changes and skip the
// write, locking the cache into a permanent slow-path. The
// write itself is a sub-millisecond atomic rename of a tiny
// file, so saving it isn't worth the bookkeeping.
// and rewrite the file. On the `.bump` (primary) path the
// on-disk `#!expires=` directive needs refreshing every time
// we successfully fetched and merged, otherwise an aged-out
// file stays aged-out: every subsequent refresh pays the full
// provider rate-limiter cost only to discover no changes and
// skip the write, locking the cache into a permanent slow-path.
// The write itself is a sub-millisecond atomic rename of a
// tiny file, so saving it isn't worth the bookkeeping. On the
// `.preserve` (supplement) path we still rewrite to land merged
// records, but we put the existing expires back untouched.
// Sort descending by date (newest first), matching the
// existing on-disk convention.
@ -588,7 +657,16 @@ pub const Store = struct {
// Serialize via the same generic path as `write` for
// non-merged types, but write the union we just built.
const expires = computeExpires(std.Io.Timestamp.now(self.io, .real).toSeconds(), ttl, symbol);
//
// wall-clock required: TTL math for the `.bump` path, and the
// no-existing-file fallback for `.preserve`.
const now_s = std.Io.Timestamp.now(self.io, .real).toSeconds();
const expires: i64 = switch (expiry) {
.bump => computeExpires(now_s, ttl, symbol),
// Keep the primary's clock. Brand-new file (no prior
// expires): establish one like a first fetch would.
.preserve => existing_expires orelse computeExpires(now_s, ttl, symbol),
};
const data_type = dataTypeFor(T);
const srf_data = serializeWithMeta(T, self.io, self.allocator, merged.items, .{ .expires = expires }) catch |err| {
log.warn("{s}: failed to serialize {s}: {s}", .{ symbol, @tagName(data_type), @errorName(err) });
@ -1612,7 +1690,7 @@ pub const Store = struct {
const result = items.toOwnedSlice(allocator) catch return null;
items = .empty; // prevent defer from freeing the returned slice
return .{ .data = result, .timestamp = timestamp };
return .{ .data = result, .timestamp = timestamp, .expires = it.expires };
}
/// Generic SRF serializer: emit directives (including `#!created=`) then data records.
@ -2028,14 +2106,17 @@ test "diskStats: empty cache is all zeros; populated counts symbols/files/bytes"
}
test "writeMerged Dividend: no-change merge still rewrites to refresh expires" {
// The "nothing changed" path used to skip the rewrite as an
// optimization. Problem: once the on-disk `#!expires=` aged
// past TTL, every subsequent refresh paid the full provider
// rate-limiter cost only to discover no changes and skip the
// write again, locking the cache into a permanent slow-path.
// PRIMARY path (`writeWithSource` / .bump). The "nothing
// changed" path used to skip the rewrite as an optimization.
// Problem: once the on-disk `#!expires=` aged past TTL, every
// subsequent refresh paid the full provider rate-limiter cost
// only to discover no changes and skip the write again, locking
// the cache into a permanent slow-path.
//
// Now we always rewrite. The write is a sub-millisecond atomic
// rename of a tiny file; saving it isn't worth the bookkeeping.
// (Contrast: the supplement / .preserve path keeps the existing
// expires — see the writeSupplement tests below.)
const allocator = std.testing.allocator;
const io = std.testing.io;
var tmp = std.testing.tmpDir(.{});
@ -2081,6 +2162,131 @@ test "writeMerged Dividend: no-change merge still rewrites to refresh expires" {
try std.testing.expect(new_expires - now_s > 13 * std.time.s_per_day);
}
test "writeSupplement Dividend: preserves an existing future expires" {
    // Supplement (`.preserve`) path: a Tiingo-driven supplement has
    // to land its new records while leaving the primary's freshness
    // clock alone. We plant a file with a known future expires,
    // supplement one dividend the file doesn't have yet, and check
    // the directive on disk is still the planted value rather than
    // a fresh now+TTL stamp.
    const gpa = std.testing.allocator;
    const io = std.testing.io;

    var tmp = std.testing.tmpDir(.{});
    defer tmp.cleanup();
    const cache_root = try tmp.dir.realPathFileAlloc(io, ".", gpa);
    defer gpa.free(cache_root);
    var store = Store.init(io, gpa, cache_root);

    // Plant the primary's view: one dividend, expiring 100 days out.
    const planted_at = std.Io.Timestamp.now(io, .real).toSeconds();
    const planted_expires = planted_at + 100 * std.time.s_per_day;
    const primary_view = [_]Dividend{
        .{ .ex_date = Date.fromYmd(2024, 5, 15), .amount = 0.50, .type = .regular },
    };
    const planted_srf = try Store.serializeWithMeta(Dividend, io, gpa, &primary_view, .{ .expires = planted_expires });
    defer gpa.free(planted_srf);
    try store.writeRaw("TEST", .dividends, planted_srf);

    // The supplement carries a dividend from three months earlier
    // that the planted file lacks, so the merge has real work to do.
    var tiingo_rows = [_]Dividend{
        .{ .ex_date = Date.fromYmd(2024, 2, 15), .amount = 0.50 },
    };
    store.writeSupplement(Dividend, "TEST", tiingo_rows[0..], "tiingo");

    // Read the directive straight off disk.
    const srf_path = try std.fs.path.join(gpa, &.{ cache_root, "TEST", "dividends.srf" });
    defer gpa.free(srf_path);
    const raw = try std.Io.Dir.cwd().readFileAlloc(io, srf_path, gpa, .limited(1024 * 1024));
    defer gpa.free(raw);
    var reader = std.Io.Reader.fixed(raw);
    const it = try srf.iterator(&reader, gpa, .{ .parse_allocator = .none });
    defer it.deinit();

    // Expires preserved exactly — the candle-driven Tiingo write did
    // not reset Polygon's clock.
    const on_disk_expires = it.expires orelse return error.ExpiresMissing;
    try std.testing.expectEqual(planted_expires, on_disk_expires);

    // The supplement itself must have landed: the union is now 2 rows.
    const merged = store.read(store.allocator, Dividend, "TEST", null, .any) orelse return error.NoCache;
    defer gpa.free(merged.data);
    defer for (merged.data) |d| d.deinit(gpa);
    try std.testing.expectEqual(@as(usize, 2), merged.data.len);
}
test "writeSupplement Dividend: preserves an aged-out expires (does not refresh)" {
    // Mirror image of the `.bump` no-change case: supplementing a
    // file whose expires is already in the past must leave that past
    // value in place. The file therefore still reads as stale, and
    // the next getDividends goes back to the primary (Polygon).
    // Tiingo backfilling old rows says nothing about whether the
    // primary's data is fresh.
    const gpa = std.testing.allocator;
    const io = std.testing.io;

    var tmp = std.testing.tmpDir(.{});
    defer tmp.cleanup();
    const cache_root = try tmp.dir.realPathFileAlloc(io, ".", gpa);
    defer gpa.free(cache_root);
    var store = Store.init(io, gpa, cache_root);

    // Plant a file that expired 30 days ago.
    const planted_at = std.Io.Timestamp.now(io, .real).toSeconds();
    const stale_expires = planted_at - 30 * std.time.s_per_day;
    const primary_view = [_]Dividend{
        .{ .ex_date = Date.fromYmd(2024, 5, 15), .amount = 0.50, .type = .regular },
    };
    const planted_srf = try Store.serializeWithMeta(Dividend, io, gpa, &primary_view, .{ .expires = stale_expires });
    defer gpa.free(planted_srf);
    try store.writeRaw("TEST", .dividends, planted_srf);

    var tiingo_rows = [_]Dividend{
        .{ .ex_date = Date.fromYmd(2024, 2, 15), .amount = 0.50 },
    };
    store.writeSupplement(Dividend, "TEST", tiingo_rows[0..], "tiingo");

    const srf_path = try std.fs.path.join(gpa, &.{ cache_root, "TEST", "dividends.srf" });
    defer gpa.free(srf_path);
    const raw = try std.Io.Dir.cwd().readFileAlloc(io, srf_path, gpa, .limited(1024 * 1024));
    defer gpa.free(raw);
    var reader = std.Io.Reader.fixed(raw);
    const it = try srf.iterator(&reader, gpa, .{ .parse_allocator = .none });
    defer it.deinit();

    // Still the seeded past value — NOT bumped to now+TTL.
    const on_disk_expires = it.expires orelse return error.ExpiresMissing;
    try std.testing.expectEqual(stale_expires, on_disk_expires);
}
test "writeSupplement Dividend: no existing file establishes an initial TTL" {
    // With no file on disk there is no primary clock to preserve, so
    // a supplement for a brand-new symbol acts like a first fetch and
    // stamps an ordinary jittered TTL. That is what lets the data it
    // just wrote actually be served (this matters for setups that
    // have no primary key at all).
    const gpa = std.testing.allocator;
    const io = std.testing.io;

    var tmp = std.testing.tmpDir(.{});
    defer tmp.cleanup();
    const cache_root = try tmp.dir.realPathFileAlloc(io, ".", gpa);
    defer gpa.free(cache_root);
    var store = Store.init(io, gpa, cache_root);

    // Capture the clock before the write so the TTL delta below is
    // measured from no later than the moment expires was computed.
    const before_write = std.Io.Timestamp.now(io, .real).toSeconds();

    var tiingo_rows = [_]Dividend{
        .{ .ex_date = Date.fromYmd(2024, 5, 15), .amount = 0.50 },
    };
    store.writeSupplement(Dividend, "TEST", tiingo_rows[0..], "tiingo");

    const srf_path = try std.fs.path.join(gpa, &.{ cache_root, "TEST", "dividends.srf" });
    defer gpa.free(srf_path);
    const raw = try std.Io.Dir.cwd().readFileAlloc(io, srf_path, gpa, .limited(1024 * 1024));
    defer gpa.free(raw);
    var reader = std.Io.Reader.fixed(raw);
    const it = try srf.iterator(&reader, gpa, .{ .parse_allocator = .none });
    defer it.deinit();

    // ~14d TTL with ±11% jitter; 13d..16d comfortably brackets the band.
    const stamped_expires = it.expires orelse return error.ExpiresMissing;
    const granted = stamped_expires - before_write;
    try std.testing.expect(granted > 13 * std.time.s_per_day);
    try std.testing.expect(granted < 16 * std.time.s_per_day);
}
test "writeMerged Dividend: field-level upgrade fills nulls (Tiingo-then-Polygon)" {
// Simulates the order that happens during a server refresh:
// Tiingo writes its sparse view first (via populateAllFromTiingo
@ -2101,7 +2307,7 @@ test "writeMerged Dividend: field-level upgrade fills nulls (Tiingo-then-Polygon
var tiingo_view = [_]Dividend{
.{ .ex_date = Date.fromYmd(2024, 5, 15), .amount = 0.50 },
};
s.writeWithSource(Dividend, "TEST", tiingo_view[0..], .{ .seconds = Ttl.dividends }, "tiingo");
s.writeSupplement(Dividend, "TEST", tiingo_view[0..], "tiingo");
// Polygon second: rich pay_date, record_date, type, currency.
var polygon_view = [_]Dividend{
@ -2376,7 +2582,7 @@ test "writeMerged Dividend: near-match dedup catches last-biz-day vs calendar-en
.amount = 0.003422654,
},
};
s.writeWithSource(Dividend, "FDRXX", tiingo_view[0..], .{ .seconds = Ttl.dividends }, "tiingo");
s.writeSupplement(Dividend, "FDRXX", tiingo_view[0..], "tiingo");
const result = s.read(s.allocator, Dividend, "FDRXX", null, .any) orelse return error.NoCache;
defer allocator.free(result.data);
@ -2482,7 +2688,7 @@ test "writeMerged Dividend: near-match dedup tolerates Tiingo amount rounding" {
var tiingo_view = [_]Dividend{
.{ .ex_date = Date.fromYmd(2024, 8, 30), .amount = 0.04 },
};
s.writeWithSource(Dividend, "FAGIX", tiingo_view[0..], .{ .seconds = Ttl.dividends }, "tiingo");
s.writeSupplement(Dividend, "FAGIX", tiingo_view[0..], "tiingo");
const result = s.read(s.allocator, Dividend, "FAGIX", null, .any) orelse return error.NoCache;
defer allocator.free(result.data);
@ -2544,7 +2750,7 @@ test "writeMerged Split: SPYM-style supplementary entry added" {
var tiingo_view = [_]Split{
.{ .date = Date.fromYmd(2017, 10, 16), .numerator = 4, .denominator = 1 },
};
s.write(Split, "SPYM", tiingo_view[0..], .{ .seconds = Ttl.splits });
s.writeSupplement(Split, "SPYM", tiingo_view[0..], "tiingo");
const result = s.read(s.allocator, Split, "SPYM", null, .any) orelse return error.NoCache;
defer allocator.free(result.data);
@ -2577,7 +2783,7 @@ test "writeMerged Split: forward-looking Polygon entry preserved across Tiingo r
var tiingo_view = [_]Split{
.{ .date = Date.fromYmd(2020, 8, 31), .numerator = 4, .denominator = 1 },
};
s.write(Split, "TEST", tiingo_view[0..], .{ .seconds = Ttl.splits });
s.writeSupplement(Split, "TEST", tiingo_view[0..], "tiingo");
const result = s.read(s.allocator, Split, "TEST", null, .any) orelse return error.NoCache;
defer allocator.free(result.data);

View file

@ -532,11 +532,12 @@ pub const DataService = struct {
// 2026-06-15 ex_date), which Tiingo's price-series
// response does not. Tiingo opportunistically
// supplements the cache via `populateAllFromTiingo`
// when candle fetches happen — that path uses the
// sorted-union write semantics in
// `cache.Store.writeMerged`, so Polygon's entries
// and Tiingo's entries coexist in `dividends.srf`
// without overwriting each other.
// when candle fetches happen — that path uses
// `cache.Store.writeSupplement`, whose sorted-union
// merge lets Polygon's and Tiingo's entries coexist in
// `dividends.srf` without overwriting each other, and
// which preserves the `#!expires=` clock so a Tiingo
// candle fetch never masks a due Polygon refresh.
var pg = try self.getProvider(Polygon);
return pg.fetchDividends(self.allocator, symbol, null, null);
},
@ -596,12 +597,14 @@ pub const DataService = struct {
if (triple.candles.len > 0) {
s.cacheCandles(symbol, triple.candles, .tiingo, 0);
}
// Dividends and splits use the merge write path so Tiingo's
// view supplements rather than replaces existing (typically
// Polygon-sourced) records. New entries are logged with
// "tiingo" attribution.
s.writeWithSource(Dividend, symbol, triple.dividends, cache.DataType.dividends.ttl(), "tiingo");
s.writeWithSource(Split, symbol, triple.splits, cache.DataType.splits.ttl(), "tiingo");
// Dividends and splits use the supplement write path: Tiingo's
// view merges into existing (typically Polygon-sourced) records
// without resetting the `#!expires=` freshness clock, since
// Polygon — not this opportunistic candle-driven write — owns
// when div/split data is next due for a primary refresh. New
// entries are logged with "tiingo" attribution.
s.writeSupplement(Dividend, symbol, triple.dividends, "tiingo");
s.writeSupplement(Split, symbol, triple.splits, "tiingo");
return triple;
}