remove frequency and add ability to "upgrade" fields
All checks were successful
Generic zig build / build (push) Successful in 2m10s
Generic zig build / deploy (push) Successful in 19s

This commit is contained in:
Emil Lerch 2026-05-20 16:00:13 -07:00
parent d9f2e8404b
commit fe28949757
Signed by: lobo
GPG key ID: A7B62D657EF764F8
6 changed files with 336 additions and 60 deletions

345
src/cache/store.zig vendored
View file

@ -265,23 +265,39 @@ pub const Store = struct {
}
/// Sorted-union write for `Dividend` and `Split`. Reads the
/// existing cache file, adds any items from `incoming` whose
/// date key isn't already present, sorts the union descending
/// by date, and writes the result. If nothing new came in, the
/// existing file is left untouched (no mtime bump, no I/O).
/// existing cache file, merges `incoming` into it, sorts the
/// union descending by date, and writes the result. If nothing
/// changed, the existing file is left untouched (no mtime bump,
/// no I/O).
///
/// Existing entries always win on key collision: Polygon's
/// dividend records carry richer metadata (`pay_date`,
/// `record_date`, `type`, `currency`) than Tiingo's, so once
/// Polygon has supplied an ex_date we don't want a later Tiingo
/// write to overwrite it with a sparser record. The merge
/// preserves whichever entry landed first for any given date.
/// Two kinds of merge happen on each incoming entry:
///
/// Each newly-added entry triggers an `info(cache)` log line so
/// the user is alerted when a supplementary source (Tiingo,
/// usually) discovers a corporate action the primary source
/// (Polygon) missed. The `source_hint` argument, when present,
/// names the source in that log line.
/// **New key** — incoming entry's key (ex_date / split.date) is
/// not in the existing data. Append it; emit an `info(cache)
/// supplied` log line so the user is alerted when a source
/// surfaces a corporate action that wasn't already known. The
/// canonical case is Tiingo discovering SPYM's 2017-10-16 4:1
/// split that Polygon's reference endpoint doesn't return.
///
/// **Existing key, field-level upgrade** — incoming entry's key
/// matches an existing entry. For `Dividend`, walk the optional
/// fields (`pay_date`, `record_date`, `type`, `currency`) and
/// fill in any nulls on the existing record from the incoming
/// record's non-null values. Don't overwrite non-null fields.
/// `type = .unknown` counts as null-equivalent. This means
/// Polygon's richer Dividend records can fill in metadata
/// regardless of whether Tiingo wrote first or Polygon did —
/// the on-disk record is the union of all sources' knowledge,
/// with conflicts resolved by "first non-null wins." Each field
/// upgrade emits its own `info(cache) upgraded` log line.
///
/// `Split` has no optional fields, so the field-level path is a
/// no-op for splits and the merge collapses to "skip if key
/// already exists."
///
/// `source_hint`, when present, names the source in the log
/// lines (e.g. `"polygon"` / `"tiingo"`). Defaults to
/// `"fetch"` when null.
fn writeMerged(
self: *Store,
comptime T: type,
@ -294,7 +310,16 @@ pub const Store = struct {
// Read existing entries (any freshness; we want the union of
// what's on disk, not just fresh data).
const existing_result = self.read(T, symbol, null, .any);
//
// The post-process callback dupes any heap-allocated string
// fields (Dividend.currency) into stable memory because the
// SRF iterator's backing buffer is freed inside `read()` and
// un-duped strings would become dangling pointers as soon as
// we return. The matching `deinit` in the cleanup `defer`
// below frees these duped strings after we're done with the
// merged list. Keep the post-process logic in lockstep with
// the deinit handling — they're a pair.
const existing_result = self.read(T, symbol, mergePostProcess(T), .any);
const existing: []const T = if (existing_result) |r| r.data else &.{};
defer if (existing_result != null) {
if (comptime @hasDecl(T, "deinit")) {
@ -303,23 +328,32 @@ pub const Store = struct {
self.allocator.free(existing);
};
// Build the union. Start with existing entries, then append
// any incoming entry whose key isn't already present.
// Build the union. Start with a mutable copy of existing
// entries, then either upgrade in place or append new
// entries from incoming.
var merged: std.ArrayList(T) = .empty;
defer merged.deinit(self.allocator);
merged.appendSlice(self.allocator, existing) catch return;
var added: usize = 0;
var upgraded: usize = 0;
for (incoming) |item| {
if (containsKey(T, merged.items, mergeKey(T, item))) continue;
merged.append(self.allocator, item) catch return;
added += 1;
logSupplied(T, symbol, item, source_hint);
const key = mergeKey(T, item);
if (findKeyIndex(T, merged.items, key)) |idx| {
// Existing entry — try to upgrade its optional fields
// from the incoming entry's non-null values.
upgraded += upgradeRecord(T, &merged.items[idx], item, symbol, source_hint);
} else {
// New entry — append.
merged.append(self.allocator, item) catch return;
added += 1;
logSupplied(T, symbol, item, source_hint);
}
}
if (added == 0) {
// Nothing new leave the file untouched. This is the
// common case for repeated Polygon/Tiingo refreshes.
if (added == 0 and upgraded == 0) {
// Nothing changed — leave the file untouched. This is
// the common case for repeated Polygon/Tiingo refreshes.
return;
}
@ -347,11 +381,31 @@ pub const Store = struct {
@compileError("mergeKey only defined for Dividend and Split");
}
fn containsKey(comptime T: type, items: []const T, key: i32) bool {
for (items) |it| {
if (mergeKey(T, it) == key) return true;
/// Builds the post-process hook that the merge primitive passes
/// to `read`. For `Dividend` the hook replaces a non-null
/// `currency` with a copy made by the supplied allocator, so the
/// string no longer points into the buffer it was parsed from;
/// the type's `deinit` is the counterpart that releases that copy
/// once the merge is done. `Split` has no string fields, so it
/// gets no hook (null). Any other type is rejected at compile
/// time.
fn mergePostProcess(comptime T: type) ?*const fn (*T, std.mem.Allocator) anyerror!void {
    const DividendHook = struct {
        fn dupeCurrency(record: *Dividend, gpa: std.mem.Allocator) anyerror!void {
            // Nothing to copy when the record carries no currency.
            const borrowed = record.currency orelse return;
            record.currency = try gpa.dupe(u8, borrowed);
        }
    };

    if (T == Dividend) return &DividendHook.dupeCurrency;
    if (T == Split) return null;
    @compileError("mergePostProcess only defined for Dividend and Split");
}
fn findKeyIndex(comptime T: type, items: []const T, key: i32) ?usize {
for (items, 0..) |it, i| {
if (mergeKey(T, it) == key) return i;
}
return false;
return null;
}
fn lessByDateDesc(comptime T: type) fn (void, T, T) bool {
@ -362,6 +416,67 @@ pub const Store = struct {
}.lt;
}
/// Merge optional metadata from `incoming` into `existing` when
/// both records share a merge key, and report how many fields
/// were filled in. A result of zero tells the caller that this
/// record gives no reason to rewrite the file.
///
/// `Split` carries no optional fields, so the answer is always 0.
/// For `Dividend`, `pay_date`, `record_date`, `type`, and
/// `currency` are examined in that order; a field is copied only
/// when the existing value is null (`.unknown` for `type`) and
/// the incoming value is not. Populated fields are never
/// replaced. Each copy emits one `upgraded` info log line naming
/// `source_hint`, or `"fetch"` when the hint is null.
fn upgradeRecord(
    comptime T: type,
    existing: *T,
    incoming: T,
    symbol: []const u8,
    source_hint: ?[]const u8,
) usize {
    if (T == Split) return 0;
    if (T != Dividend) @compileError("upgradeRecord only defined for Dividend and Split");

    const origin = source_hint orelse "fetch";
    const ex_date = existing.ex_date;
    var filled: usize = 0;

    if (incoming.pay_date) |pay_date| {
        if (existing.pay_date == null) {
            existing.pay_date = pay_date;
            log.info("{s}: {s} upgraded ex_date {f}: pay_date null -> {f}", .{
                symbol, origin, ex_date, pay_date,
            });
            filled += 1;
        }
    }

    if (incoming.record_date) |record_date| {
        if (existing.record_date == null) {
            existing.record_date = record_date;
            log.info("{s}: {s} upgraded ex_date {f}: record_date null -> {f}", .{
                symbol, origin, ex_date, record_date,
            });
            filled += 1;
        }
    }

    // `.unknown` plays the role of null for the type field.
    const type_missing = existing.type == .unknown;
    if (type_missing and incoming.type != .unknown) {
        existing.type = incoming.type;
        log.info("{s}: {s} upgraded ex_date {f}: type unknown -> {s}", .{
            symbol, origin, ex_date, @tagName(incoming.type),
        });
        filled += 1;
    }

    if (incoming.currency) |code| {
        if (existing.currency == null) {
            // The string is borrowed from `incoming`, not copied:
            // the caller keeps `incoming` alive while the merged
            // record is in use, and because the existing field was
            // null there is no previous string to release.
            existing.currency = code;
            log.info("{s}: {s} upgraded ex_date {f}: currency null -> {s}", .{
                symbol, origin, ex_date, code,
            });
            filled += 1;
        }
    }

    return filled;
}
fn logSupplied(comptime T: type, symbol: []const u8, item: T, source_hint: ?[]const u8) void {
const source = source_hint orelse "fetch";
if (T == Dividend) {
@ -1304,7 +1419,7 @@ test "dividend serialize/deserialize round-trip" {
const io = std.testing.io;
const allocator = std.testing.allocator;
const divs = [_]Dividend{
.{ .ex_date = Date.fromYmd(2024, 3, 15), .amount = 0.8325, .pay_date = Date.fromYmd(2024, 3, 28), .frequency = 4, .type = .regular },
.{ .ex_date = Date.fromYmd(2024, 3, 15), .amount = 0.8325, .pay_date = Date.fromYmd(2024, 3, 28), .type = .regular },
.{ .ex_date = Date.fromYmd(2024, 6, 14), .amount = 0.9148, .type = .special },
};
@ -1322,7 +1437,6 @@ test "dividend serialize/deserialize round-trip" {
try std.testing.expectApproxEqAbs(@as(f64, 0.8325), parsed[0].amount, 0.0001);
try std.testing.expect(parsed[0].pay_date != null);
try std.testing.expect(parsed[0].pay_date.?.eql(Date.fromYmd(2024, 3, 28)));
try std.testing.expectEqual(@as(?u8, 4), parsed[0].frequency);
try std.testing.expectEqual(DividendType.regular, parsed[0].type);
try std.testing.expect(parsed[1].ex_date.eql(Date.fromYmd(2024, 6, 14)));
@ -1484,6 +1598,175 @@ test "writeMerged Dividend: no-op when nothing new" {
try std.testing.expectEqual(stat_before.mtime, stat_after.mtime);
}
test "writeMerged Dividend: field-level upgrade fills nulls (Tiingo-then-Polygon)" {
    // Write order under test: a sparse Tiingo-style record (only
    // ex_date and amount) lands first, then a richer Polygon-style
    // record arrives for the same ex_date. The cached record must
    // pick up the richer metadata instead of keeping the nulls
    // left by the first write.
    const gpa = std.testing.allocator;
    const io = std.testing.io;

    var scratch = std.testing.tmpDir(.{});
    defer scratch.cleanup();
    const cache_root = try scratch.dir.realPathFileAlloc(io, ".", gpa);
    defer gpa.free(cache_root);

    var store = Store.init(io, gpa, cache_root);

    // First write: sparse record.
    var sparse = [_]Dividend{
        .{ .ex_date = Date.fromYmd(2024, 5, 15), .amount = 0.50 },
    };
    store.writeWithSource(Dividend, "TEST", &sparse, Ttl.dividends, "tiingo");

    // Second write: same ex_date with pay_date, record_date and
    // type populated. currency is left null here to keep the
    // fixture free of heap allocations.
    var rich = [_]Dividend{
        .{
            .ex_date = Date.fromYmd(2024, 5, 15),
            .pay_date = Date.fromYmd(2024, 6, 1),
            .record_date = Date.fromYmd(2024, 5, 17),
            .amount = 0.50,
            .type = .regular,
        },
    };
    store.writeWithSource(Dividend, "TEST", &rich, Ttl.dividends, "polygon");

    const cached = store.read(Dividend, "TEST", null, .any) orelse return error.NoCache;
    defer {
        for (cached.data) |d| d.deinit(gpa);
        gpa.free(cached.data);
    }

    try std.testing.expectEqual(@as(usize, 1), cached.data.len);
    const row = cached.data[0];

    const pay_date = row.pay_date orelse return error.TestUnexpectedResult;
    try std.testing.expect(pay_date.eql(Date.fromYmd(2024, 6, 1)));

    const record_date = row.record_date orelse return error.TestUnexpectedResult;
    try std.testing.expect(record_date.eql(Date.fromYmd(2024, 5, 17)));

    try std.testing.expectEqual(DividendType.regular, row.type);
}
test "writeMerged Dividend: type unknown counts as null and gets upgraded" {
    // A record cached with type = .unknown (what Tiingo-sourced
    // dividends carry) must take on the concrete type supplied by
    // a later write for the same ex_date: .unknown is treated as
    // "no value yet".
    const gpa = std.testing.allocator;
    const io = std.testing.io;

    var scratch = std.testing.tmpDir(.{});
    defer scratch.cleanup();
    const cache_root = try scratch.dir.realPathFileAlloc(io, ".", gpa);
    defer gpa.free(cache_root);

    var store = Store.init(io, gpa, cache_root);

    var untyped = [_]Dividend{
        .{ .ex_date = Date.fromYmd(2024, 5, 15), .amount = 0.50, .type = .unknown },
    };
    store.writeWithSource(Dividend, "TEST", &untyped, Ttl.dividends, "tiingo");

    var typed = [_]Dividend{
        .{ .ex_date = Date.fromYmd(2024, 5, 15), .amount = 0.50, .type = .special },
    };
    store.writeWithSource(Dividend, "TEST", &typed, Ttl.dividends, "polygon");

    const cached = store.read(Dividend, "TEST", null, .any) orelse return error.NoCache;
    defer {
        for (cached.data) |d| d.deinit(gpa);
        gpa.free(cached.data);
    }

    try std.testing.expectEqual(@as(usize, 1), cached.data.len);
    try std.testing.expectEqual(DividendType.special, cached.data[0].type);
}
test "writeMerged Dividend: non-null fields are not overwritten" {
    // "First non-null wins": once a field is populated on the
    // cached record, a later write carrying a different value for
    // the same ex_date must leave it alone. Guards against an
    // over-eager upgrade path.
    const gpa = std.testing.allocator;
    const io = std.testing.io;

    var scratch = std.testing.tmpDir(.{});
    defer scratch.cleanup();
    const cache_root = try scratch.dir.realPathFileAlloc(io, ".", gpa);
    defer gpa.free(cache_root);

    var store = Store.init(io, gpa, cache_root);

    var original = [_]Dividend{
        .{
            .ex_date = Date.fromYmd(2024, 5, 15),
            .pay_date = Date.fromYmd(2024, 6, 1),
            .amount = 0.50,
            .type = .regular,
        },
    };
    store.writeWithSource(Dividend, "TEST", &original, Ttl.dividends, "polygon");

    // Same ex_date, but a conflicting pay_date and type.
    var conflicting = [_]Dividend{
        .{
            .ex_date = Date.fromYmd(2024, 5, 15),
            .pay_date = Date.fromYmd(2099, 1, 1),
            .amount = 0.50,
            .type = .special,
        },
    };
    store.writeWithSource(Dividend, "TEST", &conflicting, Ttl.dividends, "polygon");

    const cached = store.read(Dividend, "TEST", null, .any) orelse return error.NoCache;
    defer {
        for (cached.data) |d| d.deinit(gpa);
        gpa.free(cached.data);
    }

    try std.testing.expectEqual(@as(usize, 1), cached.data.len);

    // The values from the first write are still in place.
    const row = cached.data[0];
    try std.testing.expect(row.pay_date.?.eql(Date.fromYmd(2024, 6, 1)));
    try std.testing.expectEqual(DividendType.regular, row.type);
}
test "writeMerged Dividend: upgrade is no-op when both have same fields" {
    // Writing an identical record twice gives the merge nothing to
    // add and nothing to upgrade, so the second write must not
    // touch the file. The file's mtime is the witness.
    const gpa = std.testing.allocator;
    const io = std.testing.io;

    var scratch = std.testing.tmpDir(.{});
    defer scratch.cleanup();
    const cache_root = try scratch.dir.realPathFileAlloc(io, ".", gpa);
    defer gpa.free(cache_root);

    var store = Store.init(io, gpa, cache_root);

    const record: Dividend = .{
        .ex_date = Date.fromYmd(2024, 5, 15),
        .pay_date = Date.fromYmd(2024, 6, 1),
        .amount = 0.50,
        .type = .regular,
    };

    var first_write = [_]Dividend{record};
    store.writeWithSource(Dividend, "TEST", &first_write, Ttl.dividends, "polygon");

    const cache_file = try std.fs.path.join(gpa, &.{ cache_root, "TEST", "dividends.srf" });
    defer gpa.free(cache_file);
    const before = try std.Io.Dir.cwd().statFile(io, cache_file, .{});

    // Pause so that a rewrite, if one happened, would produce a
    // different mtime.
    std.Io.sleep(io, std.Io.Duration.fromMilliseconds(20), .awake) catch {};

    var second_write = [_]Dividend{record};
    store.writeWithSource(Dividend, "TEST", &second_write, Ttl.dividends, "polygon");

    const after = try std.Io.Dir.cwd().statFile(io, cache_file, .{});
    try std.testing.expectEqual(before.mtime, after.mtime);
}
test "writeMerged Split: SPYM-style supplementary entry added" {
const allocator = std.testing.allocator;
const io = std.testing.io;

View file

@ -75,11 +75,11 @@ pub fn display(dividends: []const zfin.Dividend, symbol: []const u8, current_pri
}
try cli.setFg(out, color, cli.CLR_MUTED);
try out.print("{s:>12} {s:>10} {s:>12} {s:>6} {s:>10}\n", .{
"Ex-Date", "Amount", "Pay Date", "Freq", "Type",
try out.print("{s:>12} {s:>10} {s:>12} {s:>10}\n", .{
"Ex-Date", "Amount", "Pay Date", "Type",
});
try out.print("{s:->12} {s:->10} {s:->12} {s:->6} {s:->10}\n", .{
"", "", "", "", "",
try out.print("{s:->12} {s:->10} {s:->12} {s:->10}\n", .{
"", "", "", "",
});
try cli.reset(out, color);
@ -94,11 +94,6 @@ pub fn display(dividends: []const zfin.Dividend, symbol: []const u8, current_pri
} else {
try out.print(" {s:>12}", .{"--"});
}
if (div.frequency) |f| {
try out.print(" {d:>6}", .{f});
} else {
try out.print(" {s:>6}", .{"--"});
}
try out.print(" {s:>10}\n", .{@tagName(div.type)});
total += div.amount;
if (!div.ex_date.lessThan(one_year_ago)) ttm += div.amount;

View file

@ -19,8 +19,6 @@ pub const Dividend = struct {
record_date: ?Date = null,
/// Cash amount per share
amount: f64,
/// How many times per year this dividend is expected
frequency: ?u8 = null,
/// Classification of the dividend
type: DividendType = .unknown,
/// Currency code (e.g., "USD"). Heap-allocated; freed by deinit().

View file

@ -183,7 +183,6 @@ fn parseDividendsPage(
.amount = amount,
.pay_date = parseDateField(obj, "pay_date"),
.record_date = parseDateField(obj, "record_date"),
.frequency = parseFrequency(obj),
.type = parseDividendType(obj),
.currency = if (jsonStr(obj.get("currency"))) |s|
(allocator.dupe(u8, s) catch null)
@ -271,15 +270,6 @@ fn parseDateField(obj: std.json.ObjectMap, key: []const u8) ?Date {
return Date.parse(s) catch null;
}
/// Read the optional `frequency` member of a dividend JSON object
/// as a times-per-year count.
///
/// Returns null when the key is absent, when the value is neither
/// an integer nor a float, or when it lies outside 1..=255. Float
/// values inside that range are truncated toward zero.
fn parseFrequency(obj: std.json.ObjectMap) ?u8 {
    const value = obj.get("frequency") orelse return null;
    return switch (value) {
        .integer => |i| if (i >= 1 and i <= 255) @intCast(i) else null,
        // The lower bound is 1, not "> 0": a fraction such as 0.5
        // would otherwise truncate to 0, a frequency the integer
        // branch rejects. NaN fails both comparisons and yields
        // null.
        .float => |f| if (f >= 1 and f <= 255) @intFromFloat(f) else null,
        else => null,
    };
}
fn parseDividendType(obj: std.json.ObjectMap) DividendType {
const v = obj.get("dividend_type") orelse return .unknown;
const s = switch (v) {
@ -332,7 +322,6 @@ test "parseDividendsPage basic" {
try std.testing.expect(out.items[0].ex_date.eql(Date.fromYmd(2024, 8, 12)));
try std.testing.expectApproxEqAbs(@as(f64, 0.25), out.items[0].amount, 0.001);
try std.testing.expect(out.items[0].pay_date != null);
try std.testing.expectEqual(@as(?u8, 4), out.items[0].frequency);
try std.testing.expectEqual(DividendType.regular, out.items[0].type);
try std.testing.expectEqual(DividendType.special, out.items[1].type);

View file

@ -216,9 +216,8 @@ fn parseAll(allocator: std.mem.Allocator, body: []const u8) !CandleAndCorporateA
.ex_date = date,
.amount = div_cash,
// Tiingo doesn't carry pay_date / record_date /
// frequency / type. Display-only fields stay null /
// .unknown; total-return math only needs ex_date and
// amount.
// type. Display-only fields stay null / .unknown;
// total-return math only needs ex_date and amount.
});
}
@ -317,7 +316,6 @@ test "parseAll extracts a dividend from a divCash row" {
// Metadata fields are absent for Tiingo-sourced dividends
try std.testing.expect(div.pay_date == null);
try std.testing.expect(div.record_date == null);
try std.testing.expect(div.frequency == null);
try std.testing.expectEqual(@import("../models/dividend.zig").DividendType.unknown, div.type);
}

View file

@ -330,7 +330,7 @@ pub const DataService = struct {
const retried = self.fetchFromProvider(T, symbol) catch {
return DataError.FetchFailed;
};
s.write(T, symbol, retried, data_type.ttl());
s.writeWithSource(T, symbol, retried, data_type.ttl(), sourceHintFor(T));
return .{ .data = retried, .source = .fetched, .timestamp = std.Io.Timestamp.now(self.io, .real).toSeconds(), .allocator = self.allocator };
}
// Only NotFound (provider says "this symbol genuinely has
@ -344,10 +344,23 @@ pub const DataService = struct {
return DataError.FetchFailed;
};
s.write(T, symbol, fetched, data_type.ttl());
s.writeWithSource(T, symbol, fetched, data_type.ttl(), sourceHintFor(T));
return .{ .data = fetched, .source = .fetched, .timestamp = std.Io.Timestamp.now(self.io, .real).toSeconds(), .allocator = self.allocator };
}
/// Name the provider behind a model type fetched via
/// `fetchCached`, so the merge primitive's `info(cache)` log lines
/// can attribute new entries and field upgrades to a source.
/// `Dividend` and `Split` map to `"polygon"`; every other type
/// yields null, since the merge primitive only consults the hint
/// for those two.
fn sourceHintFor(comptime T: type) ?[]const u8 {
    const merged_type = T == Dividend or T == Split;
    return if (merged_type) "polygon" else null;
}
/// Dispatch a fetch to the correct provider based on model type.
fn fetchFromProvider(self: *DataService, comptime T: type, symbol: []const u8) !cache.Store.DataFor(T) {
return switch (T) {
@ -1560,7 +1573,7 @@ pub const DataService = struct {
.dividends => "/dividends",
.earnings => "/earnings",
.options => "/options",
.splits => return false, // not served
.splits => "/splits",
.etf_profile => return false, // not served
.meta => return false,
};