From 7eba504ed8cafd31a50baa75c7bf1f3d1113c4f8 Mon Sep 17 00:00:00 2001 From: Emil Lerch Date: Tue, 10 Mar 2026 08:37:00 -0700 Subject: [PATCH] retry logic on 429 --- src/cache/store.zig | 27 +++- src/service.zig | 302 ++++++++++++++++++++++++++------------------ 2 files changed, 201 insertions(+), 128 deletions(-) diff --git a/src/cache/store.zig b/src/cache/store.zig index 6cb36bb..9311d3d 100644 --- a/src/cache/store.zig +++ b/src/cache/store.zig @@ -57,6 +57,17 @@ pub const DataType = enum { .meta => "meta.srf", }; } + + pub fn ttl(self: DataType) i64 { + return switch (self) { + .dividends => Ttl.dividends, + .splits => Ttl.splits, + .options => Ttl.options, + .earnings => Ttl.earnings, + .etf_profile => Ttl.etf_profile, + .candles_daily, .candles_meta, .meta => 0, + }; + } }; /// Persistent SRF-backed cache with per-symbol, per-data-type files. @@ -84,7 +95,7 @@ pub const Store = struct { // ── Generic typed API ──────────────────────────────────────── /// Map a model type to its cache DataType. - fn dataTypeFor(comptime T: type) DataType { + pub fn dataTypeFor(comptime T: type) DataType { return switch (T) { Candle => .candles_daily, Dividend => .dividends, @@ -97,7 +108,7 @@ pub const Store = struct { } /// The data payload for a given type: single struct for EtfProfile, slice for everything else. 
- fn DataFor(comptime T: type) type { + pub fn DataFor(comptime T: type) type { return if (T == EtfProfile) EtfProfile else []T; } @@ -121,6 +132,18 @@ pub const Store = struct { defer self.allocator.free(data); if (T == EtfProfile or T == OptionsChain) { + const is_negative = std.mem.eql(u8, data, negative_cache_content); + if (is_negative) { + if (freshness == .fresh_only) { + // Negative entries are always fresh — return empty data + if (T == EtfProfile) + return .{ .data = EtfProfile{ .symbol = "" }, .timestamp = std.time.timestamp() }; + if (T == OptionsChain) + return .{ .data = &.{}, .timestamp = std.time.timestamp() }; + } + return null; + } + var reader = std.Io.Reader.fixed(data); var it = srf.iterator(&reader, self.allocator, .{ .alloc_strings = false }) catch return null; defer it.deinit(); diff --git a/src/service.zig b/src/service.zig index 58c9e02..05f48e4 100644 --- a/src/service.zig +++ b/src/service.zig @@ -18,6 +18,7 @@ const Quote = @import("models/quote.zig").Quote; const EtfProfile = @import("models/etf_profile.zig").EtfProfile; const Config = @import("config.zig").Config; const cache = @import("cache/store.zig"); +const srf = @import("srf"); const TwelveData = @import("providers/twelvedata.zig").TwelveData; const Polygon = @import("providers/polygon.zig").Polygon; const Finnhub = @import("providers/finnhub.zig").Finnhub; @@ -47,6 +48,15 @@ pub const Source = enum { fetched, }; +/// Generic result type for all fetch operations: data payload + provenance metadata. +pub fn FetchResult(comptime T: type) type { + return struct { + data: cache.Store.DataFor(T), + source: Source, + timestamp: i64, + }; +} + // ── PostProcess callbacks ──────────────────────────────────── // These are passed to Store.read to handle type-specific // concerns: string duping (serialization plumbing) and domain transforms. 
@@ -95,40 +105,40 @@ pub const DataService = struct { if (self.av) |*av| av.deinit(); } - // ── Provider accessors ─────────────────────────────────────── + // ── Provider accessor ────────────────────────────────────────── - fn getTwelveData(self: *DataService) DataError!*TwelveData { - if (self.td) |*td| return td; - const key = self.config.twelvedata_key orelse return DataError.NoApiKey; - self.td = TwelveData.init(self.allocator, key); - return &self.td.?; + fn getProvider(self: *DataService, comptime T: type) DataError!*T { + const field_name = comptime providerField(T); + if (@field(self, field_name)) |*p| return p; + if (T == Cboe) { + // CBOE has no key + @field(self, field_name) = T.init(self.allocator); + } else { + // All we're doing here is lower casing the type name, then + // appending _key to it, so AlphaVantage -> alphavantage_key + const config_key = comptime blk: { + const full = @typeName(T); + var start: usize = 0; + for (full, 0..) |c, i| { + if (c == '.') start = i + 1; + } + const short = full[start..]; + var buf: [short.len + 4]u8 = undefined; + _ = std.ascii.lowerString(buf[0..short.len], short); + @memcpy(buf[short.len..][0..4], "_key"); + break :blk buf[0 .. 
short.len + 4]; + }; + const key = @field(self.config, config_key) orelse return DataError.NoApiKey; + @field(self, field_name) = T.init(self.allocator, key); + } + return &@field(self, field_name).?; } - fn getPolygon(self: *DataService) DataError!*Polygon { - if (self.pg) |*pg| return pg; - const key = self.config.polygon_key orelse return DataError.NoApiKey; - self.pg = Polygon.init(self.allocator, key); - return &self.pg.?; - } - - fn getFinnhub(self: *DataService) DataError!*Finnhub { - if (self.fh) |*fh| return fh; - const key = self.config.finnhub_key orelse return DataError.NoApiKey; - self.fh = Finnhub.init(self.allocator, key); - return &self.fh.?; - } - - fn getCboe(self: *DataService) *Cboe { - if (self.cboe) |*c| return c; - self.cboe = Cboe.init(self.allocator); - return &self.cboe.?; - } - - fn getAlphaVantage(self: *DataService) DataError!*AlphaVantage { - if (self.av) |*av| return av; - const key = self.config.alphavantage_key orelse return DataError.NoApiKey; - self.av = AlphaVantage.init(self.allocator, key); - return &self.av.?; + fn providerField(comptime T: type) []const u8 { + inline for (std.meta.fields(DataService)) |f| { + if (f.type == ?T) return f.name; + } + @compileError("unknown provider type"); } // ── Cache helper ───────────────────────────────────────────── @@ -137,6 +147,59 @@ pub const DataService = struct { return cache.Store.init(self.allocator, self.config.cache_dir); } + /// Generic fetch-or-cache for simple data types (dividends, splits, options). + /// Checks cache first; on miss, fetches from the appropriate provider, + /// writes to cache, and returns. On permanent fetch failure, writes a negative + /// cache entry. Rate limit failures are retried once. 
+ fn fetchCached( + self: *DataService, + comptime T: type, + symbol: []const u8, + comptime postProcess: ?*const fn (*T, std.mem.Allocator) anyerror!void, + ) DataError!FetchResult(T) { + var s = self.store(); + const data_type = comptime cache.Store.dataTypeFor(T); + + if (s.read(T, symbol, postProcess, .fresh_only)) |cached| + return .{ .data = cached.data, .source = .cached, .timestamp = cached.timestamp }; + + const fetched = self.fetchFromProvider(T, symbol) catch |err| { + if (err == error.RateLimited) { + // Wait and retry once + self.rateLimitBackoff(); + const retried = self.fetchFromProvider(T, symbol) catch { + return DataError.FetchFailed; + }; + s.write(T, symbol, retried, data_type.ttl()); + return .{ .data = retried, .source = .fetched, .timestamp = std.time.timestamp() }; + } + s.writeNegative(symbol, data_type); + return DataError.FetchFailed; + }; + + s.write(T, symbol, fetched, data_type.ttl()); + return .{ .data = fetched, .source = .fetched, .timestamp = std.time.timestamp() }; + } + + /// Dispatch a fetch to the correct provider based on model type. + fn fetchFromProvider(self: *DataService, comptime T: type, symbol: []const u8) !cache.Store.DataFor(T) { + return switch (T) { + Dividend => { + var pg = try self.getProvider(Polygon); + return pg.fetchDividends(self.allocator, symbol, null, null); + }, + Split => { + var pg = try self.getProvider(Polygon); + return pg.fetchSplits(self.allocator, symbol); + }, + OptionsChain => { + var cboe = try self.getProvider(Cboe); + return cboe.fetchOptionsChain(self.allocator, symbol); + }, + else => @compileError("unsupported type for fetchFromProvider"), + }; + } + /// Invalidate cached data for a symbol so the next get* call forces a fresh fetch. 
pub fn invalidate(self: *DataService, symbol: []const u8, data_type: cache.DataType) void { var s = self.store(); @@ -154,7 +217,7 @@ pub const DataService = struct { /// Uses incremental updates: when the cache is stale, only fetches /// candles newer than the last cached date rather than re-fetching /// the entire history. - pub fn getCandles(self: *DataService, symbol: []const u8) DataError!struct { data: []Candle, source: Source, timestamp: i64 } { + pub fn getCandles(self: *DataService, symbol: []const u8) DataError!FetchResult(Candle) { var s = self.store(); const today = todayDate(); @@ -178,14 +241,22 @@ pub const DataService = struct { return .{ .data = r.data, .source = .cached, .timestamp = std.time.timestamp() }; } else { // Incremental fetch from day after last cached candle - var td = self.getTwelveData() catch { + var td = self.getProvider(TwelveData) catch { // No API key — return stale data if (s.read(Candle, symbol, null, .any)) |r| return .{ .data = r.data, .source = .cached, .timestamp = mr.created }; return DataError.NoApiKey; }; - const new_candles = td.fetchCandles(self.allocator, symbol, fetch_from, today) catch { - // Fetch failed — return stale data rather than erroring + const new_candles = td.fetchCandles(self.allocator, symbol, fetch_from, today) catch |err| blk: { + if (err == error.RateLimited) { + self.rateLimitBackoff(); + break :blk td.fetchCandles(self.allocator, symbol, fetch_from, today) catch { + if (s.read(Candle, symbol, null, .any)) |r| + return .{ .data = r.data, .source = .cached, .timestamp = mr.created }; + return DataError.FetchFailed; + }; + } + // Non-rate-limit failure — return stale data if (s.read(Candle, symbol, null, .any)) |r| return .{ .data = r.data, .source = .cached, .timestamp = mr.created }; return DataError.FetchFailed; @@ -212,10 +283,17 @@ pub const DataService = struct { } // No usable cache — full fetch (~10 years, plus buffer for leap years) - var td = try self.getTwelveData(); + var td = try 
self.getProvider(TwelveData); const from = today.addDays(-3700); - const fetched = td.fetchCandles(self.allocator, symbol, from, today) catch { + const fetched = td.fetchCandles(self.allocator, symbol, from, today) catch |err| blk: { + if (err == error.RateLimited) { + self.rateLimitBackoff(); + break :blk td.fetchCandles(self.allocator, symbol, from, today) catch { + return DataError.FetchFailed; + }; + } + s.writeNegative(symbol, .candles_daily); return DataError.FetchFailed; }; @@ -227,68 +305,25 @@ pub const DataService = struct { } /// Fetch dividend history for a symbol. - /// Checks cache first; fetches from Polygon if stale/missing. - pub fn getDividends(self: *DataService, symbol: []const u8) DataError!struct { data: []Dividend, source: Source, timestamp: i64 } { - var s = self.store(); - - if (s.read(Dividend, symbol, dividendPostProcess, .fresh_only)) |cached| - return .{ .data = cached.data, .source = .cached, .timestamp = cached.timestamp }; - - var pg = try self.getPolygon(); - const fetched = pg.fetchDividends(self.allocator, symbol, null, null) catch { - return DataError.FetchFailed; - }; - - if (fetched.len > 0) { - s.write(Dividend, symbol, fetched, cache.Ttl.dividends); - } - - return .{ .data = fetched, .source = .fetched, .timestamp = std.time.timestamp() }; + pub fn getDividends(self: *DataService, symbol: []const u8) DataError!FetchResult(Dividend) { + return self.fetchCached(Dividend, symbol, dividendPostProcess); } /// Fetch split history for a symbol. - /// Checks cache first; fetches from Polygon if stale/missing. 
- pub fn getSplits(self: *DataService, symbol: []const u8) DataError!struct { data: []Split, source: Source, timestamp: i64 } { - var s = self.store(); - - if (s.read(Split, symbol, null, .fresh_only)) |cached| - return .{ .data = cached.data, .source = .cached, .timestamp = cached.timestamp }; - - var pg = try self.getPolygon(); - const fetched = pg.fetchSplits(self.allocator, symbol) catch { - return DataError.FetchFailed; - }; - - s.write(Split, symbol, fetched, cache.Ttl.splits); - - return .{ .data = fetched, .source = .fetched, .timestamp = std.time.timestamp() }; + pub fn getSplits(self: *DataService, symbol: []const u8) DataError!FetchResult(Split) { + return self.fetchCached(Split, symbol, null); } - /// Fetch options chain for a symbol (all expirations). - /// Checks cache first; fetches from CBOE if stale/missing (no API key needed). - pub fn getOptions(self: *DataService, symbol: []const u8) DataError!struct { data: []OptionsChain, source: Source, timestamp: i64 } { - var s = self.store(); - - if (s.read(OptionsChain, symbol, null, .fresh_only)) |cached| - return .{ .data = cached.data, .source = .cached, .timestamp = cached.timestamp }; - - var cboe = self.getCboe(); - const fetched = cboe.fetchOptionsChain(self.allocator, symbol) catch { - return DataError.FetchFailed; - }; - - if (fetched.len > 0) { - s.write(OptionsChain, symbol, fetched, cache.Ttl.options); - } - - return .{ .data = fetched, .source = .fetched, .timestamp = std.time.timestamp() }; + /// Fetch options chain for a symbol (all expirations, no API key needed). + pub fn getOptions(self: *DataService, symbol: []const u8) DataError!FetchResult(OptionsChain) { + return self.fetchCached(OptionsChain, symbol, null); } /// Fetch earnings history for a symbol (5 years back, 1 year forward). /// Checks cache first; fetches from Finnhub if stale/missing. /// Smart refresh: even if cache is fresh, re-fetches when a past earnings /// date has no actual results yet (i.e. results just came out). 
- pub fn getEarnings(self: *DataService, symbol: []const u8) DataError!struct { data: []EarningsEvent, source: Source, timestamp: i64 } { + pub fn getEarnings(self: *DataService, symbol: []const u8) DataError!FetchResult(EarningsEvent) { var s = self.store(); const today = todayDate(); @@ -306,31 +341,43 @@ pub const DataService = struct { self.allocator.free(cached.data); } - var fh = try self.getFinnhub(); + var fh = try self.getProvider(Finnhub); const from = today.subtractYears(5); const to = today.addDays(365); - const fetched = fh.fetchEarnings(self.allocator, symbol, from, to) catch { + const fetched = fh.fetchEarnings(self.allocator, symbol, from, to) catch |err| blk: { + if (err == error.RateLimited) { + self.rateLimitBackoff(); + break :blk fh.fetchEarnings(self.allocator, symbol, from, to) catch { + return DataError.FetchFailed; + }; + } + s.writeNegative(symbol, .earnings); return DataError.FetchFailed; }; - if (fetched.len > 0) { - s.write(EarningsEvent, symbol, fetched, cache.Ttl.earnings); - } + s.write(EarningsEvent, symbol, fetched, cache.Ttl.earnings); return .{ .data = fetched, .source = .fetched, .timestamp = std.time.timestamp() }; } /// Fetch ETF profile for a symbol. /// Checks cache first; fetches from Alpha Vantage if stale/missing. 
- pub fn getEtfProfile(self: *DataService, symbol: []const u8) DataError!struct { data: EtfProfile, source: Source, timestamp: i64 } { + pub fn getEtfProfile(self: *DataService, symbol: []const u8) DataError!FetchResult(EtfProfile) { var s = self.store(); if (s.read(EtfProfile, symbol, null, .fresh_only)) |cached| return .{ .data = cached.data, .source = .cached, .timestamp = cached.timestamp }; - var av = try self.getAlphaVantage(); - const fetched = av.fetchEtfProfile(self.allocator, symbol) catch { + var av = try self.getProvider(AlphaVantage); + const fetched = av.fetchEtfProfile(self.allocator, symbol) catch |err| blk: { + if (err == error.RateLimited) { + self.rateLimitBackoff(); + break :blk av.fetchEtfProfile(self.allocator, symbol) catch { + return DataError.FetchFailed; + }; + } + s.writeNegative(symbol, .etf_profile); return DataError.FetchFailed; }; @@ -342,7 +389,7 @@ pub const DataService = struct { /// Fetch a real-time (or 15-min delayed) quote for a symbol. /// No cache -- always fetches fresh from TwelveData. pub fn getQuote(self: *DataService, symbol: []const u8) DataError!Quote { - var td = try self.getTwelveData(); + var td = try self.getProvider(TwelveData); return td.fetchQuote(self.allocator, symbol) catch return DataError.FetchFailed; } @@ -350,7 +397,7 @@ pub const DataService = struct { /// Fetch company overview (sector, industry, country, market cap) from Alpha Vantage. /// No cache -- always fetches fresh. Caller must free the returned string fields. pub fn getCompanyOverview(self: *DataService, symbol: []const u8) DataError!CompanyOverview { - var av = try self.getAlphaVantage(); + var av = try self.getProvider(AlphaVantage); return av.fetchCompanyOverview(self.allocator, symbol) catch return DataError.FetchFailed; } @@ -602,6 +649,12 @@ pub const DataService = struct { return null; } + /// A single CUSIP-to-ticker mapping record in the cache file. 
+ const CusipEntry = struct { + cusip: []const u8 = "", + ticker: []const u8 = "", + }; + /// Read a cached CUSIP->ticker mapping. Returns null if not cached. /// Caller owns the returned string. fn getCachedCusipTicker(self: *DataService, cusip: []const u8) ?[]const u8 { @@ -611,30 +664,14 @@ pub const DataService = struct { const data = std.fs.cwd().readFileAlloc(self.allocator, path, 64 * 1024) catch return null; defer self.allocator.free(data); - // Simple line-based format: cusip::XXXXX,ticker::YYYYY - var lines = std.mem.splitScalar(u8, data, '\n'); - while (lines.next()) |line| { - const trimmed = std.mem.trim(u8, line, &std.ascii.whitespace); - if (trimmed.len == 0 or trimmed[0] == '#') continue; + var reader = std.Io.Reader.fixed(data); + var it = srf.iterator(&reader, self.allocator, .{ .alloc_strings = false }) catch return null; + defer it.deinit(); - // Parse cusip:: field - const cusip_prefix = "cusip::"; - if (!std.mem.startsWith(u8, trimmed, cusip_prefix)) continue; - const after_cusip = trimmed[cusip_prefix.len..]; - const comma_idx = std.mem.indexOfScalar(u8, after_cusip, ',') orelse continue; - const cached_cusip = after_cusip[0..comma_idx]; - if (!std.mem.eql(u8, cached_cusip, cusip)) continue; - - // Parse ticker:: field - const rest = after_cusip[comma_idx + 1 ..]; - const ticker_prefix = "ticker::"; - if (!std.mem.startsWith(u8, rest, ticker_prefix)) continue; - const ticker_val = rest[ticker_prefix.len..]; - // Trim any trailing comma/fields - const ticker_end = std.mem.indexOfScalar(u8, ticker_val, ',') orelse ticker_val.len; - const ticker = ticker_val[0..ticker_end]; - if (ticker.len > 0) { - return self.allocator.dupe(u8, ticker) catch null; + while (it.next() catch return null) |fields| { + const entry = fields.to(CusipEntry) catch continue; + if (std.mem.eql(u8, entry.cusip, cusip) and entry.ticker.len > 0) { + return self.allocator.dupe(u8, entry.ticker) catch null; } } return null; @@ -650,21 +687,34 @@ pub const DataService = struct 
{ std.fs.cwd().makePath(dir) catch {}; } - // Append the mapping + // Open existing (append) or create new (with header) + var emit_directives = false; const file = std.fs.cwd().openFile(path, .{ .mode = .write_only }) catch blk: { - // File doesn't exist, create it + emit_directives = true; break :blk std.fs.cwd().createFile(path, .{}) catch return; }; defer file.close(); - file.seekFromEnd(0) catch {}; + if (!emit_directives) file.seekFromEnd(0) catch {}; + const entry = [_]CusipEntry{.{ .cusip = cusip, .ticker = ticker }}; var buf: [256]u8 = undefined; - const line = std.fmt.bufPrint(&buf, "cusip::{s},ticker::{s}\n", .{ cusip, ticker }) catch return; - _ = file.write(line) catch {}; + var writer = file.writer(&buf); + writer.interface.print("{f}", .{srf.fmtFrom(CusipEntry, self.allocator, &entry, .{ .emit_directives = emit_directives })}) catch return; + writer.interface.flush() catch {}; } // ── Utility ────────────────────────────────────────────────── + /// Sleep before retrying after a rate limit error. + /// Uses the TwelveData rate limiter estimate if that provider is initialized, otherwise a fixed 10s backoff. + fn rateLimitBackoff(self: *DataService) void { + const wait_ns: u64 = if (self.td) |*td| + @max(td.rate_limiter.estimateWaitNs(), 2 * std.time.ns_per_s) + else + 10 * std.time.ns_per_s; + std.Thread.sleep(wait_ns); + } + fn todayDate() Date { const ts = std.time.timestamp(); const days: i32 = @intCast(@divFloor(ts, std.time.s_per_day));