diff --git a/src/main.zig b/src/main.zig index ea14d60..57382dc 100644 --- a/src/main.zig +++ b/src/main.zig @@ -485,6 +485,84 @@ fn printRateLimitTag(svc: *zfin.DataService, data_type: zfin.cache.DataType, std } } +/// Per-data-type outcome tally for a refresh run. `fetched` vs `cached` +/// tracks network-vs-cache (useful for spotting cache expiry / TTL +/// tuning and for gauging load against provider rate limits); `na` is a +/// legitimate "no data for this symbol" outcome (NotFound, or an +/// entity_facts skip); `failed` is a hard error. +const TypeStat = struct { + fetched: usize = 0, + cached: usize = 0, + na: usize = 0, + failed: usize = 0, + + /// Record a successful fetch: network hit vs served-from-cache. + fn hit(self: *TypeStat, was_fetched: bool) void { + if (was_fetched) self.fetched += 1 else self.cached += 1; + } +}; + +/// Per-type tallies for the seven per-symbol data types a refresh +/// touches. Reported as the summary table at the end of a run. +const RefreshStats = struct { + candles: TypeStat = .{}, + dividends: TypeStat = .{}, + splits: TypeStat = .{}, + earnings: TypeStat = .{}, + classification: TypeStat = .{}, + etf_metrics: TypeStat = .{}, + entity_facts: TypeStat = .{}, +}; + +/// Symbol-level status partition. Precedence, highest first: +/// failed > lagging > overdue > current -- the same precedence +/// `refreshExit` uses, so the printed counts always agree with the +/// process exit code (lagging is NOT folded into "current"). +const SymbolCounts = struct { + current: usize = 0, + lagging: usize = 0, + overdue: usize = 0, + failed: usize = 0, +}; + +/// Candle freshness for one symbol this run. Set during the candle +/// check; stays `.current` if candles were empty or the symbol failed +/// before the check (a failed symbol is bucketed as `failed` regardless). +const Freshness = enum { current, lagging, overdue }; + +/// Record a hard failure of `tag` for the current symbol: bump that +/// type's failed counter and remember the tag, in one call, so the two +/// can't drift apart. The remembered tags become the end-of-symbol +/// `failed:` entry, e.g. "SYM (candles, earnings)". Each data-type block +/// runs once per symbol and `fail_types` is cleared per symbol, so a tag +/// is recorded at most once per symbol (no duplicates). +fn recordFailure(stat: *TypeStat, fail_types: *std.ArrayList([]const u8), allocator: std.mem.Allocator, tag: []const u8) !void { + stat.failed += 1; + try fail_types.append(allocator, tag); +} + +/// Print one right-aligned data-type row of the summary table. Shares +/// its width specifiers with the header in `refresh` so columns line up. +fn printStatRow(stdout: *std.Io.Writer, name: []const u8, s: TypeStat) !void { + try stdout.print(" {s:<14}{d:>7}{d:>8}{d:>5}{d:>8}\n", .{ name, s.fetched, s.cached, s.na, s.failed }); +} + +/// Print a `label: a, b, c` line, or `label: (none)` when empty. Always +/// printed (even when empty) so the failed/lagging/overdue lines are +/// reliable grep targets. +fn printSymbolList(stdout: *std.Io.Writer, label: []const u8, items: []const []const u8) !void { + try stdout.print(" {s:<9}", .{label}); + if (items.len == 0) { + try stdout.print("(none)\n", .{}); + return; + } + for (items, 0..) |it, i| { + if (i > 0) try stdout.print(", ", .{}); + try stdout.print("{s}", .{it}); + } + try stdout.print("\n", .{}); +} + /// Format as percentage (e.g., 0.1234 -> "12.34000"), or "null" if absent. fn fmtPct(arena: std.mem.Allocator, value: ?f64) []const u8 { if (value) |v| return std.fmt.allocPrint(arena, "{d:.5}", .{v * 100.0}) catch "null"; @@ -580,6 +658,10 @@ fn refresh(io: std.Io, allocator: std.mem.Allocator, environ: *const std.process // once so the whole run shares a consistent "now". const now_s = std.Io.Timestamp.now(io, .real).toSeconds(); + // wall-clock required: end-to-end run duration for the summary line. + // .awake (monotonic) avoids skew-induced negatives like dispatch does. + const start_ns = std.Io.Timestamp.now(io, .awake).nanoseconds; + const portfolio_path = environ.get("ZFIN_PORTFOLIO") orelse "portfolio.srf"; const data = std.Io.Dir.cwd().readFileAlloc(io, portfolio_path, allocator, .limited(10 * 1024 * 1024)) catch { @@ -615,9 +697,14 @@ fn refresh(io: std.Io, allocator: std.mem.Allocator, environ: *const std.process try stdout.print("note: [~Ns] = est. pause before the next live fetch (bucket empty); cache hits skip it\n", .{}); try stdout.flush(); - var success_count: u32 = 0; - var fail_count: u32 = 0; - var lag_count: u32 = 0; + var counts: SymbolCounts = .{}; + var stats: RefreshStats = .{}; + var failed_list = std.ArrayList([]const u8).empty; + var lagging_list = std.ArrayList([]const u8).empty; + var overdue_list = std.ArrayList([]const u8).empty; + // Reused per symbol (clearRetainingCapacity) to collect the data-type + // tags that failed, so the failed list can read "SYM (candles, earnings)". + var fail_types = std.ArrayList([]const u8).empty; // Warm the EDGAR ticker maps once per refresh run. They're // ~3-5 MB each, cached for 30 days; warming guarantees the @@ -651,13 +738,15 @@ fn refresh(io: std.Io, allocator: std.mem.Allocator, environ: *const std.process try stdout.print("{s}: ", .{sym}); try stdout.flush(); - var sym_ok = true; + var sym_freshness: Freshness = .current; + fail_types.clearRetainingCapacity(); // Candles try printRateLimitTag(&svc, .candles_daily, stdout); if (svc.getCandles(sym, .{})) |result| { defer result.deinit(); try stdout.print("candles ok ({s})", .{@tagName(result.source)}); + stats.candles.hit(result.source == .fetched); // Provider-data-lag check: did we end up with the latest bar // the market should have posted by now? A `.lagging` bar is @@ -670,22 +759,25 @@ fn refresh(io: std.Io, allocator: std.mem.Allocator, environ: *const std.process const ds = std.fmt.bufPrint(&date_buf, "{f}", .{last}) catch "?"; switch (zfin.market.candleFreshness(now_s, zfin.market.classify(sym), last)) { .lagging => { - lag_count += 1; + sym_freshness = .lagging; try stdout.print(" LAGGING (latest {s})", .{ds}); log.info("provider data lag: {s} latest bar {s}; newer session due but unposted", .{ sym, ds }); }, - .overdue => log.info("{s}: latest bar {s} overdue past grace window; assuming market closure (no retry)", .{ sym, ds }), + .overdue => { + sym_freshness = .overdue; + log.info("{s}: latest bar {s} overdue past grace window; assuming market closure (no retry)", .{ sym, ds }); + }, .current => {}, } } } else |err| { try stdout.print("candles FAILED ({s})", .{@errorName(err)}); - sym_ok = false; + try recordFailure(&stats.candles, &fail_types, allocator, "candles"); if (err == zfin.DataError.TransientError or err == zfin.DataError.AuthError) { const reason = if (err == zfin.DataError.AuthError) "auth failure" else "transient provider failure"; try stdout.print("\n", .{}); try stdout.print("\nStopping refresh: {s}\n", .{reason}); - try stdout.print("Refresh aborted: {d} ok, {d} failed\n", .{ success_count, fail_count + 1 }); + try stdout.print("Refresh aborted after {d} current, {d} lagging, {d} overdue, {d} failed\n", .{ counts.current, counts.lagging, counts.overdue, counts.failed + 1 }); try stdout.flush(); return error.RefreshFailed; } @@ -697,9 +789,10 @@ fn refresh(io: std.Io, allocator: std.mem.Allocator, environ: *const std.process if (svc.getDividends(sym, .{})) |result| { defer result.deinit(); try stdout.print("dividends ok ({s})", .{@tagName(result.source)}); + stats.dividends.hit(result.source == .fetched); } else |err| { try stdout.print("dividends FAILED ({s})", .{@errorName(err)}); - sym_ok = false; + try recordFailure(&stats.dividends, &fail_types, allocator, "dividends"); } // Splits @@ -708,9 +801,10 @@ fn refresh(io: std.Io, allocator: std.mem.Allocator, environ: *const std.process if (svc.getSplits(sym, .{})) |result| { defer result.deinit(); try stdout.print("splits ok ({s})", .{@tagName(result.source)}); + stats.splits.hit(result.source == .fetched); } else |err| { try stdout.print("splits FAILED ({s})", .{@errorName(err)}); - sym_ok = false; + try recordFailure(&stats.splits, &fail_types, allocator, "splits"); } // Earnings @@ -719,15 +813,16 @@ fn refresh(io: std.Io, allocator: std.mem.Allocator, environ: *const std.process if (svc.getEarnings(sym, .{})) |result| { defer result.deinit(); try stdout.print("earnings ok ({s})", .{@tagName(result.source)}); + stats.earnings.hit(result.source == .fetched); } else |err| { try stdout.print("earnings FAILED ({s})", .{@errorName(err)}); - sym_ok = false; + try recordFailure(&stats.earnings, &fail_types, allocator, "earnings"); } // Classification (Wikidata + EDGAR fallback). Captures // CIK and is_etf — used to chain into entity_facts below. // NotFound is logged as `n/a` (symbol genuinely has no - // Wikidata or EDGAR entry) and doesn't flip sym_ok. + // Wikidata or EDGAR entry) and isn't counted as a failure. var cik_buf: ?[]u8 = null; defer if (cik_buf) |b| allocator.free(b); var is_etf = false; @@ -742,28 +837,36 @@ fn refresh(io: std.Io, allocator: std.mem.Allocator, environ: *const std.process is_etf = result.data[0].is_etf; } try stdout.print("classification ok ({s})", .{@tagName(result.source)}); + stats.classification.hit(result.source == .fetched); } else |err| switch (err) { - zfin.DataError.NotFound => try stdout.print("classification n/a", .{}), + zfin.DataError.NotFound => { + try stdout.print("classification n/a", .{}); + stats.classification.na += 1; + }, else => { try stdout.print("classification FAILED ({t})", .{err}); - sym_ok = false; + try recordFailure(&stats.classification, &fail_types, allocator, "classification"); }, } // ETF metrics. NotFound is the expected outcome for // non-funds (NPORT-P only exists for funds + UITs); a // negative-cache entry suppresses retries. Logged as - // `n/a` and doesn't flip sym_ok. + // `n/a` and isn't counted as a failure. try stdout.print(", ", .{}); try printRateLimitTag(&svc, .etf_metrics, stdout); if (svc.getEtfMetrics(sym, .{})) |result| { defer result.deinit(); try stdout.print("etf_metrics ok ({s})", .{@tagName(result.source)}); + stats.etf_metrics.hit(result.source == .fetched); } else |err| switch (err) { - zfin.DataError.NotFound => try stdout.print("etf_metrics n/a", .{}), + zfin.DataError.NotFound => { + try stdout.print("etf_metrics n/a", .{}); + stats.etf_metrics.na += 1; + }, else => { try stdout.print("etf_metrics FAILED ({t})", .{err}); - sym_ok = false; + try recordFailure(&stats.etf_metrics, &fail_types, allocator, "etf_metrics"); }, } @@ -777,31 +880,82 @@ fn refresh(io: std.Io, allocator: std.mem.Allocator, environ: *const std.process try stdout.print(", ", .{}); if (is_etf) { try stdout.print("entity_facts n/a (ETF)", .{}); + stats.entity_facts.na += 1; } else { try printRateLimitTag(&svc, .entity_facts, stdout); if (svc.getEntityFacts(cik, .{})) |result| { defer result.deinit(); try stdout.print("entity_facts ok ({s})", .{@tagName(result.source)}); + stats.entity_facts.hit(result.source == .fetched); } else |err| switch (err) { - zfin.DataError.NotFound => try stdout.print("entity_facts n/a", .{}), + zfin.DataError.NotFound => { + try stdout.print("entity_facts n/a", .{}); + stats.entity_facts.na += 1; + }, else => { try stdout.print("entity_facts FAILED ({t})", .{err}); - sym_ok = false; + try recordFailure(&stats.entity_facts, &fail_types, allocator, "entity_facts"); }, } } + } else { + // No CIK resolved: entity_facts is not applicable for this + // symbol, so it counts as n/a (keeps the table row summing + // to the total symbol count). + stats.entity_facts.na += 1; } try stdout.print("\n", .{}); try stdout.flush(); - if (sym_ok) success_count += 1 else fail_count += 1; + // Bucket the symbol by precedence failed > lagging > overdue > + // current (same order refreshExit uses for the exit code). A + // symbol failed iff any data type recorded a hard error. + if (fail_types.items.len > 0) { + counts.failed += 1; + const types = try std.mem.join(allocator, ", ", fail_types.items); + try failed_list.append(allocator, try std.fmt.allocPrint(allocator, "{s} ({s})", .{ sym, types })); + } else switch (sym_freshness) { + .current => counts.current += 1, + .lagging => { + counts.lagging += 1; + try lagging_list.append(allocator, sym); + }, + .overdue => { + counts.overdue += 1; + try overdue_list.append(allocator, sym); + }, + } } - try stdout.print("\nRefresh complete: {d} ok, {d} failed, {d} lagging\n", .{ success_count, fail_count, lag_count }); + const elapsed_ns = std.Io.Timestamp.now(io, .awake).nanoseconds - start_ns; + const elapsed_s: u64 = @intCast(@divTrunc(elapsed_ns, std.time.ns_per_s)); + const code = refreshExit(counts.failed, counts.lagging); + const reason = switch (code) { + 0 => "clean", + 75 => "lagging", + else => "failures", + }; + + try stdout.print("\nRefresh complete in {d}s (exit {d}: {s})\n", .{ elapsed_s, code, reason }); + try stdout.print(" symbols: {d} current, {d} lagging, {d} overdue, {d} failed ({d} total)\n", .{ counts.current, counts.lagging, counts.overdue, counts.failed, symbols.count() }); + + try stdout.print("\n {s:<14}{s:>7}{s:>8}{s:>5}{s:>8}\n", .{ "type", "fetched", "cached", "n/a", "failed" }); + try printStatRow(stdout, "candles", stats.candles); + try printStatRow(stdout, "dividends", stats.dividends); + try printStatRow(stdout, "splits", stats.splits); + try printStatRow(stdout, "earnings", stats.earnings); + try printStatRow(stdout, "classification", stats.classification); + try printStatRow(stdout, "etf_metrics", stats.etf_metrics); + try printStatRow(stdout, "entity_facts", stats.entity_facts); + + try stdout.print("\n", .{}); + try printSymbolList(stdout, "failed:", failed_list.items); + try printSymbolList(stdout, "lagging:", lagging_list.items); + try printSymbolList(stdout, "overdue:", overdue_list.items); try stdout.flush(); - return refreshExit(fail_count, lag_count); + return code; } /// Map a refresh run's failure/lag counts to a process exit code: @@ -812,7 +966,7 @@ fn refresh(io: std.Io, allocator: std.mem.Allocator, environ: *const std.process /// 1 - at least one hard failure (fetch error) /// Hard failure dominates lag - if anything failed outright that's the /// code the operator needs to act on. -fn refreshExit(fail_count: u32, lag_count: u32) u8 { +fn refreshExit(fail_count: usize, lag_count: usize) u8 { if (fail_count > 0) return 1; if (lag_count > 0) return 75; return 0; @@ -977,3 +1131,37 @@ test "refreshExit: hard failure dominates, then lag, else clean" { // A hard failure outranks lag. try std.testing.expectEqual(@as(u8, 1), refreshExit(1, 5)); } + +test "printStatRow aligns with the summary-table header" { + var hdr: std.Io.Writer.Allocating = .init(std.testing.allocator); + defer hdr.deinit(); + try hdr.writer.print(" {s:<14}{s:>7}{s:>8}{s:>5}{s:>8}\n", .{ "type", "fetched", "cached", "n/a", "failed" }); + const hdr_out = try hdr.toOwnedSlice(); + defer std.testing.allocator.free(hdr_out); + + var row: std.Io.Writer.Allocating = .init(std.testing.allocator); + defer row.deinit(); + // Multi-digit values and the widest type name still fit the columns. + try printStatRow(&row.writer, "classification", .{ .fetched = 0, .cached = 31, .na = 14, .failed = 0 }); + const row_out = try row.toOwnedSlice(); + defer std.testing.allocator.free(row_out); + + // Identical width specifiers => identical rendered length => columns line up. + try std.testing.expectEqual(hdr_out.len, row_out.len); +} + +test "printSymbolList: empty shows (none), non-empty joins with commas" { + var a: std.Io.Writer.Allocating = .init(std.testing.allocator); + defer a.deinit(); + try printSymbolList(&a.writer, "failed:", &.{}); + const a_out = try a.toOwnedSlice(); + defer std.testing.allocator.free(a_out); + try std.testing.expectEqualStrings(" failed: (none)\n", a_out); + + var b: std.Io.Writer.Allocating = .init(std.testing.allocator); + defer b.deinit(); + try printSymbolList(&b.writer, "lagging:", &.{ "NKE", "AMZN" }); + const b_out = try b.toOwnedSlice(); + defer std.testing.allocator.free(b_out); + try std.testing.expectEqualStrings(" lagging: NKE, AMZN\n", b_out); +}