add cusip to etf profile

This commit is contained in:
Emil Lerch 2026-06-16 16:57:27 -07:00
parent 415071b955
commit 867f9afb8c
Signed by: lobo
GPG key ID: A7B62D657EF764F8
3 changed files with 384 additions and 32 deletions

7
src/cache/store.zig vendored
View file

@ -1800,6 +1800,7 @@ pub const Store = struct {
errdefer {
for (holdings.items) |h| {
if (h.symbol) |s| allocator.free(s);
if (h.cusip) |c| allocator.free(c);
allocator.free(h.name);
}
holdings.deinit(allocator);
@ -1817,8 +1818,12 @@ pub const Store = struct {
},
.holding => |h| {
const duped_sym = if (h.symbol) |s| try allocator.dupe(u8, s) else null;
errdefer if (duped_sym) |s| allocator.free(s);
const duped_cusip = if (h.cusip) |c| try allocator.dupe(u8, c) else null;
errdefer if (duped_cusip) |c| allocator.free(c);
const duped_name = try allocator.dupe(u8, h.name);
try holdings.append(allocator, .{ .symbol = duped_sym, .name = duped_name, .weight = h.weight });
errdefer allocator.free(duped_name);
try holdings.append(allocator, .{ .symbol = duped_sym, .name = duped_name, .weight = h.weight, .cusip = duped_cusip });
},
}
}

View file

@ -5,6 +5,12 @@ pub const Holding = struct {
symbol: ?[]const u8 = null,
name: []const u8,
weight: f64,
/// CUSIP from the NPORT-P filing, when present. The join key for
/// look-through resolution: NPORT-P identifies holdings by CUSIP
/// far more reliably than by ticker, so `exposure` resolves this
/// to a ticker (via `cusip_tickers.srf` / OpenFIGI) to match a
/// holding against a directly-held position.
cusip: ?[]const u8 = null,
};
/// Sector allocation in an ETF.
@ -70,6 +76,7 @@ pub const EtfProfile = struct {
if (self.holdings) |h| {
for (h) |holding| {
if (holding.symbol) |s| allocator.free(s);
if (holding.cusip) |c| allocator.free(c);
allocator.free(holding.name);
}
allocator.free(h);

View file

@ -1006,6 +1006,7 @@ pub const DataService = struct {
for (holdings_buf.items) |h| {
self.allocator.free(h.name);
if (h.symbol) |s| self.allocator.free(s);
if (h.cusip) |c| self.allocator.free(c);
}
holdings_buf.deinit(self.allocator);
}
@ -1026,10 +1027,19 @@ pub const DataService = struct {
try self.allocator.dupe(u8, t)
else
null;
errdefer if (sym_dup) |s| self.allocator.free(s);
const cusip_dup: ?[]const u8 = if (h.cusip) |c|
try self.allocator.dupe(u8, c)
else
null;
errdefer if (cusip_dup) |c| self.allocator.free(c);
const name_dup = try self.allocator.dupe(u8, h.name);
errdefer self.allocator.free(name_dup);
try holdings_buf.append(self.allocator, .{
.symbol = sym_dup,
.name = try self.allocator.dupe(u8, h.name),
.name = name_dup,
.weight = h.pct_of_portfolio / 100.0,
.cusip = cusip_dup,
});
},
};
@ -2479,46 +2489,49 @@ pub const DataService = struct {
return result;
}
/// Append a CUSIP->ticker mapping to the cache file.
/// Append CUSIP->ticker mappings to `cusip_tickers.srf`, skipping
/// any whose CUSIP is already on disk and any duplicates within
/// `entries`. One read + one atomic write regardless of batch size.
///
/// Implemented as read-append-atomic-write (rather than a direct
/// open-for-append) so a concurrent reader never sees a file with a
/// valid header plus partial trailing record. See `cache/store.zig
/// appendRaw` for the same pattern and rationale.
///
/// Dedups: if the CUSIP is already cached, this is a no-op. That
/// keeps the file from accumulating duplicate rows when the same
/// CUSIP is looked up repeatedly (the historical bug the writer
/// never checked the file before appending).
pub fn cacheCusipTicker(self: *DataService, cusip: []const u8, ticker: []const u8) void {
// Dedup against what's already cached.
/// Read-append-atomic-write (rather than open-for-append) so a
/// concurrent reader never sees a valid header plus a partial
/// trailing record see `cache/store.zig appendRaw` for the same
/// pattern and rationale. `#!srfv1` directives are emitted only
/// when the file is being created.
fn appendCusipEntries(self: *DataService, entries: []const CusipEntry) void {
if (entries.len == 0) return;
// One load gives us both the dedup set and the existing bytes
// to concat (`backing`). Missing/empty file empty map + empty
// backing directives emitted below.
var existing_map = self.loadCusipTickerMap(self.allocator);
defer existing_map.deinit();
if (existing_map.contains(cusip)) return;
const existing = existing_map.backing;
// Keep only entries new to the file and unique within the batch.
var seen = std.StringHashMap(void).init(self.allocator);
defer seen.deinit();
var to_write: std.ArrayList(CusipEntry) = .empty;
defer to_write.deinit(self.allocator);
for (entries) |e| {
if (e.cusip.len == 0 or e.ticker.len == 0) continue;
if (existing_map.contains(e.cusip)) continue;
const gop = seen.getOrPut(e.cusip) catch continue;
if (gop.found_existing) continue;
to_write.append(self.allocator, e) catch continue;
}
if (to_write.items.len == 0) return;
const path = std.fs.path.join(self.allocator, &.{ self.config.cache_dir, "cusip_tickers.srf" }) catch return;
defer self.allocator.free(path);
// Ensure cache dir exists
if (std.fs.path.dirnamePosix(path)) |dir| {
std.Io.Dir.cwd().createDirPath(self.io, dir) catch |err| log.warn("audit-log createDirPath({s}): {t}", .{ dir, err });
std.Io.Dir.cwd().createDirPath(self.io, dir) catch |err| log.warn("cusip-cache createDirPath({s}): {t}", .{ dir, err });
}
// Read existing cache if present.
const existing = std.Io.Dir.cwd().readFileAlloc(self.io, path, self.allocator, .limited(4 * 1024 * 1024)) catch |err| switch (err) {
error.FileNotFound => @as([]u8, &.{}),
else => return,
};
const owns_existing = existing.len > 0;
defer if (owns_existing) self.allocator.free(existing);
// Serialize the new entry (with `#!srfv1` directives only if the
// cache file doesn't exist yet).
const emit_directives = !owns_existing;
const entry = [_]CusipEntry{.{ .cusip = cusip, .ticker = ticker }};
const emit_directives = existing.len == 0;
var aw: std.Io.Writer.Allocating = .init(self.allocator);
defer aw.deinit();
aw.writer.print("{f}", .{srf.fmt(CusipEntry, &entry, .{ .emit_directives = emit_directives })}) catch return;
aw.writer.print("{f}", .{srf.fmt(CusipEntry, to_write.items, .{ .emit_directives = emit_directives })}) catch return;
const encoded = aw.writer.buffered();
if (encoded.len == 0) return;
@ -2528,7 +2541,184 @@ pub const DataService = struct {
@memcpy(combined[0..existing.len], existing);
@memcpy(combined[existing.len..], encoded);
atomic.writeFileAtomic(self.io, self.allocator, path, combined) catch |err| log.warn("audit-log writeFileAtomic({s}): {t}", .{ path, err });
atomic.writeFileAtomic(self.io, self.allocator, path, combined) catch |err| log.warn("cusip-cache writeFileAtomic({s}): {t}", .{ path, err });
}
/// Append a single CUSIP->ticker mapping to the cache file
/// (dedup-aware). Thin wrapper over `appendCusipEntries`; the
/// `lookup` command's single-CUSIP path.
pub fn cacheCusipTicker(self: *DataService, cusip: []const u8, ticker: []const u8) void {
self.appendCusipEntries(&.{.{ .cusip = cusip, .ticker = ticker }});
}
/// Resolve a set of CUSIPs to tickers via the three-tier cascade,
/// persisting newly-learned mappings to `cusip_tickers.srf` (union
/// policy: the local file accumulates everything it ever learns and
/// converges toward the shared server set).
///
/// Tiers, cheapest first:
/// L1 local `cusip_tickers.srf` (always; no network)
/// L2 server `GET /cusips` whole-file sync (if ZFIN_SERVER set)
/// L3 OpenFIGI batch lookup (whatever still misses)
///
/// Best-effort: network failures degrade to "fewer entries
/// resolved" rather than erroring. The returned `CusipTickerMap` is
/// a zero-copy view over the (possibly just-rewritten) local file
/// and covers every CUSIP any tier could resolve. Callers resolve
/// forward-per-holding: look up each holding's CUSIP against it,
/// which sidesteps the "do I have every CUSIP for this ticker?"
/// completeness problem entirely.
///
/// Empty/duplicate CUSIPs in `cusips` are ignored. The caller owns
/// the returned map (`deinit`); pass a scratch allocator to scope
/// it to a single command invocation.
pub fn resolveCusips(self: *DataService, allocator: std.mem.Allocator, cusips: []const []const u8) CusipTickerMap {
var result = self.loadCusipTickerMap(allocator);
// Fast path: everything already in L1 no scratch, no network,
// no rewrite. This is the warm-cache common case.
if (!anyMissing(result, cusips)) return result;
// Scratch arena for minted entries; decouples their lifetime
// from the server body / OpenFIGI result buffers freed below.
var scratch = std.heap.ArenaAllocator.init(self.allocator);
defer scratch.deinit();
const sa = scratch.allocator();
var minted = std.StringHashMap([]const u8).init(sa); // cusip -> ticker
// L2: server whole-file sync. Degrades to no-op until the
// `GET /cusips` route exists (a 404 surfaces as NotFound from
// client.get); when it lands it's purely additive no change
// here. The server is expected to serve the file via its
// existing `handleStaticSrfFile` machinery (same shape as
// `/_edgar/tickers_funds`).
if (self.config.server_url) |server_url| {
if (self.fetchServerCusips(server_url)) |body| {
defer self.allocator.free(body);
mergeCusipBody(sa, &minted, result, body);
}
}
// L3: OpenFIGI for whatever still misses.
self.mintMissingViaOpenFigi(sa, &minted, result, cusips);
if (minted.count() == 0) return result; // nothing new learned
// Persist the union, then reload so the returned map is a clean
// single-buffer zero-copy view over the updated file.
var ents: std.ArrayList(CusipEntry) = .empty;
// Reserve up front so the collection loop is infallible. On OOM
// (vanishingly unlikely for a small list), skip persistence and
// return the L1 view some CUSIPs stay unresolved this run
// rather than erroring.
ents.ensureTotalCapacity(sa, minted.count()) catch return result;
var mit = minted.iterator();
while (mit.next()) |kv| ents.appendAssumeCapacity(.{ .cusip = kv.key_ptr.*, .ticker = kv.value_ptr.* });
self.appendCusipEntries(ents.items);
result.deinit();
return self.loadCusipTickerMap(allocator);
}
/// True if any non-empty CUSIP in `cusips` is absent from `map`.
fn anyMissing(map: CusipTickerMap, cusips: []const []const u8) bool {
for (cusips) |c| {
if (c.len == 0) continue;
if (!map.contains(c)) return true;
}
return false;
}
/// Merge a CUSIP->ticker SRF body (as served by `GET /cusips`) into
/// `out`, skipping any CUSIP already present in `have` or `out`.
/// Strings are duped into `arena`. Pure with respect to I/O, so it's
/// unit-tested directly with fixture bytes (the live L2 path can't
/// be exercised until the server route exists).
fn mergeCusipBody(arena: std.mem.Allocator, out: *std.StringHashMap([]const u8), have: CusipTickerMap, body: []const u8) void {
var reader = std.Io.Reader.fixed(body);
var it = srf.iterator(&reader, arena, .{ .parse_allocator = .none }) catch return;
defer it.deinit();
while (it.next() catch return) |fields| {
const e = fields.to(CusipEntry, .{}) catch continue;
if (e.cusip.len == 0 or e.ticker.len == 0) continue;
if (have.contains(e.cusip) or out.contains(e.cusip)) continue;
const kc = arena.dupe(u8, e.cusip) catch continue;
const vc = arena.dupe(u8, e.ticker) catch continue;
out.put(kc, vc) catch continue;
}
}
/// L2 seam: fetch the whole CUSIP->ticker map from the server via
/// `GET {server}/cusips`. Returns the raw SRF body (caller frees
/// with `self.allocator`) or null on any failure. Best-effort: no
/// retry and no torn-body archival (this is a shared reference
/// file, not per-symbol cache) a bad/absent response just
/// degrades to the OpenFIGI tier.
fn fetchServerCusips(self: *DataService, server_url: []const u8) ?[]u8 {
const url = std.fmt.allocPrint(self.allocator, "{s}/cusips", .{server_url}) catch return null;
defer self.allocator.free(url);
var client = http.Client.init(self.io, self.allocator);
defer client.deinit();
var response = client.get(url) catch |err| {
log.debug("cusips server sync failed: {s}", .{@errorName(err)});
return null;
};
defer response.deinit();
if (!cache.Store.looksCompleteSrf(response.body)) {
log.debug("cusips server response not complete SRF ({d} bytes) — ignoring", .{response.body.len});
return null;
}
return self.allocator.dupe(u8, response.body) catch null;
}
/// L3: resolve still-missing CUSIPs through OpenFIGI (batched 100
/// per request, the API's job limit), recording hits into `out`
/// (duped into `arena`). De-dups the lookup set against `have`,
/// `out`, and itself. Best-effort: a failed batch logs and is
/// skipped; remaining batches still run.
fn mintMissingViaOpenFigi(self: *DataService, arena: std.mem.Allocator, out: *std.StringHashMap([]const u8), have: CusipTickerMap, cusips: []const []const u8) void {
var seen = std.StringHashMap(void).init(arena);
var to_lookup: std.ArrayList([]const u8) = .empty;
for (cusips) |c| {
if (c.len == 0) continue;
if (have.contains(c) or out.contains(c)) continue;
const gop = seen.getOrPut(c) catch continue;
if (gop.found_existing) continue;
to_lookup.append(arena, c) catch continue;
}
if (to_lookup.items.len == 0) return;
const batch_size = 100; // OpenFIGI accepts up to 100 jobs/request.
var start: usize = 0;
while (start < to_lookup.items.len) : (start += batch_size) {
const end = @min(start + batch_size, to_lookup.items.len);
const batch = to_lookup.items[start..end];
const figi = self.lookupCusips(batch) catch |err| {
log.warn("resolveCusips: OpenFIGI lookup of {d} CUSIP(s) failed: {s}", .{ batch.len, @errorName(err) });
continue;
};
defer {
for (figi) |r| {
if (r.ticker) |t| self.allocator.free(t);
if (r.name) |n| self.allocator.free(n);
if (r.security_type) |s| self.allocator.free(s);
}
self.allocator.free(figi);
}
// Results are parallel to `batch` (same length + order).
for (figi, 0..) |r, i| {
if (!r.found) continue;
const ticker = r.ticker orelse continue;
const kc = arena.dupe(u8, batch[i]) catch continue;
const vc = arena.dupe(u8, ticker) catch continue;
out.put(kc, vc) catch continue;
}
}
}
// Utility
@ -3972,3 +4162,153 @@ test "loadCusipTickerMap: first occurrence wins on duplicate rows" {
try std.testing.expectEqual(@as(usize, 1), map.count());
try std.testing.expectEqualStrings("AAA", map.get("111111111").?);
}
// CUSIP resolution cascade (resolveCusips / appendCusipEntries)
test "appendCusipEntries: batches, dedups vs file and within batch" {
const allocator = std.testing.allocator;
const io = std.testing.io;
var tmp = std.testing.tmpDir(.{});
defer tmp.cleanup();
const dir_path = try tmp.dir.realPathFileAlloc(io, ".", allocator);
defer allocator.free(dir_path);
var svc = DataService.init(io, allocator, Config{ .cache_dir = dir_path });
defer svc.deinit();
// Seed one entry on disk.
svc.cacheCusipTicker("111111111", "AAA");
// Batch: 111 already on disk (skip), 222 + 333 new, 222 repeated
// within the batch (skip the second).
const batch = [_]DataService.CusipEntry{
.{ .cusip = "111111111", .ticker = "ZZZ" },
.{ .cusip = "222222222", .ticker = "BBB" },
.{ .cusip = "333333333", .ticker = "CCC" },
.{ .cusip = "222222222", .ticker = "BBB" },
};
svc.appendCusipEntries(batch[0..]);
var map = svc.loadCusipTickerMap(allocator);
defer map.deinit();
try std.testing.expectEqual(@as(u32, 3), map.count());
try std.testing.expectEqualStrings("AAA", map.get("111111111").?); // file wins
try std.testing.expectEqualStrings("BBB", map.get("222222222").?);
try std.testing.expectEqualStrings("CCC", map.get("333333333").?);
// Physically exactly 3 data rows (plus the directive header).
const path = try std.fs.path.join(allocator, &.{ dir_path, "cusip_tickers.srf" });
defer allocator.free(path);
const data = try std.Io.Dir.cwd().readFileAlloc(io, path, allocator, .limited(64 * 1024));
defer allocator.free(data);
var rows: usize = 0;
var lines = std.mem.splitScalar(u8, data, '\n');
while (lines.next()) |line| {
if (std.mem.indexOf(u8, line, "cusip::") != null) rows += 1;
}
try std.testing.expectEqual(@as(usize, 3), rows);
}
test "mergeCusipBody: merges new entries, skips those already in `have` or the batch" {
const allocator = std.testing.allocator;
const io = std.testing.io;
var tmp = std.testing.tmpDir(.{});
defer tmp.cleanup();
const dir_path = try tmp.dir.realPathFileAlloc(io, ".", allocator);
defer allocator.free(dir_path);
var svc = DataService.init(io, allocator, Config{ .cache_dir = dir_path });
defer svc.deinit();
// `have` already maps 111 -> AAA (local is authoritative).
svc.cacheCusipTicker("111111111", "AAA");
var have = svc.loadCusipTickerMap(allocator);
defer have.deinit();
var arena = std.heap.ArenaAllocator.init(allocator);
defer arena.deinit();
var out = std.StringHashMap([]const u8).init(arena.allocator());
// Server body: 111 conflicts with `have` (ignored), 222 + 333 are
// new, 222 repeated (the second is skipped).
const body =
"#!srfv1\n" ++
"cusip::111111111,ticker::ZZZ\n" ++
"cusip::222222222,ticker::BBB\n" ++
"cusip::333333333,ticker::CCC\n" ++
"cusip::222222222,ticker::BBB\n";
DataService.mergeCusipBody(arena.allocator(), &out, have, body);
try std.testing.expectEqual(@as(u32, 2), out.count());
try std.testing.expectEqualStrings("BBB", out.get("222222222").?);
try std.testing.expectEqualStrings("CCC", out.get("333333333").?);
try std.testing.expect(out.get("111111111") == null); // have wins
}
test "resolveCusips: warm cache resolves without touching the network" {
const allocator = std.testing.allocator;
const io = std.testing.io;
var tmp = std.testing.tmpDir(.{});
defer tmp.cleanup();
const dir_path = try tmp.dir.realPathFileAlloc(io, ".", allocator);
defer allocator.free(dir_path);
var svc = DataService.init(io, allocator, Config{ .cache_dir = dir_path });
defer svc.deinit();
// No server_url; assert L2/L3 are never reached for an all-hit set.
svc.panic_on_network_attempt = true;
svc.cacheCusipTicker("111111111", "AAA");
svc.cacheCusipTicker("222222222", "BBB");
// Duplicate + empty CUSIP in the request must be tolerated.
const want = [_][]const u8{ "111111111", "222222222", "111111111", "" };
var map = svc.resolveCusips(allocator, want[0..]);
defer map.deinit();
try std.testing.expectEqualStrings("AAA", map.get("111111111").?);
try std.testing.expectEqualStrings("BBB", map.get("222222222").?);
}
test "getEtfProfile: carries holding CUSIP through the model boundary" {
const allocator = std.testing.allocator;
const io = std.testing.io;
var tmp = std.testing.tmpDir(.{});
defer tmp.cleanup();
const dir_path = try tmp.dir.realPathFileAlloc(io, ".", allocator);
defer allocator.free(dir_path);
var svc = DataService.init(io, allocator, Config{ .cache_dir = dir_path });
defer svc.deinit();
// Seed etf_metrics: a profile row + a holding carrying a CUSIP but
// no ticker (the common NPORT-P shape placeholder values only).
var etf_records = [_]Edgar.EtfMetricRecord{
.{ .profile = .{
.symbol = try allocator.dupe(u8, "TESTF"),
.series_name = try allocator.dupe(u8, "Test Fund"),
.cik = try allocator.dupe(u8, "0000000002"),
.as_of = try allocator.dupe(u8, "2026-06-01"),
.source = try allocator.dupe(u8, "edgar"),
} },
.{ .holding = .{
.symbol = try allocator.dupe(u8, "TESTF"),
.name = try allocator.dupe(u8, "Placeholder Corp"),
.cusip = try allocator.dupe(u8, "999999999"),
.pct_of_portfolio = 12.5,
.as_of = try allocator.dupe(u8, "2026-06-01"),
.source = try allocator.dupe(u8, "edgar"),
} },
};
defer for (etf_records) |r| r.deinit(allocator);
var s = svc.store();
s.write(Edgar.EtfMetricRecord, "TESTF", etf_records[0..], cache.DataType.etf_metrics.ttl());
svc.panic_on_network_attempt = true;
const result = try svc.getEtfProfile("TESTF", .{ .skip_network = true });
defer result.deinit();
const holdings = result.data.holdings orelse return error.NoHoldings;
try std.testing.expectEqual(@as(usize, 1), holdings.len);
try std.testing.expectEqualStrings("999999999", holdings[0].cusip orelse return error.NoCusip);
try std.testing.expect(holdings[0].symbol == null); // filing had no ticker
}