add jitter to splits and dividends
This commit is contained in:
parent
641a88b0b7
commit
f54faf4732
2 changed files with 147 additions and 69 deletions
192
src/cache/store.zig
vendored
192
src/cache/store.zig
vendored
|
|
@ -214,19 +214,46 @@ pub const DataType = enum {
|
|||
};
|
||||
}
|
||||
|
||||
pub fn ttl(self: DataType) i64 {
|
||||
/// TTL specification for this data type, including any per-key
|
||||
/// jitter policy. The cache layer owns the policy because it
|
||||
/// owns both the constants (`Ttl.X`) and the mechanism that
|
||||
/// applies them (`computeExpires`); call sites just delegate.
|
||||
///
|
||||
/// Jitter assignments:
|
||||
///
|
||||
/// - 11% on dividends/splits (14d base, ~3d total spread).
|
||||
/// Tuned so a daily cron sees roughly 1/3 of a portfolio's
|
||||
/// symbols expire each day instead of all in lockstep.
|
||||
///
|
||||
/// - 8% on the longer-TTL types (classification 90d,
|
||||
/// etf_metrics 90d, entity_facts 30d, ticker maps 30d).
|
||||
/// Same thundering-herd defense; smaller percentage
|
||||
/// because the absolute spread on a 30d/90d base is
|
||||
/// already large in days.
|
||||
///
|
||||
/// - 0% on the rest. options/earnings/etf_profile either
|
||||
/// have natural cadence spread or are short-TTL enough
|
||||
/// that jitter would exceed meaningful drift.
|
||||
pub fn ttl(self: DataType) TtlSpec {
|
||||
return switch (self) {
|
||||
.dividends => Ttl.dividends,
|
||||
.splits => Ttl.splits,
|
||||
.options => Ttl.options,
|
||||
.earnings => Ttl.earnings,
|
||||
.etf_profile => Ttl.etf_profile,
|
||||
.classification => Ttl.classification,
|
||||
.etf_metrics => Ttl.etf_metrics,
|
||||
.entity_facts => Ttl.entity_facts,
|
||||
.tickers_funds => Ttl.tickers_funds,
|
||||
.tickers_companies => Ttl.tickers_companies,
|
||||
.candles_daily, .candles_meta, .meta => 0,
|
||||
.dividends => .{ .seconds = Ttl.dividends, .jitter_pct = 11 },
|
||||
.splits => .{ .seconds = Ttl.splits, .jitter_pct = 11 },
|
||||
.options => .{ .seconds = Ttl.options },
|
||||
.earnings => .{ .seconds = Ttl.earnings },
|
||||
.etf_profile => .{ .seconds = Ttl.etf_profile },
|
||||
.classification => .{ .seconds = Ttl.classification, .jitter_pct = 8 },
|
||||
.etf_metrics => .{ .seconds = Ttl.etf_metrics, .jitter_pct = 8 },
|
||||
.entity_facts => .{ .seconds = Ttl.entity_facts, .jitter_pct = 8 },
|
||||
.tickers_funds => .{ .seconds = Ttl.tickers_funds, .jitter_pct = 8 },
|
||||
.tickers_companies => .{ .seconds = Ttl.tickers_companies, .jitter_pct = 8 },
|
||||
// Sentinel: these types have their own writers
|
||||
// (`cacheCandles` for the candle pair, `writeNegative`
|
||||
// for `meta`) that don't go through the generic
|
||||
// `write()` / `writeWithSource()` path. Calling
|
||||
// `.ttl()` on one of them is a misuse — replace this
|
||||
// `unreachable` with `@compileError` once the call
|
||||
// graph is locked down enough to enforce at comptime.
|
||||
.candles_daily, .candles_meta, .meta => unreachable,
|
||||
};
|
||||
}
|
||||
};
|
||||
|
|
@ -521,11 +548,15 @@ pub const Store = struct {
|
|||
}
|
||||
}
|
||||
|
||||
if (added == 0 and upgraded == 0) {
|
||||
// Nothing changed — leave the file untouched. This is
|
||||
// the common case for repeated Polygon/Tiingo refreshes.
|
||||
return;
|
||||
}
|
||||
// Note: even when nothing was added or upgraded, fall through
|
||||
// and rewrite the file. The on-disk `#!expires=` directive
|
||||
// needs to be refreshed every time we successfully fetched
|
||||
// and merged, otherwise an aged-out file stays aged-out:
|
||||
// every subsequent refresh pays the full provider rate-
|
||||
// limiter cost only to discover no changes and skip the
|
||||
// write, locking the cache into a permanent slow-path. The
|
||||
// write itself is a sub-millisecond atomic rename of a tiny
|
||||
// file, so saving it isn't worth the bookkeeping.
|
||||
|
||||
// Sort descending by date (newest first), matching the
|
||||
// existing on-disk convention.
|
||||
|
|
@ -1779,7 +1810,15 @@ test "writeMerged Dividend: union sorted desc, new entry added" {
|
|||
try std.testing.expect(result.data[2].ex_date.eql(Date.fromYmd(2024, 2, 15)));
|
||||
}
|
||||
|
||||
test "writeMerged Dividend: no-op when nothing new" {
|
||||
test "writeMerged Dividend: no-change merge still rewrites to refresh expires" {
|
||||
// The "nothing changed" path used to skip the rewrite as an
|
||||
// optimization. Problem: once the on-disk `#!expires=` aged
|
||||
// past TTL, every subsequent refresh paid the full provider
|
||||
// rate-limiter cost only to discover no changes and skip the
|
||||
// write again, locking the cache into a permanent slow-path.
|
||||
//
|
||||
// Now we always rewrite. The write is a sub-millisecond atomic
|
||||
// rename of a tiny file; saving it isn't worth the bookkeeping.
|
||||
const allocator = std.testing.allocator;
|
||||
const io = std.testing.io;
|
||||
var tmp = std.testing.tmpDir(.{});
|
||||
|
|
@ -1788,27 +1827,41 @@ test "writeMerged Dividend: no-op when nothing new" {
|
|||
defer allocator.free(dir_path);
|
||||
|
||||
var s = Store.init(io, allocator, dir_path);
|
||||
var initial = [_]Dividend{
|
||||
|
||||
// Seed a file with expires 30 days in the past — the aged-out
|
||||
// case that motivated this fix. (Pre-fix: the no-change merge
|
||||
// would skip the write and the file would stay aged-out
|
||||
// forever. Post-fix: the file gets rewritten with a fresh
|
||||
// expires.)
|
||||
const now_s = std.Io.Timestamp.now(io, .real).toSeconds();
|
||||
const seed_expires = now_s - 30 * std.time.s_per_day;
|
||||
const divs = [_]Dividend{
|
||||
.{ .ex_date = Date.fromYmd(2024, 5, 15), .amount = 0.50, .type = .regular },
|
||||
};
|
||||
s.write(Dividend, "TEST", initial[0..], .{ .seconds = Ttl.dividends });
|
||||
const seed_bytes = try Store.serializeWithMeta(Dividend, io, allocator, &divs, .{ .expires = seed_expires });
|
||||
defer allocator.free(seed_bytes);
|
||||
try s.writeRaw("TEST", .dividends, seed_bytes);
|
||||
|
||||
// Capture file mtime before second (no-op) write.
|
||||
const path = try std.fs.path.join(allocator, &.{ dir_path, "TEST", "dividends.srf" });
|
||||
defer allocator.free(path);
|
||||
const stat_before = try std.Io.Dir.cwd().statFile(io, path, .{});
|
||||
|
||||
// Sleep briefly so mtime resolution can detect a write if one happens.
|
||||
std.Io.sleep(io, std.Io.Duration.fromMilliseconds(20), .awake) catch {};
|
||||
|
||||
// Same incoming entry — nothing new, should not rewrite.
|
||||
// Same incoming entry — nothing new, but we still expect a rewrite.
|
||||
var repeat = [_]Dividend{
|
||||
.{ .ex_date = Date.fromYmd(2024, 5, 15), .amount = 0.50, .type = .regular },
|
||||
};
|
||||
s.write(Dividend, "TEST", repeat[0..], .{ .seconds = Ttl.dividends });
|
||||
s.writeWithSource(Dividend, "TEST", repeat[0..], .{ .seconds = Ttl.dividends }, "polygon");
|
||||
|
||||
const stat_after = try std.Io.Dir.cwd().statFile(io, path, .{});
|
||||
try std.testing.expectEqual(stat_before.mtime, stat_after.mtime);
|
||||
// Confirm fresh expires landed on disk: read the raw file and
|
||||
// parse out the directive, expecting it to be roughly now+14d.
|
||||
const path = try std.fs.path.join(allocator, &.{ dir_path, "TEST", "dividends.srf" });
|
||||
defer allocator.free(path);
|
||||
const data = try std.Io.Dir.cwd().readFileAlloc(io, path, allocator, .limited(1024 * 1024));
|
||||
defer allocator.free(data);
|
||||
|
||||
var reader = std.Io.Reader.fixed(data);
|
||||
const it = try srf.iterator(&reader, allocator, .{});
|
||||
defer it.deinit();
|
||||
|
||||
const new_expires = it.expires orelse return error.ExpiresMissing;
|
||||
try std.testing.expect(new_expires > now_s);
|
||||
try std.testing.expect(new_expires - now_s > 13 * std.time.s_per_day);
|
||||
}
|
||||
|
||||
test "writeMerged Dividend: field-level upgrade fills nulls (Tiingo-then-Polygon)" {
|
||||
|
|
@ -1939,8 +1992,10 @@ test "writeMerged Dividend: non-null fields are not overwritten" {
|
|||
|
||||
test "writeMerged Dividend: upgrade is no-op when both have same fields" {
|
||||
// Both writes have the same ex_date, amount, pay_date, and
|
||||
// type. There's nothing to upgrade and nothing new — the file
|
||||
// should not be touched on the second write.
|
||||
// type. There's nothing to upgrade and nothing new — but the
|
||||
// file is still rewritten so the on-disk `#!expires=` directive
|
||||
// gets refreshed. (Pre-rewrite-always behavior was to skip;
|
||||
// that locked aged-out files into a permanent slow-path.)
|
||||
const allocator = std.testing.allocator;
|
||||
const io = std.testing.io;
|
||||
var tmp = std.testing.tmpDir(.{});
|
||||
|
|
@ -1960,12 +2015,6 @@ test "writeMerged Dividend: upgrade is no-op when both have same fields" {
|
|||
};
|
||||
s.writeWithSource(Dividend, "TEST", initial[0..], .{ .seconds = Ttl.dividends }, "polygon");
|
||||
|
||||
const path = try std.fs.path.join(allocator, &.{ dir_path, "TEST", "dividends.srf" });
|
||||
defer allocator.free(path);
|
||||
const stat_before = try std.Io.Dir.cwd().statFile(io, path, .{});
|
||||
|
||||
std.Io.sleep(io, std.Io.Duration.fromMilliseconds(20), .awake) catch {};
|
||||
|
||||
var repeat = [_]Dividend{
|
||||
.{
|
||||
.ex_date = Date.fromYmd(2024, 5, 15),
|
||||
|
|
@ -1976,8 +2025,11 @@ test "writeMerged Dividend: upgrade is no-op when both have same fields" {
|
|||
};
|
||||
s.writeWithSource(Dividend, "TEST", repeat[0..], .{ .seconds = Ttl.dividends }, "polygon");
|
||||
|
||||
const stat_after = try std.Io.Dir.cwd().statFile(io, path, .{});
|
||||
try std.testing.expectEqual(stat_before.mtime, stat_after.mtime);
|
||||
// The merged result is still just one record (no duplication).
|
||||
const result = s.read(Dividend, "TEST", null, .any) orelse return error.NoCache;
|
||||
defer allocator.free(result.data);
|
||||
defer for (result.data) |d| d.deinit(allocator);
|
||||
try std.testing.expectEqual(@as(usize, 1), result.data.len);
|
||||
}
|
||||
|
||||
test "writeMerged Dividend: near-match dedup catches last-biz-day vs calendar-end" {
|
||||
|
|
@ -2354,22 +2406,48 @@ test "TTL constants are reasonable" {
|
|||
try std.testing.expectEqual(@as(i64, 30 * std.time.s_per_day), Ttl.tickers_companies);
|
||||
}
|
||||
|
||||
test "DataType.ttl returns correct values" {
|
||||
try std.testing.expectEqual(Ttl.dividends, DataType.dividends.ttl());
|
||||
try std.testing.expectEqual(Ttl.splits, DataType.splits.ttl());
|
||||
try std.testing.expectEqual(Ttl.options, DataType.options.ttl());
|
||||
try std.testing.expectEqual(Ttl.earnings, DataType.earnings.ttl());
|
||||
try std.testing.expectEqual(Ttl.etf_profile, DataType.etf_profile.ttl());
|
||||
try std.testing.expectEqual(Ttl.classification, DataType.classification.ttl());
|
||||
try std.testing.expectEqual(Ttl.etf_metrics, DataType.etf_metrics.ttl());
|
||||
try std.testing.expectEqual(Ttl.entity_facts, DataType.entity_facts.ttl());
|
||||
try std.testing.expectEqual(Ttl.tickers_funds, DataType.tickers_funds.ttl());
|
||||
try std.testing.expectEqual(Ttl.tickers_companies, DataType.tickers_companies.ttl());
|
||||
test "DataType.ttl returns correct seconds and jitter policy" {
|
||||
// 11% jitter: dividends and splits.
|
||||
const div = DataType.dividends.ttl();
|
||||
try std.testing.expectEqual(Ttl.dividends, div.seconds);
|
||||
try std.testing.expectEqual(@as(u8, 11), div.jitter_pct);
|
||||
|
||||
// These types have no TTL (0 = managed elsewhere)
|
||||
try std.testing.expectEqual(@as(i64, 0), DataType.candles_daily.ttl());
|
||||
try std.testing.expectEqual(@as(i64, 0), DataType.candles_meta.ttl());
|
||||
try std.testing.expectEqual(@as(i64, 0), DataType.meta.ttl());
|
||||
const spl = DataType.splits.ttl();
|
||||
try std.testing.expectEqual(Ttl.splits, spl.seconds);
|
||||
try std.testing.expectEqual(@as(u8, 11), spl.jitter_pct);
|
||||
|
||||
// 8% jitter: classification, etf_metrics, entity_facts, ticker maps.
|
||||
const cls = DataType.classification.ttl();
|
||||
try std.testing.expectEqual(Ttl.classification, cls.seconds);
|
||||
try std.testing.expectEqual(@as(u8, 8), cls.jitter_pct);
|
||||
|
||||
const em = DataType.etf_metrics.ttl();
|
||||
try std.testing.expectEqual(Ttl.etf_metrics, em.seconds);
|
||||
try std.testing.expectEqual(@as(u8, 8), em.jitter_pct);
|
||||
|
||||
const ef = DataType.entity_facts.ttl();
|
||||
try std.testing.expectEqual(Ttl.entity_facts, ef.seconds);
|
||||
try std.testing.expectEqual(@as(u8, 8), ef.jitter_pct);
|
||||
|
||||
const tf = DataType.tickers_funds.ttl();
|
||||
try std.testing.expectEqual(Ttl.tickers_funds, tf.seconds);
|
||||
try std.testing.expectEqual(@as(u8, 8), tf.jitter_pct);
|
||||
|
||||
const tc = DataType.tickers_companies.ttl();
|
||||
try std.testing.expectEqual(Ttl.tickers_companies, tc.seconds);
|
||||
try std.testing.expectEqual(@as(u8, 8), tc.jitter_pct);
|
||||
|
||||
// No jitter: short-TTL types and etf_profile.
|
||||
try std.testing.expectEqual(Ttl.options, DataType.options.ttl().seconds);
|
||||
try std.testing.expectEqual(@as(u8, 0), DataType.options.ttl().jitter_pct);
|
||||
try std.testing.expectEqual(Ttl.earnings, DataType.earnings.ttl().seconds);
|
||||
try std.testing.expectEqual(@as(u8, 0), DataType.earnings.ttl().jitter_pct);
|
||||
try std.testing.expectEqual(Ttl.etf_profile, DataType.etf_profile.ttl().seconds);
|
||||
try std.testing.expectEqual(@as(u8, 0), DataType.etf_profile.ttl().jitter_pct);
|
||||
|
||||
// candles_daily, candles_meta, and meta have their own writers
|
||||
// (`cacheCandles`, `writeNegative`); calling .ttl() on them is
|
||||
// unreachable and would panic. Not exercised here.
|
||||
}
|
||||
|
||||
test "DataType.fileName returns correct file names" {
|
||||
|
|
|
|||
|
|
@ -478,7 +478,7 @@ pub const DataService = struct {
|
|||
const retried = self.fetchFromProvider(T, symbol) catch {
|
||||
return DataError.FetchFailed;
|
||||
};
|
||||
s.writeWithSource(T, symbol, retried, .{ .seconds = data_type.ttl() }, sourceHintFor(T));
|
||||
s.writeWithSource(T, symbol, retried, data_type.ttl(), sourceHintFor(T));
|
||||
return .{ .data = retried, .source = .fetched, .timestamp = std.Io.Timestamp.now(self.io, .real).toSeconds(), .allocator = self.allocator };
|
||||
}
|
||||
// Only NotFound (provider says "this symbol genuinely has
|
||||
|
|
@ -492,7 +492,7 @@ pub const DataService = struct {
|
|||
return DataError.FetchFailed;
|
||||
};
|
||||
|
||||
s.writeWithSource(T, symbol, fetched, .{ .seconds = data_type.ttl() }, sourceHintFor(T));
|
||||
s.writeWithSource(T, symbol, fetched, data_type.ttl(), sourceHintFor(T));
|
||||
return .{ .data = fetched, .source = .fetched, .timestamp = std.Io.Timestamp.now(self.io, .real).toSeconds(), .allocator = self.allocator };
|
||||
}
|
||||
|
||||
|
|
@ -586,8 +586,8 @@ pub const DataService = struct {
|
|||
// view supplements rather than replaces existing (typically
|
||||
// Polygon-sourced) records. New entries are logged with
|
||||
// "tiingo" attribution.
|
||||
s.writeWithSource(Dividend, symbol, triple.dividends, .{ .seconds = cache.DataType.dividends.ttl() }, "tiingo");
|
||||
s.writeWithSource(Split, symbol, triple.splits, .{ .seconds = cache.DataType.splits.ttl() }, "tiingo");
|
||||
s.writeWithSource(Dividend, symbol, triple.dividends, cache.DataType.dividends.ttl(), "tiingo");
|
||||
s.writeWithSource(Split, symbol, triple.splits, cache.DataType.splits.ttl(), "tiingo");
|
||||
|
||||
return triple;
|
||||
}
|
||||
|
|
@ -1134,7 +1134,7 @@ pub const DataService = struct {
|
|||
break_blk: {
|
||||
const retried = wd.fetch(self.allocator, &symbols) catch break :break_blk;
|
||||
if (retried.len > 0) {
|
||||
s.write(Wikidata.ClassificationRecord, symbol, retried, .{ .seconds = cache.Ttl.classification, .jitter_pct = 8 });
|
||||
s.write(Wikidata.ClassificationRecord, symbol, retried, cache.DataType.classification.ttl());
|
||||
return .{ .data = retried, .source = .fetched, .timestamp = std.Io.Timestamp.now(self.io, .real).toSeconds(), .allocator = self.allocator };
|
||||
}
|
||||
self.allocator.free(retried);
|
||||
|
|
@ -1152,7 +1152,7 @@ pub const DataService = struct {
|
|||
return DataError.NotFound;
|
||||
}
|
||||
|
||||
s.write(Wikidata.ClassificationRecord, symbol, fetched, .{ .seconds = cache.Ttl.classification, .jitter_pct = 8 });
|
||||
s.write(Wikidata.ClassificationRecord, symbol, fetched, cache.DataType.classification.ttl());
|
||||
|
||||
return .{ .data = fetched, .source = .fetched, .timestamp = std.Io.Timestamp.now(self.io, .real).toSeconds(), .allocator = self.allocator };
|
||||
}
|
||||
|
|
@ -1239,7 +1239,7 @@ pub const DataService = struct {
|
|||
// Write each fetched record to its per-symbol cache file.
|
||||
for (fetched) |rec| {
|
||||
const single = [_]Wikidata.ClassificationRecord{rec};
|
||||
s.write(Wikidata.ClassificationRecord, rec.symbol, &single, .{ .seconds = cache.Ttl.classification, .jitter_pct = 8 });
|
||||
s.write(Wikidata.ClassificationRecord, rec.symbol, &single, cache.DataType.classification.ttl());
|
||||
}
|
||||
|
||||
// Combine cached + fetched into the result.
|
||||
|
|
@ -1315,7 +1315,7 @@ pub const DataService = struct {
|
|||
|
||||
const records = try self.allocator.alloc(Edgar.EntityFactRecord, 1);
|
||||
records[0] = .{ .shares_outstanding = shares_record };
|
||||
s.write(Edgar.EntityFactRecord, cik, records, .{ .seconds = cache.Ttl.entity_facts, .jitter_pct = 8 });
|
||||
s.write(Edgar.EntityFactRecord, cik, records, cache.DataType.entity_facts.ttl());
|
||||
|
||||
return .{ .data = records, .source = .fetched, .timestamp = std.Io.Timestamp.now(self.io, .real).toSeconds(), .allocator = self.allocator };
|
||||
}
|
||||
|
|
@ -1416,7 +1416,7 @@ pub const DataService = struct {
|
|||
}
|
||||
try Edgar.appendEtfMetricRecords(self.allocator, &records, m);
|
||||
const owned = try records.toOwnedSlice(self.allocator);
|
||||
s.write(Edgar.EtfMetricRecord, symbol, owned, .{ .seconds = cache.Ttl.etf_metrics, .jitter_pct = 8 });
|
||||
s.write(Edgar.EtfMetricRecord, symbol, owned, cache.DataType.etf_metrics.ttl());
|
||||
return .{ .data = owned, .source = .fetched, .timestamp = std.Io.Timestamp.now(self.io, .real).toSeconds(), .allocator = self.allocator };
|
||||
},
|
||||
.profile_only => |m_in| {
|
||||
|
|
@ -1430,7 +1430,7 @@ pub const DataService = struct {
|
|||
}
|
||||
try Edgar.appendEtfMetricRecords(self.allocator, &records, m);
|
||||
const owned = try records.toOwnedSlice(self.allocator);
|
||||
s.write(Edgar.EtfMetricRecord, symbol, owned, .{ .seconds = cache.Ttl.etf_metrics, .jitter_pct = 8 });
|
||||
s.write(Edgar.EtfMetricRecord, symbol, owned, cache.DataType.etf_metrics.ttl());
|
||||
return .{ .data = owned, .source = .fetched, .timestamp = std.Io.Timestamp.now(self.io, .real).toSeconds(), .allocator = self.allocator };
|
||||
},
|
||||
.not_a_fund => {
|
||||
|
|
@ -1480,7 +1480,7 @@ pub const DataService = struct {
|
|||
// + rate-limit token), cache the parsed slice, then build
|
||||
// the lookup map (which takes ownership of the slice).
|
||||
const entries = try edgar.fetchMutualFundTickerMap(self.allocator);
|
||||
s.write(Edgar.MutualFundTickerEntry, "_edgar", entries, .{ .seconds = cache.Ttl.tickers_funds, .jitter_pct = 8 });
|
||||
s.write(Edgar.MutualFundTickerEntry, "_edgar", entries, cache.DataType.tickers_funds.ttl());
|
||||
return Edgar.TickerMap(Edgar.MutualFundTickerEntry).fromEntries(self.allocator, entries);
|
||||
}
|
||||
|
||||
|
|
@ -1505,7 +1505,7 @@ pub const DataService = struct {
|
|||
var edgar = try self.getProvider(Edgar);
|
||||
|
||||
const entries = try edgar.fetchCompanyTickerMap(self.allocator);
|
||||
s.write(Edgar.CompanyTickerEntry, "_edgar", entries, .{ .seconds = cache.Ttl.tickers_companies, .jitter_pct = 8 });
|
||||
s.write(Edgar.CompanyTickerEntry, "_edgar", entries, cache.DataType.tickers_companies.ttl());
|
||||
return Edgar.TickerMap(Edgar.CompanyTickerEntry).fromEntries(self.allocator, entries);
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue