better refresh reporting
All checks were successful
Generic zig build / build (push) Successful in 1m43s
Generic zig build / deploy (push) Successful in 18s

This commit is contained in:
Emil Lerch 2026-06-26 11:32:51 -07:00
parent 4366414207
commit 890b07fe42
Signed by: lobo
GPG key ID: A7B62D657EF764F8

View file

@ -485,6 +485,84 @@ fn printRateLimitTag(svc: *zfin.DataService, data_type: zfin.cache.DataType, std
}
}
/// Per-data-type outcome tally for a refresh run. `fetched` vs `cached`
/// tracks network-vs-cache (useful for spotting cache expiry / TTL
/// tuning and for gauging load against provider rate limits); `na` is a
/// legitimate "no data for this symbol" outcome (NotFound, or an
/// entity_facts skip); `failed` is a hard error.
const TypeStat = struct {
fetched: usize = 0,
cached: usize = 0,
na: usize = 0,
failed: usize = 0,
/// Record a successful fetch: network hit vs served-from-cache.
fn hit(self: *TypeStat, was_fetched: bool) void {
if (was_fetched) self.fetched += 1 else self.cached += 1;
}
};
/// Per-type tallies for the seven per-symbol data types a refresh
/// touches. Reported as the summary table at the end of a run.
const RefreshStats = struct {
candles: TypeStat = .{},
dividends: TypeStat = .{},
splits: TypeStat = .{},
earnings: TypeStat = .{},
classification: TypeStat = .{},
etf_metrics: TypeStat = .{},
entity_facts: TypeStat = .{},
};
/// Symbol-level status partition. Precedence, highest first:
/// failed > lagging > overdue > current -- the same precedence
/// `refreshExit` uses, so the printed counts always agree with the
/// process exit code (lagging is NOT folded into "current").
const SymbolCounts = struct {
current: usize = 0,
lagging: usize = 0,
overdue: usize = 0,
failed: usize = 0,
};
/// Candle freshness for one symbol this run. Set during the candle
/// check; stays `.current` if candles were empty or the symbol failed
/// before the check (a failed symbol is bucketed as `failed` regardless).
const Freshness = enum { current, lagging, overdue };
/// Record a hard failure of `tag` for the current symbol: bump that
/// type's failed counter and remember the tag, in one call, so the two
/// can't drift apart. The remembered tags become the end-of-symbol
/// `failed:` entry, e.g. "SYM (candles, earnings)". Each data-type block
/// runs once per symbol and `fail_types` is cleared per symbol, so a tag
/// is recorded at most once per symbol (no duplicates).
fn recordFailure(stat: *TypeStat, fail_types: *std.ArrayList([]const u8), allocator: std.mem.Allocator, tag: []const u8) !void {
stat.failed += 1;
try fail_types.append(allocator, tag);
}
/// Print one right-aligned data-type row of the summary table. Shares
/// its width specifiers with the header in `refresh` so columns line up.
fn printStatRow(stdout: *std.Io.Writer, name: []const u8, s: TypeStat) !void {
try stdout.print(" {s:<14}{d:>7}{d:>8}{d:>5}{d:>8}\n", .{ name, s.fetched, s.cached, s.na, s.failed });
}
/// Print a `label: a, b, c` line, or `label: (none)` when empty. Always
/// printed (even when empty) so the failed/lagging/overdue lines are
/// reliable grep targets.
fn printSymbolList(stdout: *std.Io.Writer, label: []const u8, items: []const []const u8) !void {
try stdout.print(" {s:<9}", .{label});
if (items.len == 0) {
try stdout.print("(none)\n", .{});
return;
}
for (items, 0..) |it, i| {
if (i > 0) try stdout.print(", ", .{});
try stdout.print("{s}", .{it});
}
try stdout.print("\n", .{});
}
/// Format as percentage (e.g., 0.1234 -> "12.34000"), or "null" if absent.
fn fmtPct(arena: std.mem.Allocator, value: ?f64) []const u8 {
if (value) |v| return std.fmt.allocPrint(arena, "{d:.5}", .{v * 100.0}) catch "null";
@ -580,6 +658,10 @@ fn refresh(io: std.Io, allocator: std.mem.Allocator, environ: *const std.process
// once so the whole run shares a consistent "now".
const now_s = std.Io.Timestamp.now(io, .real).toSeconds();
// wall-clock required: end-to-end run duration for the summary line.
// .awake (monotonic) avoids skew-induced negatives like dispatch does.
const start_ns = std.Io.Timestamp.now(io, .awake).nanoseconds;
const portfolio_path = environ.get("ZFIN_PORTFOLIO") orelse "portfolio.srf";
const data = std.Io.Dir.cwd().readFileAlloc(io, portfolio_path, allocator, .limited(10 * 1024 * 1024)) catch {
@ -615,9 +697,14 @@ fn refresh(io: std.Io, allocator: std.mem.Allocator, environ: *const std.process
try stdout.print("note: [~Ns] = est. pause before the next live fetch (bucket empty); cache hits skip it\n", .{});
try stdout.flush();
var success_count: u32 = 0;
var fail_count: u32 = 0;
var lag_count: u32 = 0;
var counts: SymbolCounts = .{};
var stats: RefreshStats = .{};
var failed_list = std.ArrayList([]const u8).empty;
var lagging_list = std.ArrayList([]const u8).empty;
var overdue_list = std.ArrayList([]const u8).empty;
// Reused per symbol (clearRetainingCapacity) to collect the data-type
// tags that failed, so the failed list can read "SYM (candles, earnings)".
var fail_types = std.ArrayList([]const u8).empty;
// Warm the EDGAR ticker maps once per refresh run. They're
// ~3-5 MB each, cached for 30 days; warming guarantees the
@ -651,13 +738,15 @@ fn refresh(io: std.Io, allocator: std.mem.Allocator, environ: *const std.process
try stdout.print("{s}: ", .{sym});
try stdout.flush();
var sym_ok = true;
var sym_freshness: Freshness = .current;
fail_types.clearRetainingCapacity();
// Candles
try printRateLimitTag(&svc, .candles_daily, stdout);
if (svc.getCandles(sym, .{})) |result| {
defer result.deinit();
try stdout.print("candles ok ({s})", .{@tagName(result.source)});
stats.candles.hit(result.source == .fetched);
// Provider-data-lag check: did we end up with the latest bar
// the market should have posted by now? A `.lagging` bar is
@ -670,22 +759,25 @@ fn refresh(io: std.Io, allocator: std.mem.Allocator, environ: *const std.process
const ds = std.fmt.bufPrint(&date_buf, "{f}", .{last}) catch "?";
switch (zfin.market.candleFreshness(now_s, zfin.market.classify(sym), last)) {
.lagging => {
lag_count += 1;
sym_freshness = .lagging;
try stdout.print(" LAGGING (latest {s})", .{ds});
log.info("provider data lag: {s} latest bar {s}; newer session due but unposted", .{ sym, ds });
},
.overdue => log.info("{s}: latest bar {s} overdue past grace window; assuming market closure (no retry)", .{ sym, ds }),
.overdue => {
sym_freshness = .overdue;
log.info("{s}: latest bar {s} overdue past grace window; assuming market closure (no retry)", .{ sym, ds });
},
.current => {},
}
}
} else |err| {
try stdout.print("candles FAILED ({s})", .{@errorName(err)});
sym_ok = false;
try recordFailure(&stats.candles, &fail_types, allocator, "candles");
if (err == zfin.DataError.TransientError or err == zfin.DataError.AuthError) {
const reason = if (err == zfin.DataError.AuthError) "auth failure" else "transient provider failure";
try stdout.print("\n", .{});
try stdout.print("\nStopping refresh: {s}\n", .{reason});
try stdout.print("Refresh aborted: {d} ok, {d} failed\n", .{ success_count, fail_count + 1 });
try stdout.print("Refresh aborted after {d} current, {d} lagging, {d} overdue, {d} failed\n", .{ counts.current, counts.lagging, counts.overdue, counts.failed + 1 });
try stdout.flush();
return error.RefreshFailed;
}
@ -697,9 +789,10 @@ fn refresh(io: std.Io, allocator: std.mem.Allocator, environ: *const std.process
if (svc.getDividends(sym, .{})) |result| {
defer result.deinit();
try stdout.print("dividends ok ({s})", .{@tagName(result.source)});
stats.dividends.hit(result.source == .fetched);
} else |err| {
try stdout.print("dividends FAILED ({s})", .{@errorName(err)});
sym_ok = false;
try recordFailure(&stats.dividends, &fail_types, allocator, "dividends");
}
// Splits
@ -708,9 +801,10 @@ fn refresh(io: std.Io, allocator: std.mem.Allocator, environ: *const std.process
if (svc.getSplits(sym, .{})) |result| {
defer result.deinit();
try stdout.print("splits ok ({s})", .{@tagName(result.source)});
stats.splits.hit(result.source == .fetched);
} else |err| {
try stdout.print("splits FAILED ({s})", .{@errorName(err)});
sym_ok = false;
try recordFailure(&stats.splits, &fail_types, allocator, "splits");
}
// Earnings
@ -719,15 +813,16 @@ fn refresh(io: std.Io, allocator: std.mem.Allocator, environ: *const std.process
if (svc.getEarnings(sym, .{})) |result| {
defer result.deinit();
try stdout.print("earnings ok ({s})", .{@tagName(result.source)});
stats.earnings.hit(result.source == .fetched);
} else |err| {
try stdout.print("earnings FAILED ({s})", .{@errorName(err)});
sym_ok = false;
try recordFailure(&stats.earnings, &fail_types, allocator, "earnings");
}
// Classification (Wikidata + EDGAR fallback). Captures
// CIK and is_etf used to chain into entity_facts below.
// NotFound is logged as `n/a` (symbol genuinely has no
// Wikidata or EDGAR entry) and doesn't flip sym_ok.
// Wikidata or EDGAR entry) and isn't counted as a failure.
var cik_buf: ?[]u8 = null;
defer if (cik_buf) |b| allocator.free(b);
var is_etf = false;
@ -742,28 +837,36 @@ fn refresh(io: std.Io, allocator: std.mem.Allocator, environ: *const std.process
is_etf = result.data[0].is_etf;
}
try stdout.print("classification ok ({s})", .{@tagName(result.source)});
stats.classification.hit(result.source == .fetched);
} else |err| switch (err) {
zfin.DataError.NotFound => try stdout.print("classification n/a", .{}),
zfin.DataError.NotFound => {
try stdout.print("classification n/a", .{});
stats.classification.na += 1;
},
else => {
try stdout.print("classification FAILED ({t})", .{err});
sym_ok = false;
try recordFailure(&stats.classification, &fail_types, allocator, "classification");
},
}
// ETF metrics. NotFound is the expected outcome for
// non-funds (NPORT-P only exists for funds + UITs); a
// negative-cache entry suppresses retries. Logged as
// `n/a` and doesn't flip sym_ok.
// `n/a` and isn't counted as a failure.
try stdout.print(", ", .{});
try printRateLimitTag(&svc, .etf_metrics, stdout);
if (svc.getEtfMetrics(sym, .{})) |result| {
defer result.deinit();
try stdout.print("etf_metrics ok ({s})", .{@tagName(result.source)});
stats.etf_metrics.hit(result.source == .fetched);
} else |err| switch (err) {
zfin.DataError.NotFound => try stdout.print("etf_metrics n/a", .{}),
zfin.DataError.NotFound => {
try stdout.print("etf_metrics n/a", .{});
stats.etf_metrics.na += 1;
},
else => {
try stdout.print("etf_metrics FAILED ({t})", .{err});
sym_ok = false;
try recordFailure(&stats.etf_metrics, &fail_types, allocator, "etf_metrics");
},
}
@ -777,31 +880,82 @@ fn refresh(io: std.Io, allocator: std.mem.Allocator, environ: *const std.process
try stdout.print(", ", .{});
if (is_etf) {
try stdout.print("entity_facts n/a (ETF)", .{});
stats.entity_facts.na += 1;
} else {
try printRateLimitTag(&svc, .entity_facts, stdout);
if (svc.getEntityFacts(cik, .{})) |result| {
defer result.deinit();
try stdout.print("entity_facts ok ({s})", .{@tagName(result.source)});
stats.entity_facts.hit(result.source == .fetched);
} else |err| switch (err) {
zfin.DataError.NotFound => try stdout.print("entity_facts n/a", .{}),
zfin.DataError.NotFound => {
try stdout.print("entity_facts n/a", .{});
stats.entity_facts.na += 1;
},
else => {
try stdout.print("entity_facts FAILED ({t})", .{err});
sym_ok = false;
try recordFailure(&stats.entity_facts, &fail_types, allocator, "entity_facts");
},
}
}
} else {
// No CIK resolved: entity_facts is not applicable for this
// symbol, so it counts as n/a (keeps the table row summing
// to the total symbol count).
stats.entity_facts.na += 1;
}
try stdout.print("\n", .{});
try stdout.flush();
if (sym_ok) success_count += 1 else fail_count += 1;
// Bucket the symbol by precedence failed > lagging > overdue >
// current (same order refreshExit uses for the exit code). A
// symbol failed iff any data type recorded a hard error.
if (fail_types.items.len > 0) {
counts.failed += 1;
const types = try std.mem.join(allocator, ", ", fail_types.items);
try failed_list.append(allocator, try std.fmt.allocPrint(allocator, "{s} ({s})", .{ sym, types }));
} else switch (sym_freshness) {
.current => counts.current += 1,
.lagging => {
counts.lagging += 1;
try lagging_list.append(allocator, sym);
},
.overdue => {
counts.overdue += 1;
try overdue_list.append(allocator, sym);
},
}
}
try stdout.print("\nRefresh complete: {d} ok, {d} failed, {d} lagging\n", .{ success_count, fail_count, lag_count });
const elapsed_ns = std.Io.Timestamp.now(io, .awake).nanoseconds - start_ns;
const elapsed_s: u64 = @intCast(@divTrunc(elapsed_ns, std.time.ns_per_s));
const code = refreshExit(counts.failed, counts.lagging);
const reason = switch (code) {
0 => "clean",
75 => "lagging",
else => "failures",
};
try stdout.print("\nRefresh complete in {d}s (exit {d}: {s})\n", .{ elapsed_s, code, reason });
try stdout.print(" symbols: {d} current, {d} lagging, {d} overdue, {d} failed ({d} total)\n", .{ counts.current, counts.lagging, counts.overdue, counts.failed, symbols.count() });
try stdout.print("\n {s:<14}{s:>7}{s:>8}{s:>5}{s:>8}\n", .{ "type", "fetched", "cached", "n/a", "failed" });
try printStatRow(stdout, "candles", stats.candles);
try printStatRow(stdout, "dividends", stats.dividends);
try printStatRow(stdout, "splits", stats.splits);
try printStatRow(stdout, "earnings", stats.earnings);
try printStatRow(stdout, "classification", stats.classification);
try printStatRow(stdout, "etf_metrics", stats.etf_metrics);
try printStatRow(stdout, "entity_facts", stats.entity_facts);
try stdout.print("\n", .{});
try printSymbolList(stdout, "failed:", failed_list.items);
try printSymbolList(stdout, "lagging:", lagging_list.items);
try printSymbolList(stdout, "overdue:", overdue_list.items);
try stdout.flush();
return refreshExit(fail_count, lag_count);
return code;
}
/// Map a refresh run's failure/lag counts to a process exit code:
@ -812,7 +966,7 @@ fn refresh(io: std.Io, allocator: std.mem.Allocator, environ: *const std.process
/// 1 - at least one hard failure (fetch error)
/// Hard failure dominates lag - if anything failed outright that's the
/// code the operator needs to act on.
fn refreshExit(fail_count: u32, lag_count: u32) u8 {
fn refreshExit(fail_count: usize, lag_count: usize) u8 {
if (fail_count > 0) return 1;
if (lag_count > 0) return 75;
return 0;
@ -977,3 +1131,37 @@ test "refreshExit: hard failure dominates, then lag, else clean" {
// A hard failure outranks lag.
try std.testing.expectEqual(@as(u8, 1), refreshExit(1, 5));
}
test "printStatRow aligns with the summary-table header" {
var hdr: std.Io.Writer.Allocating = .init(std.testing.allocator);
defer hdr.deinit();
try hdr.writer.print(" {s:<14}{s:>7}{s:>8}{s:>5}{s:>8}\n", .{ "type", "fetched", "cached", "n/a", "failed" });
const hdr_out = try hdr.toOwnedSlice();
defer std.testing.allocator.free(hdr_out);
var row: std.Io.Writer.Allocating = .init(std.testing.allocator);
defer row.deinit();
// Multi-digit values and the widest type name still fit the columns.
try printStatRow(&row.writer, "classification", .{ .fetched = 0, .cached = 31, .na = 14, .failed = 0 });
const row_out = try row.toOwnedSlice();
defer std.testing.allocator.free(row_out);
// Identical width specifiers => identical rendered length => columns line up.
try std.testing.expectEqual(hdr_out.len, row_out.len);
}
test "printSymbolList: empty shows (none), non-empty joins with commas" {
var a: std.Io.Writer.Allocating = .init(std.testing.allocator);
defer a.deinit();
try printSymbolList(&a.writer, "failed:", &.{});
const a_out = try a.toOwnedSlice();
defer std.testing.allocator.free(a_out);
try std.testing.expectEqualStrings(" failed: (none)\n", a_out);
var b: std.Io.Writer.Allocating = .init(std.testing.allocator);
defer b.deinit();
try printSymbolList(&b.writer, "lagging:", &.{ "NKE", "AMZN" });
const b_out = try b.toOwnedSlice();
defer std.testing.allocator.free(b_out);
try std.testing.expectEqualStrings(" lagging: NKE, AMZN\n", b_out);
}