From 867f9afb8c521864f9fcbdabbd64afeacd05c992 Mon Sep 17 00:00:00 2001 From: Emil Lerch Date: Tue, 16 Jun 2026 16:57:27 -0700 Subject: [PATCH] add cusip to etf profile --- src/cache/store.zig | 7 +- src/models/etf_profile.zig | 7 + src/service.zig | 402 ++++++++++++++++++++++++++++++++++--- 3 files changed, 384 insertions(+), 32 deletions(-) diff --git a/src/cache/store.zig b/src/cache/store.zig index 7163fd5..8ca60fb 100644 --- a/src/cache/store.zig +++ b/src/cache/store.zig @@ -1800,6 +1800,7 @@ pub const Store = struct { errdefer { for (holdings.items) |h| { if (h.symbol) |s| allocator.free(s); + if (h.cusip) |c| allocator.free(c); allocator.free(h.name); } holdings.deinit(allocator); @@ -1817,8 +1818,12 @@ pub const Store = struct { }, .holding => |h| { const duped_sym = if (h.symbol) |s| try allocator.dupe(u8, s) else null; + errdefer if (duped_sym) |s| allocator.free(s); + const duped_cusip = if (h.cusip) |c| try allocator.dupe(u8, c) else null; + errdefer if (duped_cusip) |c| allocator.free(c); const duped_name = try allocator.dupe(u8, h.name); - try holdings.append(allocator, .{ .symbol = duped_sym, .name = duped_name, .weight = h.weight }); + errdefer allocator.free(duped_name); + try holdings.append(allocator, .{ .symbol = duped_sym, .name = duped_name, .weight = h.weight, .cusip = duped_cusip }); }, } } diff --git a/src/models/etf_profile.zig b/src/models/etf_profile.zig index 0359780..b3381a7 100644 --- a/src/models/etf_profile.zig +++ b/src/models/etf_profile.zig @@ -5,6 +5,12 @@ pub const Holding = struct { symbol: ?[]const u8 = null, name: []const u8, weight: f64, + /// CUSIP from the NPORT-P filing, when present. The join key for + /// look-through resolution: NPORT-P identifies holdings by CUSIP + /// far more reliably than by ticker, so `exposure` resolves this + /// to a ticker (via `cusip_tickers.srf` / OpenFIGI) to match a + /// holding against a directly-held position. + cusip: ?[]const u8 = null, }; /// Sector allocation in an ETF. @@ -70,6 +76,7 @@ pub const EtfProfile = struct { if (self.holdings) |h| { for (h) |holding| { if (holding.symbol) |s| allocator.free(s); + if (holding.cusip) |c| allocator.free(c); allocator.free(holding.name); } allocator.free(h); diff --git a/src/service.zig b/src/service.zig index 927f5b5..1a367fe 100644 --- a/src/service.zig +++ b/src/service.zig @@ -1006,6 +1006,7 @@ pub const DataService = struct { for (holdings_buf.items) |h| { self.allocator.free(h.name); if (h.symbol) |s| self.allocator.free(s); + if (h.cusip) |c| self.allocator.free(c); } holdings_buf.deinit(self.allocator); } @@ -1026,10 +1027,19 @@ pub const DataService = struct { try self.allocator.dupe(u8, t) else null; + errdefer if (sym_dup) |s| self.allocator.free(s); + const cusip_dup: ?[]const u8 = if (h.cusip) |c| + try self.allocator.dupe(u8, c) + else + null; + errdefer if (cusip_dup) |c| self.allocator.free(c); + const name_dup = try self.allocator.dupe(u8, h.name); + errdefer self.allocator.free(name_dup); try holdings_buf.append(self.allocator, .{ .symbol = sym_dup, - .name = try self.allocator.dupe(u8, h.name), + .name = name_dup, .weight = h.pct_of_portfolio / 100.0, + .cusip = cusip_dup, }); }, }; @@ -2479,46 +2489,49 @@ pub const DataService = struct { return result; } - /// Append a CUSIP->ticker mapping to the cache file. + /// Append CUSIP->ticker mappings to `cusip_tickers.srf`, skipping + /// any whose CUSIP is already on disk and any duplicates within + /// `entries`. One read + one atomic write regardless of batch size. /// - /// Implemented as read-append-atomic-write (rather than a direct - /// open-for-append) so a concurrent reader never sees a file with a - /// valid header plus partial trailing record. See `cache/store.zig - /// appendRaw` for the same pattern and rationale. - /// - /// Dedups: if the CUSIP is already cached, this is a no-op. That - /// keeps the file from accumulating duplicate rows when the same - /// CUSIP is looked up repeatedly (the historical bug — the writer - /// never checked the file before appending). - pub fn cacheCusipTicker(self: *DataService, cusip: []const u8, ticker: []const u8) void { - // Dedup against what's already cached. + /// Read-append-atomic-write (rather than open-for-append) so a + /// concurrent reader never sees a valid header plus a partial + /// trailing record — see `cache/store.zig appendRaw` for the same + /// pattern and rationale. `#!srfv1` directives are emitted only + /// when the file is being created. + fn appendCusipEntries(self: *DataService, entries: []const CusipEntry) void { + if (entries.len == 0) return; + + // One load gives us both the dedup set and the existing bytes + // to concat (`backing`). Missing/empty file → empty map + empty + // backing → directives emitted below. var existing_map = self.loadCusipTickerMap(self.allocator); defer existing_map.deinit(); - if (existing_map.contains(cusip)) return; + const existing = existing_map.backing; + + // Keep only entries new to the file and unique within the batch. + var seen = std.StringHashMap(void).init(self.allocator); + defer seen.deinit(); + var to_write: std.ArrayList(CusipEntry) = .empty; + defer to_write.deinit(self.allocator); + for (entries) |e| { + if (e.cusip.len == 0 or e.ticker.len == 0) continue; + if (existing_map.contains(e.cusip)) continue; + const gop = seen.getOrPut(e.cusip) catch continue; + if (gop.found_existing) continue; + to_write.append(self.allocator, e) catch continue; + } + if (to_write.items.len == 0) return; const path = std.fs.path.join(self.allocator, &.{ self.config.cache_dir, "cusip_tickers.srf" }) catch return; defer self.allocator.free(path); - - // Ensure cache dir exists if (std.fs.path.dirnamePosix(path)) |dir| { - std.Io.Dir.cwd().createDirPath(self.io, dir) catch |err| log.warn("audit-log createDirPath({s}): {t}", .{ dir, err }); + std.Io.Dir.cwd().createDirPath(self.io, dir) catch |err| log.warn("cusip-cache createDirPath({s}): {t}", .{ dir, err }); } - // Read existing cache if present. - const existing = std.Io.Dir.cwd().readFileAlloc(self.io, path, self.allocator, .limited(4 * 1024 * 1024)) catch |err| switch (err) { - error.FileNotFound => @as([]u8, &.{}), - else => return, - }; - const owns_existing = existing.len > 0; - defer if (owns_existing) self.allocator.free(existing); - - // Serialize the new entry (with `#!srfv1` directives only if the - // cache file doesn't exist yet). - const emit_directives = !owns_existing; - const entry = [_]CusipEntry{.{ .cusip = cusip, .ticker = ticker }}; + const emit_directives = existing.len == 0; var aw: std.Io.Writer.Allocating = .init(self.allocator); defer aw.deinit(); - aw.writer.print("{f}", .{srf.fmt(CusipEntry, &entry, .{ .emit_directives = emit_directives })}) catch return; + aw.writer.print("{f}", .{srf.fmt(CusipEntry, to_write.items, .{ .emit_directives = emit_directives })}) catch return; const encoded = aw.writer.buffered(); if (encoded.len == 0) return; @@ -2528,7 +2541,184 @@ pub const DataService = struct { @memcpy(combined[0..existing.len], existing); @memcpy(combined[existing.len..], encoded); - atomic.writeFileAtomic(self.io, self.allocator, path, combined) catch |err| log.warn("audit-log writeFileAtomic({s}): {t}", .{ path, err }); + atomic.writeFileAtomic(self.io, self.allocator, path, combined) catch |err| log.warn("cusip-cache writeFileAtomic({s}): {t}", .{ path, err }); + } + + /// Append a single CUSIP->ticker mapping to the cache file + /// (dedup-aware). Thin wrapper over `appendCusipEntries`; the + /// `lookup` command's single-CUSIP path. + pub fn cacheCusipTicker(self: *DataService, cusip: []const u8, ticker: []const u8) void { + self.appendCusipEntries(&.{.{ .cusip = cusip, .ticker = ticker }}); + } + + /// Resolve a set of CUSIPs to tickers via the three-tier cascade, + /// persisting newly-learned mappings to `cusip_tickers.srf` (union + /// policy: the local file accumulates everything it ever learns and + /// converges toward the shared server set). + /// + /// Tiers, cheapest first: + /// L1 local `cusip_tickers.srf` (always; no network) + /// L2 server `GET /cusips` whole-file sync (if ZFIN_SERVER set) + /// L3 OpenFIGI batch lookup (whatever still misses) + /// + /// Best-effort: network failures degrade to "fewer entries + /// resolved" rather than erroring. The returned `CusipTickerMap` is + /// a zero-copy view over the (possibly just-rewritten) local file + /// and covers every CUSIP any tier could resolve. Callers resolve + /// forward-per-holding: look up each holding's CUSIP against it, + /// which sidesteps the "do I have every CUSIP for this ticker?" + /// completeness problem entirely. + /// + /// Empty/duplicate CUSIPs in `cusips` are ignored. The caller owns + /// the returned map (`deinit`); pass a scratch allocator to scope + /// it to a single command invocation. + pub fn resolveCusips(self: *DataService, allocator: std.mem.Allocator, cusips: []const []const u8) CusipTickerMap { + var result = self.loadCusipTickerMap(allocator); + + // Fast path: everything already in L1 → no scratch, no network, + // no rewrite. This is the warm-cache common case. + if (!anyMissing(result, cusips)) return result; + + // Scratch arena for minted entries; decouples their lifetime + // from the server body / OpenFIGI result buffers freed below. + var scratch = std.heap.ArenaAllocator.init(self.allocator); + defer scratch.deinit(); + const sa = scratch.allocator(); + var minted = std.StringHashMap([]const u8).init(sa); // cusip -> ticker + + // L2: server whole-file sync. Degrades to no-op until the + // `GET /cusips` route exists (a 404 surfaces as NotFound from + // client.get); when it lands it's purely additive — no change + // here. The server is expected to serve the file via its + // existing `handleStaticSrfFile` machinery (same shape as + // `/_edgar/tickers_funds`). + if (self.config.server_url) |server_url| { + if (self.fetchServerCusips(server_url)) |body| { + defer self.allocator.free(body); + mergeCusipBody(sa, &minted, result, body); + } + } + + // L3: OpenFIGI for whatever still misses. + self.mintMissingViaOpenFigi(sa, &minted, result, cusips); + + if (minted.count() == 0) return result; // nothing new learned + + // Persist the union, then reload so the returned map is a clean + // single-buffer zero-copy view over the updated file. + var ents: std.ArrayList(CusipEntry) = .empty; + // Reserve up front so the collection loop is infallible. On OOM + // (vanishingly unlikely for a small list), skip persistence and + // return the L1 view — some CUSIPs stay unresolved this run + // rather than erroring. + ents.ensureTotalCapacity(sa, minted.count()) catch return result; + var mit = minted.iterator(); + while (mit.next()) |kv| ents.appendAssumeCapacity(.{ .cusip = kv.key_ptr.*, .ticker = kv.value_ptr.* }); + self.appendCusipEntries(ents.items); + + result.deinit(); + return self.loadCusipTickerMap(allocator); + } + + /// True if any non-empty CUSIP in `cusips` is absent from `map`. + fn anyMissing(map: CusipTickerMap, cusips: []const []const u8) bool { + for (cusips) |c| { + if (c.len == 0) continue; + if (!map.contains(c)) return true; + } + return false; + } + + /// Merge a CUSIP->ticker SRF body (as served by `GET /cusips`) into + /// `out`, skipping any CUSIP already present in `have` or `out`. + /// Strings are duped into `arena`. Pure with respect to I/O, so it's + /// unit-tested directly with fixture bytes (the live L2 path can't + /// be exercised until the server route exists). + fn mergeCusipBody(arena: std.mem.Allocator, out: *std.StringHashMap([]const u8), have: CusipTickerMap, body: []const u8) void { + var reader = std.Io.Reader.fixed(body); + var it = srf.iterator(&reader, arena, .{ .parse_allocator = .none }) catch return; + defer it.deinit(); + while (it.next() catch return) |fields| { + const e = fields.to(CusipEntry, .{}) catch continue; + if (e.cusip.len == 0 or e.ticker.len == 0) continue; + if (have.contains(e.cusip) or out.contains(e.cusip)) continue; + const kc = arena.dupe(u8, e.cusip) catch continue; + const vc = arena.dupe(u8, e.ticker) catch continue; + out.put(kc, vc) catch continue; + } + } + + /// L2 seam: fetch the whole CUSIP->ticker map from the server via + /// `GET {server}/cusips`. Returns the raw SRF body (caller frees + /// with `self.allocator`) or null on any failure. Best-effort: no + /// retry and no torn-body archival (this is a shared reference + /// file, not per-symbol cache) — a bad/absent response just + /// degrades to the OpenFIGI tier. + fn fetchServerCusips(self: *DataService, server_url: []const u8) ?[]u8 { + const url = std.fmt.allocPrint(self.allocator, "{s}/cusips", .{server_url}) catch return null; + defer self.allocator.free(url); + + var client = http.Client.init(self.io, self.allocator); + defer client.deinit(); + + var response = client.get(url) catch |err| { + log.debug("cusips server sync failed: {s}", .{@errorName(err)}); + return null; + }; + defer response.deinit(); + + if (!cache.Store.looksCompleteSrf(response.body)) { + log.debug("cusips server response not complete SRF ({d} bytes) — ignoring", .{response.body.len}); + return null; + } + return self.allocator.dupe(u8, response.body) catch null; + } + + /// L3: resolve still-missing CUSIPs through OpenFIGI (batched 100 + /// per request, the API's job limit), recording hits into `out` + /// (duped into `arena`). De-dups the lookup set against `have`, + /// `out`, and itself. Best-effort: a failed batch logs and is + /// skipped; remaining batches still run. + fn mintMissingViaOpenFigi(self: *DataService, arena: std.mem.Allocator, out: *std.StringHashMap([]const u8), have: CusipTickerMap, cusips: []const []const u8) void { + var seen = std.StringHashMap(void).init(arena); + var to_lookup: std.ArrayList([]const u8) = .empty; + for (cusips) |c| { + if (c.len == 0) continue; + if (have.contains(c) or out.contains(c)) continue; + const gop = seen.getOrPut(c) catch continue; + if (gop.found_existing) continue; + to_lookup.append(arena, c) catch continue; + } + if (to_lookup.items.len == 0) return; + + const batch_size = 100; // OpenFIGI accepts up to 100 jobs/request. + var start: usize = 0; + while (start < to_lookup.items.len) : (start += batch_size) { + const end = @min(start + batch_size, to_lookup.items.len); + const batch = to_lookup.items[start..end]; + + const figi = self.lookupCusips(batch) catch |err| { + log.warn("resolveCusips: OpenFIGI lookup of {d} CUSIP(s) failed: {s}", .{ batch.len, @errorName(err) }); + continue; + }; + defer { + for (figi) |r| { + if (r.ticker) |t| self.allocator.free(t); + if (r.name) |n| self.allocator.free(n); + if (r.security_type) |s| self.allocator.free(s); + } + self.allocator.free(figi); + } + + // Results are parallel to `batch` (same length + order). + for (figi, 0..) |r, i| { + if (!r.found) continue; + const ticker = r.ticker orelse continue; + const kc = arena.dupe(u8, batch[i]) catch continue; + const vc = arena.dupe(u8, ticker) catch continue; + out.put(kc, vc) catch continue; + } + } } // ── Utility ────────────────────────────────────────────────── @@ -3972,3 +4162,153 @@ test "loadCusipTickerMap: first occurrence wins on duplicate rows" { try std.testing.expectEqual(@as(usize, 1), map.count()); try std.testing.expectEqualStrings("AAA", map.get("111111111").?); } + +// ── CUSIP resolution cascade (resolveCusips / appendCusipEntries) ── + +test "appendCusipEntries: batches, dedups vs file and within batch" { + const allocator = std.testing.allocator; + const io = std.testing.io; + var tmp = std.testing.tmpDir(.{}); + defer tmp.cleanup(); + const dir_path = try tmp.dir.realPathFileAlloc(io, ".", allocator); + defer allocator.free(dir_path); + + var svc = DataService.init(io, allocator, Config{ .cache_dir = dir_path }); + defer svc.deinit(); + + // Seed one entry on disk. + svc.cacheCusipTicker("111111111", "AAA"); + + // Batch: 111 already on disk (skip), 222 + 333 new, 222 repeated + // within the batch (skip the second). + const batch = [_]DataService.CusipEntry{ + .{ .cusip = "111111111", .ticker = "ZZZ" }, + .{ .cusip = "222222222", .ticker = "BBB" }, + .{ .cusip = "333333333", .ticker = "CCC" }, + .{ .cusip = "222222222", .ticker = "BBB" }, + }; + svc.appendCusipEntries(batch[0..]); + + var map = svc.loadCusipTickerMap(allocator); + defer map.deinit(); + try std.testing.expectEqual(@as(u32, 3), map.count()); + try std.testing.expectEqualStrings("AAA", map.get("111111111").?); // file wins + try std.testing.expectEqualStrings("BBB", map.get("222222222").?); + try std.testing.expectEqualStrings("CCC", map.get("333333333").?); + + // Physically exactly 3 data rows (plus the directive header). + const path = try std.fs.path.join(allocator, &.{ dir_path, "cusip_tickers.srf" }); + defer allocator.free(path); + const data = try std.Io.Dir.cwd().readFileAlloc(io, path, allocator, .limited(64 * 1024)); + defer allocator.free(data); + var rows: usize = 0; + var lines = std.mem.splitScalar(u8, data, '\n'); + while (lines.next()) |line| { + if (std.mem.indexOf(u8, line, "cusip::") != null) rows += 1; + } + try std.testing.expectEqual(@as(usize, 3), rows); +} + +test "mergeCusipBody: merges new entries, skips those already in `have` or the batch" { + const allocator = std.testing.allocator; + const io = std.testing.io; + var tmp = std.testing.tmpDir(.{}); + defer tmp.cleanup(); + const dir_path = try tmp.dir.realPathFileAlloc(io, ".", allocator); + defer allocator.free(dir_path); + + var svc = DataService.init(io, allocator, Config{ .cache_dir = dir_path }); + defer svc.deinit(); + + // `have` already maps 111 -> AAA (local is authoritative). + svc.cacheCusipTicker("111111111", "AAA"); + var have = svc.loadCusipTickerMap(allocator); + defer have.deinit(); + + var arena = std.heap.ArenaAllocator.init(allocator); + defer arena.deinit(); + var out = std.StringHashMap([]const u8).init(arena.allocator()); + + // Server body: 111 conflicts with `have` (ignored), 222 + 333 are + // new, 222 repeated (the second is skipped). + const body = + "#!srfv1\n" ++ + "cusip::111111111,ticker::ZZZ\n" ++ + "cusip::222222222,ticker::BBB\n" ++ + "cusip::333333333,ticker::CCC\n" ++ + "cusip::222222222,ticker::BBB\n"; + DataService.mergeCusipBody(arena.allocator(), &out, have, body); + + try std.testing.expectEqual(@as(u32, 2), out.count()); + try std.testing.expectEqualStrings("BBB", out.get("222222222").?); + try std.testing.expectEqualStrings("CCC", out.get("333333333").?); + try std.testing.expect(out.get("111111111") == null); // have wins +} + +test "resolveCusips: warm cache resolves without touching the network" { + const allocator = std.testing.allocator; + const io = std.testing.io; + var tmp = std.testing.tmpDir(.{}); + defer tmp.cleanup(); + const dir_path = try tmp.dir.realPathFileAlloc(io, ".", allocator); + defer allocator.free(dir_path); + + var svc = DataService.init(io, allocator, Config{ .cache_dir = dir_path }); + defer svc.deinit(); + // No server_url; assert L2/L3 are never reached for an all-hit set. + svc.panic_on_network_attempt = true; + + svc.cacheCusipTicker("111111111", "AAA"); + svc.cacheCusipTicker("222222222", "BBB"); + + // Duplicate + empty CUSIP in the request must be tolerated. + const want = [_][]const u8{ "111111111", "222222222", "111111111", "" }; + var map = svc.resolveCusips(allocator, want[0..]); + defer map.deinit(); + try std.testing.expectEqualStrings("AAA", map.get("111111111").?); + try std.testing.expectEqualStrings("BBB", map.get("222222222").?); +} + +test "getEtfProfile: carries holding CUSIP through the model boundary" { + const allocator = std.testing.allocator; + const io = std.testing.io; + var tmp = std.testing.tmpDir(.{}); + defer tmp.cleanup(); + const dir_path = try tmp.dir.realPathFileAlloc(io, ".", allocator); + defer allocator.free(dir_path); + + var svc = DataService.init(io, allocator, Config{ .cache_dir = dir_path }); + defer svc.deinit(); + + // Seed etf_metrics: a profile row + a holding carrying a CUSIP but + // no ticker (the common NPORT-P shape — placeholder values only). + var etf_records = [_]Edgar.EtfMetricRecord{ + .{ .profile = .{ + .symbol = try allocator.dupe(u8, "TESTF"), + .series_name = try allocator.dupe(u8, "Test Fund"), + .cik = try allocator.dupe(u8, "0000000002"), + .as_of = try allocator.dupe(u8, "2026-06-01"), + .source = try allocator.dupe(u8, "edgar"), + } }, + .{ .holding = .{ + .symbol = try allocator.dupe(u8, "TESTF"), + .name = try allocator.dupe(u8, "Placeholder Corp"), + .cusip = try allocator.dupe(u8, "999999999"), + .pct_of_portfolio = 12.5, + .as_of = try allocator.dupe(u8, "2026-06-01"), + .source = try allocator.dupe(u8, "edgar"), + } }, + }; + defer for (etf_records) |r| r.deinit(allocator); + var s = svc.store(); + s.write(Edgar.EtfMetricRecord, "TESTF", etf_records[0..], cache.DataType.etf_metrics.ttl()); + + svc.panic_on_network_attempt = true; + const result = try svc.getEtfProfile("TESTF", .{ .skip_network = true }); + defer result.deinit(); + + const holdings = result.data.holdings orelse return error.NoHoldings; + try std.testing.expectEqual(@as(usize, 1), holdings.len); + try std.testing.expectEqualStrings("999999999", holdings[0].cusip orelse return error.NoCusip); + try std.testing.expect(holdings[0].symbol == null); // filing had no ticker +}