diff --git a/src/cache/store.zig b/src/cache/store.zig index d484922..e7764f0 100644 --- a/src/cache/store.zig +++ b/src/cache/store.zig @@ -265,23 +265,39 @@ pub const Store = struct { } /// Sorted-union write for `Dividend` and `Split`. Reads the - /// existing cache file, adds any items from `incoming` whose - /// date key isn't already present, sorts the union descending - /// by date, and writes the result. If nothing new came in, the - /// existing file is left untouched (no mtime bump, no I/O). + /// existing cache file, merges `incoming` into it, sorts the + /// union descending by date, and writes the result. If nothing + /// changed, the existing file is left untouched (no mtime bump, + /// no I/O). /// - /// Existing entries always win on key collision: Polygon's - /// dividend records carry richer metadata (`pay_date`, - /// `record_date`, `type`, `currency`) than Tiingo's, so once - /// Polygon has supplied an ex_date we don't want a later Tiingo - /// write to overwrite it with a sparser record. The merge - /// preserves whichever entry landed first for any given date. + /// Two kinds of merge happen on each incoming entry: /// - /// Each newly-added entry triggers an `info(cache)` log line so - /// the user is alerted when a supplementary source (Tiingo, - /// usually) discovers a corporate action the primary source - /// (Polygon) missed. The `source_hint` argument, when present, - /// names the source in that log line. + /// **New key** — incoming entry's key (ex_date / split.date) is + /// not in the existing data. Append it; emit an `info(cache) + /// supplied` log line so the user is alerted when a source + /// surfaces a corporate action that wasn't already known. The + /// canonical case is Tiingo discovering SPYM's 2017-10-16 4:1 + /// split that Polygon's reference endpoint doesn't return. + /// + /// **Existing key, field-level upgrade** — incoming entry's key + /// matches an existing entry. For `Dividend`, walk the optional + /// fields (`pay_date`, `record_date`, `type`, `currency`) and + /// fill in any nulls on the existing record from the incoming + /// record's non-null values. Don't overwrite non-null fields. + /// `type = .unknown` counts as null-equivalent. This means + /// Polygon's richer Dividend records can fill in metadata + /// regardless of whether Tiingo wrote first or Polygon did — + /// the on-disk record is the union of all sources' knowledge, + /// with conflicts resolved by "first non-null wins." Each field + /// upgrade emits its own `info(cache) upgraded` log line. + /// + /// `Split` has no optional fields, so the field-level path is a + /// no-op for splits and the merge collapses to "skip if key + /// already exists." + /// + /// `source_hint`, when present, names the source in the log + /// lines (e.g. `"polygon"` / `"tiingo"`). Defaults to + /// `"fetch"` when null. fn writeMerged( self: *Store, comptime T: type, @@ -294,7 +310,16 @@ pub const Store = struct { // Read existing entries (any freshness; we want the union of // what's on disk, not just fresh data). - const existing_result = self.read(T, symbol, null, .any); + // + // The post-process callback dupes any heap-allocated string + // fields (Dividend.currency) into stable memory because the + // SRF iterator's backing buffer is freed inside `read()` and + // un-duped strings would become dangling pointers as soon as + // we return. The matching `deinit` in the cleanup `defer` + // below frees these duped strings after we're done with the + // merged list. Keep the post-process logic in lockstep with + // the deinit handling — they're a pair. + const existing_result = self.read(T, symbol, mergePostProcess(T), .any); const existing: []const T = if (existing_result) |r| r.data else &.{}; defer if (existing_result != null) { if (comptime @hasDecl(T, "deinit")) { @@ -303,23 +328,32 @@ pub const Store = struct { self.allocator.free(existing); }; - // Build the union. Start with existing entries, then append - // any incoming entry whose key isn't already present. + // Build the union. Start with a mutable copy of existing + // entries, then either upgrade in place or append new + // entries from incoming. var merged: std.ArrayList(T) = .empty; defer merged.deinit(self.allocator); merged.appendSlice(self.allocator, existing) catch return; var added: usize = 0; + var upgraded: usize = 0; for (incoming) |item| { - if (containsKey(T, merged.items, mergeKey(T, item))) continue; - merged.append(self.allocator, item) catch return; - added += 1; - logSupplied(T, symbol, item, source_hint); + const key = mergeKey(T, item); + if (findKeyIndex(T, merged.items, key)) |idx| { + // Existing entry — try to upgrade its optional fields + // from the incoming entry's non-null values. + upgraded += upgradeRecord(T, &merged.items[idx], item, symbol, source_hint); + } else { + // New entry — append. + merged.append(self.allocator, item) catch return; + added += 1; + logSupplied(T, symbol, item, source_hint); + } } - if (added == 0) { - // Nothing new — leave the file untouched. This is the - // common case for repeated Polygon/Tiingo refreshes. + if (added == 0 and upgraded == 0) { + // Nothing changed — leave the file untouched. This is + // the common case for repeated Polygon/Tiingo refreshes. return; } @@ -347,11 +381,31 @@ pub const Store = struct { @compileError("mergeKey only defined for Dividend and Split"); } - fn containsKey(comptime T: type, items: []const T, key: i32) bool { - for (items) |it| { - if (mergeKey(T, it) == key) return true; + /// Post-process callback for the merge primitive's `read` call. + /// Dupes any heap-allocated string fields into stable memory so + /// the `existing` slice's records survive past the SRF + /// iterator's backing buffer being freed. Paired with each + /// type's `deinit` to release the duped strings after merge. + /// Splits have no string fields so the callback is null. + fn mergePostProcess(comptime T: type) ?*const fn (*T, std.mem.Allocator) anyerror!void { + return switch (T) { + Dividend => &struct { + fn pp(div: *Dividend, allocator: std.mem.Allocator) anyerror!void { + if (div.currency) |c| { + div.currency = try allocator.dupe(u8, c); + } + } + }.pp, + Split => null, + else => @compileError("mergePostProcess only defined for Dividend and Split"), + }; + } + + fn findKeyIndex(comptime T: type, items: []const T, key: i32) ?usize { + for (items, 0..) |it, i| { + if (mergeKey(T, it) == key) return i; } - return false; + return null; } fn lessByDateDesc(comptime T: type) fn (void, T, T) bool { @@ -362,6 +416,67 @@ pub const Store = struct { }.lt; } + /// Field-level upgrade for an existing record using values from + /// an incoming record with the same merge key. Returns the + /// number of fields actually upgraded so the caller can decide + /// whether to write the file. For `Split` this is always 0 + /// (no optional fields). For `Dividend` it walks `pay_date`, + /// `record_date`, `type`, `currency` and fills nulls from the + /// incoming record's non-null values. `type = .unknown` is + /// treated as null-equivalent so a `.regular` from Polygon can + /// upgrade an `.unknown` from Tiingo. + fn upgradeRecord( + comptime T: type, + existing: *T, + incoming: T, + symbol: []const u8, + source_hint: ?[]const u8, + ) usize { + if (T == Split) return 0; + if (T != Dividend) @compileError("upgradeRecord only defined for Dividend and Split"); + + const source = source_hint orelse "fetch"; + var count: usize = 0; + const key = existing.ex_date; + + if (existing.pay_date == null and incoming.pay_date != null) { + existing.pay_date = incoming.pay_date; + log.info("{s}: {s} upgraded ex_date {f}: pay_date null -> {f}", .{ + symbol, source, key, incoming.pay_date.?, + }); + count += 1; + } + if (existing.record_date == null and incoming.record_date != null) { + existing.record_date = incoming.record_date; + log.info("{s}: {s} upgraded ex_date {f}: record_date null -> {f}", .{ + symbol, source, key, incoming.record_date.?, + }); + count += 1; + } + if (existing.type == .unknown and incoming.type != .unknown) { + existing.type = incoming.type; + log.info("{s}: {s} upgraded ex_date {f}: type unknown -> {s}", .{ + symbol, source, key, @tagName(incoming.type), + }); + count += 1; + } + if (existing.currency == null and incoming.currency != null) { + // Borrow the incoming string. The merged list lives only + // until serialization completes inside writeMerged, and + // serialization makes its own copy via SRF format. The + // backing memory is owned by `incoming` (kept alive by + // the caller's slice), and `existing.currency`'s + // original null state means there's nothing to free. + existing.currency = incoming.currency; + log.info("{s}: {s} upgraded ex_date {f}: currency null -> {s}", .{ + symbol, source, key, incoming.currency.?, + }); + count += 1; + } + + return count; + } + fn logSupplied(comptime T: type, symbol: []const u8, item: T, source_hint: ?[]const u8) void { const source = source_hint orelse "fetch"; if (T == Dividend) { @@ -1304,7 +1419,7 @@ test "dividend serialize/deserialize round-trip" { const io = std.testing.io; const allocator = std.testing.allocator; const divs = [_]Dividend{ - .{ .ex_date = Date.fromYmd(2024, 3, 15), .amount = 0.8325, .pay_date = Date.fromYmd(2024, 3, 28), .frequency = 4, .type = .regular }, + .{ .ex_date = Date.fromYmd(2024, 3, 15), .amount = 0.8325, .pay_date = Date.fromYmd(2024, 3, 28), .type = .regular }, .{ .ex_date = Date.fromYmd(2024, 6, 14), .amount = 0.9148, .type = .special }, }; @@ -1322,7 +1437,6 @@ test "dividend serialize/deserialize round-trip" { try std.testing.expectApproxEqAbs(@as(f64, 0.8325), parsed[0].amount, 0.0001); try std.testing.expect(parsed[0].pay_date != null); try std.testing.expect(parsed[0].pay_date.?.eql(Date.fromYmd(2024, 3, 28))); - try std.testing.expectEqual(@as(?u8, 4), parsed[0].frequency); try std.testing.expectEqual(DividendType.regular, parsed[0].type); try std.testing.expect(parsed[1].ex_date.eql(Date.fromYmd(2024, 6, 14))); @@ -1484,6 +1598,175 @@ test "writeMerged Dividend: no-op when nothing new" { try std.testing.expectEqual(stat_before.mtime, stat_after.mtime); } +test "writeMerged Dividend: field-level upgrade fills nulls (Tiingo-then-Polygon)" { + // Simulates the order that happens during a server refresh: + // Tiingo writes its sparse view first (via populateAllFromTiingo + // during getCandles), then Polygon writes its rich view second + // (via fetchCached during getDividends). The on-disk record + // must end up with Polygon's metadata, not stuck with Tiingo's + // null fields. + const allocator = std.testing.allocator; + const io = std.testing.io; + var tmp = std.testing.tmpDir(.{}); + defer tmp.cleanup(); + const dir_path = try tmp.dir.realPathFileAlloc(io, ".", allocator); + defer allocator.free(dir_path); + + var s = Store.init(io, allocator, dir_path); + + // Tiingo first: sparse — only ex_date and amount. + var tiingo_view = [_]Dividend{ + .{ .ex_date = Date.fromYmd(2024, 5, 15), .amount = 0.50 }, + }; + s.writeWithSource(Dividend, "TEST", tiingo_view[0..], Ttl.dividends, "tiingo"); + + // Polygon second: rich — pay_date, record_date, type, currency. + var polygon_view = [_]Dividend{ + .{ + .ex_date = Date.fromYmd(2024, 5, 15), + .pay_date = Date.fromYmd(2024, 6, 1), + .record_date = Date.fromYmd(2024, 5, 17), + .amount = 0.50, + .type = .regular, + // currency stays null in this fixture so we don't have + // to deal with allocation; the upgrade-currency case is + // covered by a separate test. + }, + }; + s.writeWithSource(Dividend, "TEST", polygon_view[0..], Ttl.dividends, "polygon"); + + const result = s.read(Dividend, "TEST", null, .any) orelse return error.NoCache; + defer allocator.free(result.data); + defer for (result.data) |d| d.deinit(allocator); + + try std.testing.expectEqual(@as(usize, 1), result.data.len); + try std.testing.expect(result.data[0].pay_date != null); + try std.testing.expect(result.data[0].pay_date.?.eql(Date.fromYmd(2024, 6, 1))); + try std.testing.expect(result.data[0].record_date != null); + try std.testing.expect(result.data[0].record_date.?.eql(Date.fromYmd(2024, 5, 17))); + try std.testing.expectEqual(DividendType.regular, result.data[0].type); +} + +test "writeMerged Dividend: type unknown counts as null and gets upgraded" { + // Tiingo's dividend records always carry type = .unknown. A + // later Polygon write with type = .regular must upgrade the + // existing record. This is the "type-unknown is null-equivalent" + // rule. + const allocator = std.testing.allocator; + const io = std.testing.io; + var tmp = std.testing.tmpDir(.{}); + defer tmp.cleanup(); + const dir_path = try tmp.dir.realPathFileAlloc(io, ".", allocator); + defer allocator.free(dir_path); + + var s = Store.init(io, allocator, dir_path); + + var tiingo_view = [_]Dividend{ + .{ .ex_date = Date.fromYmd(2024, 5, 15), .amount = 0.50, .type = .unknown }, + }; + s.writeWithSource(Dividend, "TEST", tiingo_view[0..], Ttl.dividends, "tiingo"); + + var polygon_view = [_]Dividend{ + .{ .ex_date = Date.fromYmd(2024, 5, 15), .amount = 0.50, .type = .special }, + }; + s.writeWithSource(Dividend, "TEST", polygon_view[0..], Ttl.dividends, "polygon"); + + const result = s.read(Dividend, "TEST", null, .any) orelse return error.NoCache; + defer allocator.free(result.data); + defer for (result.data) |d| d.deinit(allocator); + + try std.testing.expectEqual(@as(usize, 1), result.data.len); + try std.testing.expectEqual(DividendType.special, result.data[0].type); +} + +test "writeMerged Dividend: non-null fields are not overwritten" { + // If the existing record already has a non-null field, an + // incoming record's different value must NOT overwrite it. The + // rule is "first non-null wins." Catches a regression where the + // upgrade logic is too eager. + const allocator = std.testing.allocator; + const io = std.testing.io; + var tmp = std.testing.tmpDir(.{}); + defer tmp.cleanup(); + const dir_path = try tmp.dir.realPathFileAlloc(io, ".", allocator); + defer allocator.free(dir_path); + + var s = Store.init(io, allocator, dir_path); + + var first = [_]Dividend{ + .{ + .ex_date = Date.fromYmd(2024, 5, 15), + .pay_date = Date.fromYmd(2024, 6, 1), + .amount = 0.50, + .type = .regular, + }, + }; + s.writeWithSource(Dividend, "TEST", first[0..], Ttl.dividends, "polygon"); + + // Second write with a different (and wrong) pay_date and type. + var second = [_]Dividend{ + .{ + .ex_date = Date.fromYmd(2024, 5, 15), + .pay_date = Date.fromYmd(2099, 1, 1), + .amount = 0.50, + .type = .special, + }, + }; + s.writeWithSource(Dividend, "TEST", second[0..], Ttl.dividends, "polygon"); + + const result = s.read(Dividend, "TEST", null, .any) orelse return error.NoCache; + defer allocator.free(result.data); + defer for (result.data) |d| d.deinit(allocator); + + try std.testing.expectEqual(@as(usize, 1), result.data.len); + // First record's fields stick. + try std.testing.expect(result.data[0].pay_date.?.eql(Date.fromYmd(2024, 6, 1))); + try std.testing.expectEqual(DividendType.regular, result.data[0].type); +} + +test "writeMerged Dividend: upgrade is no-op when both have same fields" { + // Both writes have the same ex_date, amount, pay_date, and + // type. There's nothing to upgrade and nothing new — the file + // should not be touched on the second write. + const allocator = std.testing.allocator; + const io = std.testing.io; + var tmp = std.testing.tmpDir(.{}); + defer tmp.cleanup(); + const dir_path = try tmp.dir.realPathFileAlloc(io, ".", allocator); + defer allocator.free(dir_path); + + var s = Store.init(io, allocator, dir_path); + + var initial = [_]Dividend{ + .{ + .ex_date = Date.fromYmd(2024, 5, 15), + .pay_date = Date.fromYmd(2024, 6, 1), + .amount = 0.50, + .type = .regular, + }, + }; + s.writeWithSource(Dividend, "TEST", initial[0..], Ttl.dividends, "polygon"); + + const path = try std.fs.path.join(allocator, &.{ dir_path, "TEST", "dividends.srf" }); + defer allocator.free(path); + const stat_before = try std.Io.Dir.cwd().statFile(io, path, .{}); + + std.Io.sleep(io, std.Io.Duration.fromMilliseconds(20), .awake) catch {}; + + var repeat = [_]Dividend{ + .{ + .ex_date = Date.fromYmd(2024, 5, 15), + .pay_date = Date.fromYmd(2024, 6, 1), + .amount = 0.50, + .type = .regular, + }, + }; + s.writeWithSource(Dividend, "TEST", repeat[0..], Ttl.dividends, "polygon"); + + const stat_after = try std.Io.Dir.cwd().statFile(io, path, .{}); + try std.testing.expectEqual(stat_before.mtime, stat_after.mtime); +} + test "writeMerged Split: SPYM-style supplementary entry added" { const allocator = std.testing.allocator; const io = std.testing.io; diff --git a/src/commands/divs.zig b/src/commands/divs.zig index d33cfbc..0bccecc 100644 --- a/src/commands/divs.zig +++ b/src/commands/divs.zig @@ -75,11 +75,11 @@ pub fn display(dividends: []const zfin.Dividend, symbol: []const u8, current_pri } try cli.setFg(out, color, cli.CLR_MUTED); - try out.print("{s:>12} {s:>10} {s:>12} {s:>6} {s:>10}\n", .{ - "Ex-Date", "Amount", "Pay Date", "Freq", "Type", + try out.print("{s:>12} {s:>10} {s:>12} {s:>10}\n", .{ + "Ex-Date", "Amount", "Pay Date", "Type", }); - try out.print("{s:->12} {s:->10} {s:->12} {s:->6} {s:->10}\n", .{ - "", "", "", "", "", + try out.print("{s:->12} {s:->10} {s:->12} {s:->10}\n", .{ + "", "", "", "", }); try cli.reset(out, color); @@ -94,11 +94,6 @@ pub fn display(dividends: []const zfin.Dividend, symbol: []const u8, current_pri } else { try out.print(" {s:>12}", .{"--"}); } - if (div.frequency) |f| { - try out.print(" {d:>6}", .{f}); - } else { - try out.print(" {s:>6}", .{"--"}); - } try out.print(" {s:>10}\n", .{@tagName(div.type)}); total += div.amount; if (!div.ex_date.lessThan(one_year_ago)) ttm += div.amount; diff --git a/src/models/dividend.zig b/src/models/dividend.zig index 51ee84b..99ffda5 100644 --- a/src/models/dividend.zig +++ b/src/models/dividend.zig @@ -19,8 +19,6 @@ pub const Dividend = struct { record_date: ?Date = null, /// Cash amount per share amount: f64, - /// How many times per year this dividend is expected - frequency: ?u8 = null, /// Classification of the dividend type: DividendType = .unknown, /// Currency code (e.g., "USD"). Heap-allocated; freed by deinit(). diff --git a/src/providers/polygon.zig b/src/providers/polygon.zig index 0a7bf66..9e0eba9 100644 --- a/src/providers/polygon.zig +++ b/src/providers/polygon.zig @@ -183,7 +183,6 @@ fn parseDividendsPage( .amount = amount, .pay_date = parseDateField(obj, "pay_date"), .record_date = parseDateField(obj, "record_date"), - .frequency = parseFrequency(obj), .type = parseDividendType(obj), .currency = if (jsonStr(obj.get("currency"))) |s| (allocator.dupe(u8, s) catch null) @@ -271,15 +270,6 @@ fn parseDateField(obj: std.json.ObjectMap, key: []const u8) ?Date { return Date.parse(s) catch null; } -fn parseFrequency(obj: std.json.ObjectMap) ?u8 { - const v = obj.get("frequency") orelse return null; - return switch (v) { - .integer => |i| if (i > 0 and i <= 255) @intCast(i) else null, - .float => |f| if (f > 0 and f <= 255) @intFromFloat(f) else null, - else => null, - }; -} - fn parseDividendType(obj: std.json.ObjectMap) DividendType { const v = obj.get("dividend_type") orelse return .unknown; const s = switch (v) { @@ -332,7 +322,6 @@ test "parseDividendsPage basic" { try std.testing.expect(out.items[0].ex_date.eql(Date.fromYmd(2024, 8, 12))); try std.testing.expectApproxEqAbs(@as(f64, 0.25), out.items[0].amount, 0.001); try std.testing.expect(out.items[0].pay_date != null); - try std.testing.expectEqual(@as(?u8, 4), out.items[0].frequency); try std.testing.expectEqual(DividendType.regular, out.items[0].type); try std.testing.expectEqual(DividendType.special, out.items[1].type); diff --git a/src/providers/tiingo.zig b/src/providers/tiingo.zig index 32c821c..d895c13 100644 --- a/src/providers/tiingo.zig +++ b/src/providers/tiingo.zig @@ -216,9 +216,8 @@ fn parseAll(allocator: std.mem.Allocator, body: []const u8) !CandleAndCorporateA .ex_date = date, .amount = div_cash, // Tiingo doesn't carry pay_date / record_date / - // frequency / type. Display-only fields stay null / - // .unknown; total-return math only needs ex_date and - // amount. + // type. Display-only fields stay null / .unknown; + // total-return math only needs ex_date and amount. }); } @@ -317,7 +316,6 @@ test "parseAll extracts a dividend from a divCash row" { // Metadata fields are absent for Tiingo-sourced dividends try std.testing.expect(div.pay_date == null); try std.testing.expect(div.record_date == null); - try std.testing.expect(div.frequency == null); try std.testing.expectEqual(@import("../models/dividend.zig").DividendType.unknown, div.type); } diff --git a/src/service.zig b/src/service.zig index c37e770..8333aee 100644 --- a/src/service.zig +++ b/src/service.zig @@ -330,7 +330,7 @@ pub const DataService = struct { const retried = self.fetchFromProvider(T, symbol) catch { return DataError.FetchFailed; }; - s.write(T, symbol, retried, data_type.ttl()); + s.writeWithSource(T, symbol, retried, data_type.ttl(), sourceHintFor(T)); return .{ .data = retried, .source = .fetched, .timestamp = std.Io.Timestamp.now(self.io, .real).toSeconds(), .allocator = self.allocator }; } // Only NotFound (provider says "this symbol genuinely has @@ -344,10 +344,23 @@ pub const DataService = struct { return DataError.FetchFailed; }; - s.write(T, symbol, fetched, data_type.ttl()); + s.writeWithSource(T, symbol, fetched, data_type.ttl(), sourceHintFor(T)); return .{ .data = fetched, .source = .fetched, .timestamp = std.Io.Timestamp.now(self.io, .real).toSeconds(), .allocator = self.allocator }; } + /// Map the model type fetched via `fetchCached` back to the + /// provider it came from, so the merge primitive's `info(cache)` + /// log lines can attribute new entries / field upgrades to a + /// named source. Returns null for types where the source name + /// isn't useful (the merge primitive only consults this for + /// Dividend and Split). + fn sourceHintFor(comptime T: type) ?[]const u8 { + return switch (T) { + Dividend, Split => "polygon", + else => null, + }; + } + /// Dispatch a fetch to the correct provider based on model type. fn fetchFromProvider(self: *DataService, comptime T: type, symbol: []const u8) !cache.Store.DataFor(T) { return switch (T) { @@ -1560,7 +1573,7 @@ pub const DataService = struct { .dividends => "/dividends", .earnings => "/earnings", .options => "/options", - .splits => return false, // not served + .splits => "/splits", .etf_profile => return false, // not served .meta => return false, };