introduce jitter capabilities to ttl in prep for new data types in store

This commit is contained in:
Emil Lerch 2026-05-26 10:53:38 -07:00
parent 4f13144365
commit dfd64bf511
2 changed files with 257 additions and 53 deletions

290
src/cache/store.zig vendored
View file

@ -45,8 +45,114 @@ pub const Ttl = struct {
pub const earnings: i64 = 30 * s_per_day;
/// ETF profiles refresh monthly
pub const etf_profile: i64 = 30 * s_per_day;
/// Per-symbol classification record (sector / industry / country /
/// inception_date / CIK) sourced from Wikidata. The data changes
/// rarely enough that 90 days is comfortable; jitter is layered
/// on at the call site to avoid thundering-herd refresh.
pub const classification: i64 = 90 * s_per_day;
/// Per-symbol ETF metrics (NPORT-P profile + sector breakdown +
/// top holdings). Updated when the fund files its next NPORT-P,
/// which is quarterly. 90-day TTL with call-site jitter strikes
/// the right balance between staleness and refresh load.
pub const etf_metrics: i64 = 90 * s_per_day;
/// Per-CIK entity facts from EDGAR XBRL (initially shares
/// outstanding; future variants for revenue, net income, etc).
/// Refreshes on quarterly filing cadence; 30-day TTL gives a
/// fortnightly margin around each fiscal-quarter boundary.
pub const entity_facts: i64 = 30 * s_per_day;
};
/// Cache TTL specification with optional per-key expiration jitter.
///
/// Existing call sites that just want a fixed expiration use
/// `.{ .seconds = Ttl.X }` and get the same exact-TTL behavior they
/// always had. Call sites that want thundering-herd defense set
/// `.jitter_pct = N` to spread expirations within ±N% of the base,
/// keyed deterministically by the cache entry's key. The same key
/// produces the same expiration across repeated writes useful for
/// keeping expected-vs-actual debugging tractable.
///
/// Policy decision (which `jitter_pct` is right for which data type)
/// lives at the call site, not in this struct. The cache only
/// mechanises "compute final expiration from base + spread."
pub const TtlSpec = struct {
/// Base TTL in seconds. Sentinel `-1` means "never expires" and
/// bypasses jitter entirely.
seconds: i64,
/// Optional thundering-herd defense. When non-zero, the actual
/// expiration is offset by a per-key-deterministic amount within
/// ±(seconds * jitter_pct / 100). Default 0 = no spread, exact
/// `seconds` TTL applied.
///
/// Typical values: 8 for 30/90-day TTLs, 0 for short TTLs where
/// the absolute spread would be larger than meaningful refresh
/// drift.
jitter_pct: u8 = 0,
};
/// Compute the absolute expiration timestamp for a cache write.
///
/// The cache layer's freshness check compares `now` against this
/// expiration. Returning a precise number gives the exact moment a
/// cache entry transitions from fresh to stale.
///
/// Behavior by `spec` shape:
///
/// 1. Negative-sentinel TTL (`spec.seconds < 0`): caller is asking
/// for "never expires" (e.g. `Ttl.candles_historical = -1`).
/// Pass the sentinel through unchanged so freshness checks
/// treat it as effectively-infinite. Jitter does not apply
/// there's no meaningful expiration to spread.
///
/// 2. No jitter (`spec.jitter_pct == 0`): exact `now + seconds`.
/// Identical to the bare-TTL behavior all callers had before
/// `TtlSpec` existed. This is the default.
///
/// 3. Jittered (`spec.jitter_pct > 0`): the actual expiration is
/// offset from the base by a per-key-deterministic amount within
/// ±(seconds * jitter_pct / 100). Two distinct keys typically
/// get distinct offsets; the same key always gets the same
/// offset. The hash function is `std.hash.Wyhash` keyed on
/// `key` only no wall-clock or RNG state so the result is
/// reproducible across processes and across rewrites.
///
/// Why hash-based and not `std.Random`: the property we want is
/// "spread across the population of cache keys," not "unpredictable
/// per write." Hash-by-key gives that exactly: distinct keys map
/// to distinct offsets without coordination. A PRNG would also work
/// in expectation, but at the cost of (a) drifting the expiration
/// each time the same key is rewritten and (b) introducing seed-
/// management to keep tests reproducible. Determinism by key keeps
/// debugging tractable: see an unexpected expiration in a cache
/// file recompute it to confirm.
pub fn computeExpires(now_s: i64, spec: TtlSpec, key: []const u8) i64 {
// Case 1: never-expires sentinel passes through unchanged.
if (spec.seconds < 0) return now_s + spec.seconds;
// Case 2: no jitter requested exact base TTL.
if (spec.jitter_pct == 0) return now_s + spec.seconds;
// Case 3: jitter requested. Compute the maximum offset on either
// side of the base. If percent-of-base rounds to zero, fall back
// to exact (avoids modulo-by-zero and any pretense of spread).
const max_offset = @divFloor(spec.seconds * @as(i64, spec.jitter_pct), 100);
if (max_offset <= 0) return now_s + spec.seconds;
// Map the key's Wyhash into the inclusive range [-max_offset,
// +max_offset]. Modulo by 2*max_offset+1 (the count of integers
// in that range) gives a uniform-ish distribution; subtract
// max_offset to recenter.
const range_size: u64 = @intCast(2 * max_offset + 1);
const hash = std.hash.Wyhash.hash(0, key);
const positive_offset: i64 = @intCast(hash % range_size);
const signed_offset = positive_offset - max_offset;
return now_s + spec.seconds + signed_offset;
}
pub const DataType = enum {
candles_daily,
candles_meta,
@ -56,6 +162,19 @@ pub const DataType = enum {
earnings,
etf_profile,
meta,
/// Per-symbol classification record sourced from Wikidata.
/// Stored at `<cache_dir>/<symbol>/classification.srf`.
classification,
/// Per-symbol NPORT-P-derived ETF metrics (tagged union of
/// profile + sector + holding rows). Stored at
/// `<cache_dir>/<symbol>/etf_metrics.srf`.
etf_metrics,
/// Per-CIK XBRL-derived entity facts (tagged union; initially
/// just shares-outstanding). Stored at
/// `<cache_dir>/<cik>/entity_facts.srf` note CIK-keyed, not
/// symbol-keyed, so a single dual-class issuer (BRK.A / BRK.B)
/// has one shared facts file.
entity_facts,
pub fn fileName(self: DataType) []const u8 {
return switch (self) {
@ -67,6 +186,9 @@ pub const DataType = enum {
.earnings => "earnings.srf",
.etf_profile => "etf_profile.srf",
.meta => "meta.srf",
.classification => "classification.srf",
.etf_metrics => "etf_metrics.srf",
.entity_facts => "entity_facts.srf",
};
}
@ -77,6 +199,9 @@ pub const DataType = enum {
.options => Ttl.options,
.earnings => Ttl.earnings,
.etf_profile => Ttl.etf_profile,
.classification => Ttl.classification,
.etf_metrics => Ttl.etf_metrics,
.entity_facts => Ttl.entity_facts,
.candles_daily, .candles_meta, .meta => 0,
};
}
@ -207,7 +332,7 @@ pub const Store = struct {
comptime T: type,
symbol: []const u8,
items: DataFor(T),
ttl: i64,
ttl: TtlSpec,
) void {
self.writeWithSource(T, symbol, items, ttl, null);
}
@ -222,14 +347,14 @@ pub const Store = struct {
comptime T: type,
symbol: []const u8,
items: DataFor(T),
ttl: i64,
ttl: TtlSpec,
source_hint: ?[]const u8,
) void {
if (T == Dividend or T == Split) {
self.writeMerged(T, symbol, items, ttl, source_hint);
return;
}
const expires = std.Io.Timestamp.now(self.io, .real).toSeconds() + ttl;
const expires = computeExpires(std.Io.Timestamp.now(self.io, .real).toSeconds(), ttl, symbol);
const data_type = dataTypeFor(T);
if (T == EtfProfile) {
const srf_data = serializeEtfProfile(self.io, self.allocator, items, .{ .expires = expires }) catch |err| {
@ -302,7 +427,7 @@ pub const Store = struct {
comptime T: type,
symbol: []const u8,
incoming: []const T,
ttl: i64,
ttl: TtlSpec,
source_hint: ?[]const u8,
) void {
comptime std.debug.assert(T == Dividend or T == Split);
@ -379,7 +504,7 @@ pub const Store = struct {
// Serialize via the same generic path as `write` for
// non-merged types, but write the union we just built.
const expires = std.Io.Timestamp.now(self.io, .real).toSeconds() + ttl;
const expires = computeExpires(std.Io.Timestamp.now(self.io, .real).toSeconds(), ttl, symbol);
const data_type = dataTypeFor(T);
const srf_data = serializeWithMeta(T, self.io, self.allocator, merged.items, .{ .expires = expires }) catch |err| {
log.warn("{s}: failed to serialize {s}: {s}", .{ symbol, @tagName(data_type), @errorName(err) });
@ -625,7 +750,7 @@ pub const Store = struct {
/// Ensure the cache directory for a symbol exists.
pub fn ensureSymbolDir(self: *Store, symbol: []const u8) !void {
const path = try self.symbolPath(symbol, "");
const path = try self.entryPath(symbol, "");
defer self.allocator.free(path);
std.Io.Dir.cwd().createDirPath(self.io, path) catch |err| switch (err) {
error.PathAlreadyExists => {},
@ -635,7 +760,7 @@ pub const Store = struct {
/// Clear all cached data for a symbol.
pub fn clearSymbol(self: *Store, symbol: []const u8) !void {
const path = try self.symbolPath(symbol, "");
const path = try self.entryPath(symbol, "");
defer self.allocator.free(path);
// Best-effort clear: deleting a non-existent symbol dir is
// a no-op success from the caller's POV, so log + continue.
@ -911,7 +1036,7 @@ pub const Store = struct {
/// Check if a cached data file is a negative entry (fetch_failed marker).
/// Negative entries are always considered "fresh" -- they never expire.
pub fn isNegative(self: *Store, symbol: []const u8, data_type: DataType) bool {
const path = self.symbolPath(symbol, data_type.fileName()) catch return false;
const path = self.entryPath(symbol, data_type.fileName()) catch return false;
defer self.allocator.free(path);
const file = std.Io.Dir.cwd().openFile(self.io, path, .{}) catch return false;
@ -926,7 +1051,7 @@ pub const Store = struct {
/// Clear a specific data type for a symbol.
pub fn clearData(self: *Store, symbol: []const u8, data_type: DataType) void {
const path = self.symbolPath(symbol, data_type.fileName()) catch return;
const path = self.entryPath(symbol, data_type.fileName()) catch return;
defer self.allocator.free(path);
std.Io.Dir.cwd().deleteFile(self.io, path) catch |err| std.log.debug("clearData deleteFile({s}): {t}", .{ path, err });
}
@ -1039,7 +1164,7 @@ pub const Store = struct {
// Private I/O
fn readRaw(self: *Store, symbol: []const u8, data_type: DataType) !?[]const u8 {
const path = try self.symbolPath(symbol, data_type.fileName());
const path = try self.entryPath(symbol, data_type.fileName());
defer self.allocator.free(path);
return std.Io.Dir.cwd().readFileAlloc(self.io, path, self.allocator, .limited(50 * 1024 * 1024)) catch |err| switch (err) {
@ -1067,7 +1192,7 @@ pub const Store = struct {
/// value.
pub fn writeRaw(self: *Store, symbol: []const u8, data_type: DataType, data: []const u8) !void {
try self.ensureSymbolDir(symbol);
const path = try self.symbolPath(symbol, data_type.fileName());
const path = try self.entryPath(symbol, data_type.fileName());
defer self.allocator.free(path);
try atomic.writeFileAtomic(self.io, self.allocator, path, data);
@ -1087,7 +1212,7 @@ pub const Store = struct {
/// callers are expected to fall back to a full rewrite path in that
/// case (see `appendCandles`).
fn appendRaw(self: *Store, symbol: []const u8, data_type: DataType, data: []const u8) !void {
const path = try self.symbolPath(symbol, data_type.fileName());
const path = try self.entryPath(symbol, data_type.fileName());
defer self.allocator.free(path);
const existing = std.Io.Dir.cwd().readFileAlloc(self.io, path, self.allocator, .limited(50 * 1024 * 1024)) catch |err| switch (err) {
@ -1104,11 +1229,19 @@ pub const Store = struct {
try atomic.writeFileAtomic(self.io, self.allocator, path, combined);
}
fn symbolPath(self: *Store, symbol: []const u8, file_name: []const u8) ![]const u8 {
/// Build the on-disk path for a cache entry under `<cache_dir>/<key>/<file_name>`.
/// `key` is the entry's primary key typically a ticker symbol,
/// but can also be a CIK (for entity-keyed data) or other stable
/// identifier. The cache layer is agnostic to which kind of key
/// the caller passed; the directory name is just whatever string
/// was supplied. Pass an empty `file_name` to get the entry's
/// directory path (used by housekeeping calls that operate on
/// the directory rather than a file inside it).
fn entryPath(self: *Store, key: []const u8, file_name: []const u8) ![]const u8 {
if (file_name.len == 0) {
return std.fs.path.join(self.allocator, &.{ self.cache_dir, symbol });
return std.fs.path.join(self.allocator, &.{ self.cache_dir, key });
}
return std.fs.path.join(self.allocator, &.{ self.cache_dir, symbol, file_name });
return std.fs.path.join(self.allocator, &.{ self.cache_dir, key, file_name });
}
// Private serialization: generic
@ -1547,7 +1680,7 @@ test "writeMerged Dividend: empty cache writes input sorted descending" {
.{ .ex_date = Date.fromYmd(2024, 8, 15), .amount = 0.55 },
.{ .ex_date = Date.fromYmd(2024, 2, 15), .amount = 0.48 },
};
s.write(Dividend, "TEST", incoming[0..], Ttl.dividends);
s.write(Dividend, "TEST", incoming[0..], .{ .seconds = Ttl.dividends });
const result = s.read(Dividend, "TEST", null, .any) orelse return error.NoCache;
defer allocator.free(result.data);
@ -1577,14 +1710,14 @@ test "writeMerged Dividend: existing entries preserved on key collision" {
.type = .regular,
},
};
s.write(Dividend, "TEST", initial[0..], Ttl.dividends);
s.write(Dividend, "TEST", initial[0..], .{ .seconds = Ttl.dividends });
// Second write: same ex_date, sparser entry (Tiingo-style: no pay_date, no type).
// Existing entry should win.
var incoming = [_]Dividend{
.{ .ex_date = Date.fromYmd(2024, 5, 15), .amount = 0.99, .type = .unknown },
};
s.write(Dividend, "TEST", incoming[0..], Ttl.dividends);
s.write(Dividend, "TEST", incoming[0..], .{ .seconds = Ttl.dividends });
const result = s.read(Dividend, "TEST", null, .any) orelse return error.NoCache;
defer allocator.free(result.data);
@ -1610,13 +1743,13 @@ test "writeMerged Dividend: union sorted desc, new entry added" {
.{ .ex_date = Date.fromYmd(2024, 5, 15), .amount = 0.50, .type = .regular },
.{ .ex_date = Date.fromYmd(2024, 2, 15), .amount = 0.48, .type = .regular },
};
s.write(Dividend, "TEST", initial[0..], Ttl.dividends);
s.write(Dividend, "TEST", initial[0..], .{ .seconds = Ttl.dividends });
// New ex_date that wasn't already present.
var incoming = [_]Dividend{
.{ .ex_date = Date.fromYmd(2024, 8, 15), .amount = 0.55 },
};
s.write(Dividend, "TEST", incoming[0..], Ttl.dividends);
s.write(Dividend, "TEST", incoming[0..], .{ .seconds = Ttl.dividends });
const result = s.read(Dividend, "TEST", null, .any) orelse return error.NoCache;
defer allocator.free(result.data);
@ -1640,7 +1773,7 @@ test "writeMerged Dividend: no-op when nothing new" {
var initial = [_]Dividend{
.{ .ex_date = Date.fromYmd(2024, 5, 15), .amount = 0.50, .type = .regular },
};
s.write(Dividend, "TEST", initial[0..], Ttl.dividends);
s.write(Dividend, "TEST", initial[0..], .{ .seconds = Ttl.dividends });
// Capture file mtime before second (no-op) write.
const path = try std.fs.path.join(allocator, &.{ dir_path, "TEST", "dividends.srf" });
@ -1654,7 +1787,7 @@ test "writeMerged Dividend: no-op when nothing new" {
var repeat = [_]Dividend{
.{ .ex_date = Date.fromYmd(2024, 5, 15), .amount = 0.50, .type = .regular },
};
s.write(Dividend, "TEST", repeat[0..], Ttl.dividends);
s.write(Dividend, "TEST", repeat[0..], .{ .seconds = Ttl.dividends });
const stat_after = try std.Io.Dir.cwd().statFile(io, path, .{});
try std.testing.expectEqual(stat_before.mtime, stat_after.mtime);
@ -1680,7 +1813,7 @@ test "writeMerged Dividend: field-level upgrade fills nulls (Tiingo-then-Polygon
var tiingo_view = [_]Dividend{
.{ .ex_date = Date.fromYmd(2024, 5, 15), .amount = 0.50 },
};
s.writeWithSource(Dividend, "TEST", tiingo_view[0..], Ttl.dividends, "tiingo");
s.writeWithSource(Dividend, "TEST", tiingo_view[0..], .{ .seconds = Ttl.dividends }, "tiingo");
// Polygon second: rich pay_date, record_date, type, currency.
var polygon_view = [_]Dividend{
@ -1695,7 +1828,7 @@ test "writeMerged Dividend: field-level upgrade fills nulls (Tiingo-then-Polygon
// covered by a separate test.
},
};
s.writeWithSource(Dividend, "TEST", polygon_view[0..], Ttl.dividends, "polygon");
s.writeWithSource(Dividend, "TEST", polygon_view[0..], .{ .seconds = Ttl.dividends }, "polygon");
const result = s.read(Dividend, "TEST", null, .any) orelse return error.NoCache;
defer allocator.free(result.data);
@ -1726,12 +1859,12 @@ test "writeMerged Dividend: type unknown counts as null and gets upgraded" {
var tiingo_view = [_]Dividend{
.{ .ex_date = Date.fromYmd(2024, 5, 15), .amount = 0.50, .type = .unknown },
};
s.writeWithSource(Dividend, "TEST", tiingo_view[0..], Ttl.dividends, "tiingo");
s.writeWithSource(Dividend, "TEST", tiingo_view[0..], .{ .seconds = Ttl.dividends }, "tiingo");
var polygon_view = [_]Dividend{
.{ .ex_date = Date.fromYmd(2024, 5, 15), .amount = 0.50, .type = .special },
};
s.writeWithSource(Dividend, "TEST", polygon_view[0..], Ttl.dividends, "polygon");
s.writeWithSource(Dividend, "TEST", polygon_view[0..], .{ .seconds = Ttl.dividends }, "polygon");
const result = s.read(Dividend, "TEST", null, .any) orelse return error.NoCache;
defer allocator.free(result.data);
@ -1763,7 +1896,7 @@ test "writeMerged Dividend: non-null fields are not overwritten" {
.type = .regular,
},
};
s.writeWithSource(Dividend, "TEST", first[0..], Ttl.dividends, "polygon");
s.writeWithSource(Dividend, "TEST", first[0..], .{ .seconds = Ttl.dividends }, "polygon");
// Second write with a different (and wrong) pay_date and type.
var second = [_]Dividend{
@ -1774,7 +1907,7 @@ test "writeMerged Dividend: non-null fields are not overwritten" {
.type = .special,
},
};
s.writeWithSource(Dividend, "TEST", second[0..], Ttl.dividends, "polygon");
s.writeWithSource(Dividend, "TEST", second[0..], .{ .seconds = Ttl.dividends }, "polygon");
const result = s.read(Dividend, "TEST", null, .any) orelse return error.NoCache;
defer allocator.free(result.data);
@ -1807,7 +1940,7 @@ test "writeMerged Dividend: upgrade is no-op when both have same fields" {
.type = .regular,
},
};
s.writeWithSource(Dividend, "TEST", initial[0..], Ttl.dividends, "polygon");
s.writeWithSource(Dividend, "TEST", initial[0..], .{ .seconds = Ttl.dividends }, "polygon");
const path = try std.fs.path.join(allocator, &.{ dir_path, "TEST", "dividends.srf" });
defer allocator.free(path);
@ -1823,7 +1956,7 @@ test "writeMerged Dividend: upgrade is no-op when both have same fields" {
.type = .regular,
},
};
s.writeWithSource(Dividend, "TEST", repeat[0..], Ttl.dividends, "polygon");
s.writeWithSource(Dividend, "TEST", repeat[0..], .{ .seconds = Ttl.dividends }, "polygon");
const stat_after = try std.Io.Dir.cwd().statFile(io, path, .{});
try std.testing.expectEqual(stat_before.mtime, stat_after.mtime);
@ -1854,7 +1987,7 @@ test "writeMerged Dividend: near-match dedup catches last-biz-day vs calendar-en
.type = .regular,
},
};
s.writeWithSource(Dividend, "FDRXX", polygon_view[0..], Ttl.dividends, "polygon");
s.writeWithSource(Dividend, "FDRXX", polygon_view[0..], .{ .seconds = Ttl.dividends }, "polygon");
// Tiingo's view: 2025-08-29 (Friday last business day),
// identical amount.
@ -1864,7 +1997,7 @@ test "writeMerged Dividend: near-match dedup catches last-biz-day vs calendar-en
.amount = 0.003422654,
},
};
s.writeWithSource(Dividend, "FDRXX", tiingo_view[0..], Ttl.dividends, "tiingo");
s.writeWithSource(Dividend, "FDRXX", tiingo_view[0..], .{ .seconds = Ttl.dividends }, "tiingo");
const result = s.read(Dividend, "FDRXX", null, .any) orelse return error.NoCache;
defer allocator.free(result.data);
@ -1893,13 +2026,13 @@ test "writeMerged Dividend: near-match dedup respects 3-day window upper bound"
var first = [_]Dividend{
.{ .ex_date = Date.fromYmd(2025, 8, 31), .amount = 0.003422654, .type = .regular },
};
s.writeWithSource(Dividend, "TEST", first[0..], Ttl.dividends, "polygon");
s.writeWithSource(Dividend, "TEST", first[0..], .{ .seconds = Ttl.dividends }, "polygon");
// 4 days earlier outside the ±3 day window.
var second = [_]Dividend{
.{ .ex_date = Date.fromYmd(2025, 8, 27), .amount = 0.003422654 },
};
s.writeWithSource(Dividend, "TEST", second[0..], Ttl.dividends, "tiingo");
s.writeWithSource(Dividend, "TEST", second[0..], .{ .seconds = Ttl.dividends }, "tiingo");
const result = s.read(Dividend, "TEST", null, .any) orelse return error.NoCache;
defer allocator.free(result.data);
@ -1926,13 +2059,13 @@ test "writeMerged Dividend: near-match dedup respects amount tolerance" {
var first = [_]Dividend{
.{ .ex_date = Date.fromYmd(2025, 8, 31), .amount = 0.003422654, .type = .regular },
};
s.writeWithSource(Dividend, "TEST", first[0..], Ttl.dividends, "polygon");
s.writeWithSource(Dividend, "TEST", first[0..], .{ .seconds = Ttl.dividends }, "polygon");
// 2 days earlier, very different amount (>1% relative).
var second = [_]Dividend{
.{ .ex_date = Date.fromYmd(2025, 8, 29), .amount = 0.005 },
};
s.writeWithSource(Dividend, "TEST", second[0..], Ttl.dividends, "tiingo");
s.writeWithSource(Dividend, "TEST", second[0..], .{ .seconds = Ttl.dividends }, "tiingo");
const result = s.read(Dividend, "TEST", null, .any) orelse return error.NoCache;
defer allocator.free(result.data);
@ -1964,13 +2097,13 @@ test "writeMerged Dividend: near-match dedup tolerates Tiingo amount rounding" {
.type = .regular,
},
};
s.writeWithSource(Dividend, "FAGIX", polygon_view[0..], Ttl.dividends, "polygon");
s.writeWithSource(Dividend, "FAGIX", polygon_view[0..], .{ .seconds = Ttl.dividends }, "polygon");
// Tiingo's view: 1 day earlier, amount rounded to 2 decimals.
var tiingo_view = [_]Dividend{
.{ .ex_date = Date.fromYmd(2024, 8, 30), .amount = 0.04 },
};
s.writeWithSource(Dividend, "FAGIX", tiingo_view[0..], Ttl.dividends, "tiingo");
s.writeWithSource(Dividend, "FAGIX", tiingo_view[0..], .{ .seconds = Ttl.dividends }, "tiingo");
const result = s.read(Dividend, "FAGIX", null, .any) orelse return error.NoCache;
defer allocator.free(result.data);
@ -1999,7 +2132,7 @@ test "writeMerged Split: near-match dedup is a no-op (no amount field)" {
var first = [_]Split{
.{ .date = Date.fromYmd(2024, 6, 10), .numerator = 10, .denominator = 1 },
};
s.writeWithSource(Split, "TEST", first[0..], Ttl.splits, "polygon");
s.writeWithSource(Split, "TEST", first[0..], .{ .seconds = Ttl.splits }, "polygon");
// 1 day apart (would be in near-match window if it applied to
// splits). Still kept as distinct because the dedup logic
@ -2007,7 +2140,7 @@ test "writeMerged Split: near-match dedup is a no-op (no amount field)" {
var second = [_]Split{
.{ .date = Date.fromYmd(2024, 6, 11), .numerator = 10, .denominator = 1 },
};
s.writeWithSource(Split, "TEST", second[0..], Ttl.splits, "tiingo");
s.writeWithSource(Split, "TEST", second[0..], .{ .seconds = Ttl.splits }, "tiingo");
const result = s.read(Split, "TEST", null, .any) orelse return error.NoCache;
defer allocator.free(result.data);
@ -2026,13 +2159,13 @@ test "writeMerged Split: SPYM-style supplementary entry added" {
var s = Store.init(io, allocator, dir_path);
// Polygon's view: empty (the bug case Polygon doesn't carry SPYM's 2017 split).
var initial = [_]Split{};
s.write(Split, "SPYM", initial[0..], Ttl.splits);
s.write(Split, "SPYM", initial[0..], .{ .seconds = Ttl.splits });
// Tiingo supplements with the 2017 4:1 split.
var tiingo_view = [_]Split{
.{ .date = Date.fromYmd(2017, 10, 16), .numerator = 4, .denominator = 1 },
};
s.write(Split, "SPYM", tiingo_view[0..], Ttl.splits);
s.write(Split, "SPYM", tiingo_view[0..], .{ .seconds = Ttl.splits });
const result = s.read(Split, "SPYM", null, .any) orelse return error.NoCache;
defer allocator.free(result.data);
@ -2058,14 +2191,14 @@ test "writeMerged Split: forward-looking Polygon entry preserved across Tiingo r
.{ .date = Date.fromYmd(2026, 12, 1), .numerator = 2, .denominator = 1 },
.{ .date = Date.fromYmd(2020, 8, 31), .numerator = 4, .denominator = 1 },
};
s.write(Split, "TEST", polygon_view[0..], Ttl.splits);
s.write(Split, "TEST", polygon_view[0..], .{ .seconds = Ttl.splits });
// Tiingo refresh: only knows about historical entries (its own data
// doesn't include forward-looking yet-to-occur splits).
var tiingo_view = [_]Split{
.{ .date = Date.fromYmd(2020, 8, 31), .numerator = 4, .denominator = 1 },
};
s.write(Split, "TEST", tiingo_view[0..], Ttl.splits);
s.write(Split, "TEST", tiingo_view[0..], .{ .seconds = Ttl.splits });
const result = s.read(Split, "TEST", null, .any) orelse return error.NoCache;
defer allocator.free(result.data);
@ -2192,6 +2325,12 @@ test "TTL constants are reasonable" {
// Earnings and ETF profiles refresh monthly
try std.testing.expectEqual(@as(i64, 30 * std.time.s_per_day), Ttl.earnings);
try std.testing.expectEqual(@as(i64, 30 * std.time.s_per_day), Ttl.etf_profile);
// New types: classification (90d) and etf_metrics (90d) refresh
// quarterly; entity_facts (30d) refreshes monthly.
try std.testing.expectEqual(@as(i64, 90 * std.time.s_per_day), Ttl.classification);
try std.testing.expectEqual(@as(i64, 90 * std.time.s_per_day), Ttl.etf_metrics);
try std.testing.expectEqual(@as(i64, 30 * std.time.s_per_day), Ttl.entity_facts);
}
test "DataType.ttl returns correct values" {
@ -2200,6 +2339,9 @@ test "DataType.ttl returns correct values" {
try std.testing.expectEqual(Ttl.options, DataType.options.ttl());
try std.testing.expectEqual(Ttl.earnings, DataType.earnings.ttl());
try std.testing.expectEqual(Ttl.etf_profile, DataType.etf_profile.ttl());
try std.testing.expectEqual(Ttl.classification, DataType.classification.ttl());
try std.testing.expectEqual(Ttl.etf_metrics, DataType.etf_metrics.ttl());
try std.testing.expectEqual(Ttl.entity_facts, DataType.entity_facts.ttl());
// These types have no TTL (0 = managed elsewhere)
try std.testing.expectEqual(@as(i64, 0), DataType.candles_daily.ttl());
@ -2216,6 +2358,9 @@ test "DataType.fileName returns correct file names" {
try std.testing.expectEqualStrings("earnings.srf", DataType.earnings.fileName());
try std.testing.expectEqualStrings("etf_profile.srf", DataType.etf_profile.fileName());
try std.testing.expectEqualStrings("meta.srf", DataType.meta.fileName());
try std.testing.expectEqualStrings("classification.srf", DataType.classification.fileName());
try std.testing.expectEqualStrings("etf_metrics.srf", DataType.etf_metrics.fileName());
try std.testing.expectEqualStrings("entity_facts.srf", DataType.entity_facts.fileName());
}
test "negative_cache_content format" {
@ -2224,6 +2369,59 @@ test "negative_cache_content format" {
try std.testing.expect(std.mem.indexOf(u8, Store.negative_cache_content, "fetch_failed") != null);
}
test "computeExpires with jitter_pct=0 produces exact base TTL" {
const now: i64 = 1_000_000;
const ttl: i64 = 86_400; // one day
const e1 = computeExpires(now, .{ .seconds = ttl }, "AAPL");
const e2 = computeExpires(now, .{ .seconds = ttl }, "MSFT");
const e3 = computeExpires(now, .{ .seconds = ttl, .jitter_pct = 0 }, "VTI");
try std.testing.expectEqual(now + ttl, e1);
try std.testing.expectEqual(now + ttl, e2);
try std.testing.expectEqual(now + ttl, e3);
}
test "computeExpires with jitter_pct>0 spreads within bounds and is deterministic" {
const now: i64 = 1_000_000;
const ttl: i64 = 86_400 * 90; // 90 days
const jitter_pct: u8 = 8;
const span: i64 = @divFloor(ttl * jitter_pct, 100);
// Determinism: same key, same expiration.
const a1 = computeExpires(now, .{ .seconds = ttl, .jitter_pct = jitter_pct }, "AAPL");
const a2 = computeExpires(now, .{ .seconds = ttl, .jitter_pct = jitter_pct }, "AAPL");
try std.testing.expectEqual(a1, a2);
// Each key's expiration falls within ±span of the base.
const keys = [_][]const u8{ "AAPL", "MSFT", "GOOGL", "VTI", "SPY", "BND", "GLD" };
for (keys) |key| {
const exp = computeExpires(now, .{ .seconds = ttl, .jitter_pct = jitter_pct }, key);
const offset = exp - now - ttl;
try std.testing.expect(offset >= -span);
try std.testing.expect(offset <= span);
}
}
test "computeExpires with jitter_pct>0 spreads different keys to different expirations" {
const now: i64 = 1_000_000;
const ttl: i64 = 86_400 * 90;
const a = computeExpires(now, .{ .seconds = ttl, .jitter_pct = 8 }, "AAPL");
const m = computeExpires(now, .{ .seconds = ttl, .jitter_pct = 8 }, "MSFT");
const g = computeExpires(now, .{ .seconds = ttl, .jitter_pct = 8 }, "GOOGL");
// At least two of the three should differ defends against the
// hash collapsing distinct inputs to the same offset.
const all_equal = a == m and m == g;
try std.testing.expect(!all_equal);
}
test "computeExpires bypasses jitter for negative-sentinel TTL" {
// Ttl.candles_historical = -1 means "never expires." Jitter
// must not fold a negative sentinel into something that looks
// like a finite expiration in the past.
const now: i64 = 1_000_000;
const e = computeExpires(now, .{ .seconds = -1, .jitter_pct = 8 }, "AAPL");
try std.testing.expectEqual(now - 1, e);
}
test "looksCompleteSrf: empty is invalid" {
try std.testing.expect(!Store.looksCompleteSrf(""));
}
@ -2633,7 +2831,7 @@ test "writeRaw atomicity: concurrent readers never observe a truncated file" {
}
fn reader(self: *@This()) void {
const path = self.store.symbolPath("SYM", DataType.candles_daily.fileName()) catch return;
const path = self.store.entryPath("SYM", DataType.candles_daily.fileName()) catch return;
defer self.store.allocator.free(path);
while (!self.stop.load(.acquire)) {
@ -2730,7 +2928,7 @@ test "appendRaw atomicity: concurrent readers see either pre- or post-append, ne
}
fn reader(self: *@This()) void {
const path = self.store.symbolPath("SYM", DataType.candles_daily.fileName()) catch return;
const path = self.store.entryPath("SYM", DataType.candles_daily.fileName()) catch return;
defer self.store.allocator.free(path);
while (!self.stop.load(.acquire)) {

View file

@ -401,7 +401,7 @@ pub const DataService = struct {
const retried = self.fetchFromProvider(T, symbol) catch {
return DataError.FetchFailed;
};
s.writeWithSource(T, symbol, retried, data_type.ttl(), sourceHintFor(T));
s.writeWithSource(T, symbol, retried, .{ .seconds = data_type.ttl() }, sourceHintFor(T));
return .{ .data = retried, .source = .fetched, .timestamp = std.Io.Timestamp.now(self.io, .real).toSeconds(), .allocator = self.allocator };
}
// Only NotFound (provider says "this symbol genuinely has
@ -415,7 +415,7 @@ pub const DataService = struct {
return DataError.FetchFailed;
};
s.writeWithSource(T, symbol, fetched, data_type.ttl(), sourceHintFor(T));
s.writeWithSource(T, symbol, fetched, .{ .seconds = data_type.ttl() }, sourceHintFor(T));
return .{ .data = fetched, .source = .fetched, .timestamp = std.Io.Timestamp.now(self.io, .real).toSeconds(), .allocator = self.allocator };
}
@ -509,8 +509,8 @@ pub const DataService = struct {
// view supplements rather than replaces existing (typically
// Polygon-sourced) records. New entries are logged with
// "tiingo" attribution.
s.writeWithSource(Dividend, symbol, triple.dividends, cache.DataType.dividends.ttl(), "tiingo");
s.writeWithSource(Split, symbol, triple.splits, cache.DataType.splits.ttl(), "tiingo");
s.writeWithSource(Dividend, symbol, triple.dividends, .{ .seconds = cache.DataType.dividends.ttl() }, "tiingo");
s.writeWithSource(Split, symbol, triple.splits, .{ .seconds = cache.DataType.splits.ttl() }, "tiingo");
return triple;
}
@ -884,7 +884,7 @@ pub const DataService = struct {
return DataError.FetchFailed;
};
s.write(EarningsEvent, symbol, fetched, cache.Ttl.earnings);
s.write(EarningsEvent, symbol, fetched, .{ .seconds = cache.Ttl.earnings });
return .{ .data = fetched, .source = .fetched, .timestamp = std.Io.Timestamp.now(self.io, .real).toSeconds(), .allocator = self.allocator };
}
@ -926,7 +926,7 @@ pub const DataService = struct {
return DataError.FetchFailed;
};
s.write(EtfProfile, symbol, fetched, cache.Ttl.etf_profile);
s.write(EtfProfile, symbol, fetched, .{ .seconds = cache.Ttl.etf_profile });
return .{ .data = fetched, .source = .fetched, .timestamp = std.Io.Timestamp.now(self.io, .real).toSeconds(), .allocator = self.allocator };
}
@ -1798,6 +1798,12 @@ pub const DataService = struct {
.splits => "/splits",
.etf_profile => return false, // not served
.meta => return false,
// New variants wired into the endpoint mapping by
// Milestone 1 chunk 3 (DataService methods). For now
// they're not yet served; clients fall through to live
// provider fetch via getClassification / getEntityFacts /
// getEtfMetrics, which don't exist yet.
.classification, .etf_metrics, .entity_facts => return false,
};
const full_url = std.fmt.allocPrint(self.allocator, "{s}/{s}{s}", .{ server_url, symbol, endpoint }) catch return false;
@ -2278,7 +2284,7 @@ test "fetchCached offline mode returns stale-cached data" {
};
// Manually set TTL to 1 second (long since expired) by writing
// through writeWithSource with a tiny TTL.
store.writeWithSource(Dividend, "TEST", divs[0..], -1_000_000, "test");
store.writeWithSource(Dividend, "TEST", divs[0..], .{ .seconds = -1_000_000 }, "test");
svc.panic_on_network_attempt = true;