//! DataService -- unified data access layer for zfin. //! //! Encapsulates the "check cache -> fresh? return -> else fetch from provider -> cache -> return" //! pattern that was previously duplicated between CLI and TUI. Both frontends should use this //! as their sole data source. //! //! Provider selection is internal: each data type routes to the appropriate provider //! based on available API keys. Callers never need to know which provider was used. const std = @import("std"); const log = std.log.scoped(.service); const Date = @import("models/date.zig").Date; const Candle = @import("models/candle.zig").Candle; const Dividend = @import("models/dividend.zig").Dividend; const Split = @import("models/split.zig").Split; const OptionsChain = @import("models/option.zig").OptionsChain; const EarningsEvent = @import("models/earnings.zig").EarningsEvent; const Quote = @import("models/quote.zig").Quote; const EtfProfile = @import("models/etf_profile.zig").EtfProfile; const Config = @import("config.zig").Config; const cache = @import("cache/store.zig"); const srf = @import("srf"); const TwelveData = @import("providers/twelvedata.zig").TwelveData; const Polygon = @import("providers/polygon.zig").Polygon; const Finnhub = @import("providers/finnhub.zig").Finnhub; const Cboe = @import("providers/cboe.zig").Cboe; const AlphaVantage = @import("providers/alphavantage.zig").AlphaVantage; const alphavantage = @import("providers/alphavantage.zig"); const OpenFigi = @import("providers/openfigi.zig"); const Yahoo = @import("providers/yahoo.zig").Yahoo; const fmt = @import("format.zig"); const performance = @import("analytics/performance.zig"); const http = @import("net/http.zig"); pub const DataError = error{ NoApiKey, FetchFailed, CacheError, ParseError, OutOfMemory, }; /// Re-exported provider types needed by commands via DataService. pub const CompanyOverview = alphavantage.CompanyOverview; /// Result of a CUSIP-to-ticker lookup (provider-agnostic). pub const CusipResult = OpenFigi.FigiResult; /// Indicates whether the returned data came from cache or was freshly fetched. pub const Source = enum { cached, fetched, }; /// Generic result type for all fetch operations: data payload + provenance metadata. pub fn FetchResult(comptime T: type) type { return struct { data: cache.Store.DataFor(T), source: Source, timestamp: i64, }; } // ── PostProcess callbacks ──────────────────────────────────── // These are passed to Store.read to handle type-specific // concerns: string duping (serialization plumbing) and domain transforms. /// Dupe the currency string so it outlives the SRF iterator's backing buffer. fn dividendPostProcess(div: *Dividend, allocator: std.mem.Allocator) anyerror!void { if (div.currency) |c| { div.currency = try allocator.dupe(u8, c); } } /// Recompute surprise/surprise_percent from actual and estimate fields. /// SRF only stores actual and estimate; surprise is derived. fn earningsPostProcess(ev: *EarningsEvent, _: std.mem.Allocator) anyerror!void { if (ev.actual != null and ev.estimate != null) { ev.surprise = ev.actual.? - ev.estimate.?; if (ev.estimate.? != 0) { ev.surprise_percent = (ev.surprise.? / @abs(ev.estimate.?)) * 100.0; } } } pub const DataService = struct { allocator: std.mem.Allocator, config: Config, // Lazily initialized providers (null until first use) td: ?TwelveData = null, pg: ?Polygon = null, fh: ?Finnhub = null, cboe: ?Cboe = null, av: ?AlphaVantage = null, yh: ?Yahoo = null, pub fn init(allocator: std.mem.Allocator, config: Config) DataService { return .{ .allocator = allocator, .config = config, }; } pub fn deinit(self: *DataService) void { if (self.td) |*td| td.deinit(); if (self.pg) |*pg| pg.deinit(); if (self.fh) |*fh| fh.deinit(); if (self.cboe) |*c| c.deinit(); if (self.av) |*av| av.deinit(); if (self.yh) |*yh| yh.deinit(); } // ── Provider accessor ────────────────────────────────────────── fn getProvider(self: *DataService, comptime T: type) DataError!*T { const field_name = comptime providerField(T); if (@field(self, field_name)) |*p| return p; if (T == Cboe or T == Yahoo) { // CBOE and Yahoo have no API key @field(self, field_name) = T.init(self.allocator); } else { // All we're doing here is lower casing the type name, then // appending _key to it, so AlphaVantage -> alphavantage_key const config_key = comptime blk: { const full = @typeName(T); var start: usize = 0; for (full, 0..) |c, i| { if (c == '.') start = i + 1; } const short = full[start..]; var buf: [short.len + 4]u8 = undefined; _ = std.ascii.lowerString(buf[0..short.len], short); @memcpy(buf[short.len..][0..4], "_key"); break :blk buf[0 .. short.len + 4]; }; const key = @field(self.config, config_key) orelse return DataError.NoApiKey; @field(self, field_name) = T.init(self.allocator, key); } return &@field(self, field_name).?; } fn providerField(comptime T: type) []const u8 { inline for (std.meta.fields(DataService)) |f| { if (f.type == ?T) return f.name; } @compileError("unknown provider type"); } // ── Cache helper ───────────────────────────────────────────── fn store(self: *DataService) cache.Store { return cache.Store.init(self.allocator, self.config.cache_dir); } /// Generic fetch-or-cache for simple data types (dividends, splits, options). /// Checks cache first; on miss, fetches from the appropriate provider, /// writes to cache, and returns. On permanent fetch failure, writes a negative /// cache entry. Rate limit failures are retried once. fn fetchCached( self: *DataService, comptime T: type, symbol: []const u8, comptime postProcess: ?*const fn (*T, std.mem.Allocator) anyerror!void, ) DataError!FetchResult(T) { var s = self.store(); const data_type = comptime cache.Store.dataTypeFor(T); if (s.read(T, symbol, postProcess, .fresh_only)) |cached| { log.debug("{s}: {s} fresh in local cache", .{ symbol, @tagName(data_type) }); return .{ .data = cached.data, .source = .cached, .timestamp = cached.timestamp }; } // Try server sync before hitting providers if (self.syncFromServer(symbol, data_type)) { if (s.read(T, symbol, postProcess, .fresh_only)) |cached| { log.debug("{s}: {s} synced from server and fresh", .{ symbol, @tagName(data_type) }); return .{ .data = cached.data, .source = .cached, .timestamp = cached.timestamp }; } log.debug("{s}: {s} synced from server but stale, falling through to provider", .{ symbol, @tagName(data_type) }); } log.debug("{s}: fetching {s} from provider", .{ symbol, @tagName(data_type) }); const fetched = self.fetchFromProvider(T, symbol) catch |err| { if (err == error.RateLimited) { // Wait and retry once self.rateLimitBackoff(); const retried = self.fetchFromProvider(T, symbol) catch { return DataError.FetchFailed; }; s.write(T, symbol, retried, data_type.ttl()); return .{ .data = retried, .source = .fetched, .timestamp = std.time.timestamp() }; } s.writeNegative(symbol, data_type); return DataError.FetchFailed; }; s.write(T, symbol, fetched, data_type.ttl()); return .{ .data = fetched, .source = .fetched, .timestamp = std.time.timestamp() }; } /// Dispatch a fetch to the correct provider based on model type. fn fetchFromProvider(self: *DataService, comptime T: type, symbol: []const u8) !cache.Store.DataFor(T) { return switch (T) { Dividend => { var pg = try self.getProvider(Polygon); return pg.fetchDividends(self.allocator, symbol, null, null); }, Split => { var pg = try self.getProvider(Polygon); return pg.fetchSplits(self.allocator, symbol); }, OptionsChain => { var cboe = try self.getProvider(Cboe); return cboe.fetchOptionsChain(self.allocator, symbol); }, else => @compileError("unsupported type for fetchFromProvider"), }; } /// Invalidate cached data for a symbol so the next get* call forces a fresh fetch. pub fn invalidate(self: *DataService, symbol: []const u8, data_type: cache.DataType) void { var s = self.store(); s.clearData(symbol, data_type); // Also clear candle metadata when invalidating candle data if (data_type == .candles_daily) { s.clearData(symbol, .candles_meta); } } // ── Public data methods ────────────────────────────────────── /// Fetch candles from providers with fallback logic. /// Tries the provider recorded in meta (if any), then TwelveData, then Yahoo. /// Returns the candles and which provider succeeded. fn fetchCandlesFromProviders( self: *DataService, symbol: []const u8, from: Date, to: Date, preferred: cache.Store.CandleProvider, ) !struct { candles: []Candle, provider: cache.Store.CandleProvider } { // If preferred is Yahoo, try it first if (preferred == .yahoo) { if (self.getProvider(Yahoo)) |yh| { if (yh.fetchCandles(self.allocator, symbol, from, to)) |candles| { log.debug("{s}: candles from Yahoo (preferred)", .{symbol}); return .{ .candles = candles, .provider = .yahoo }; } else |_| {} } else |_| {} } // Try TwelveData if (self.getProvider(TwelveData)) |td| { if (td.fetchCandles(self.allocator, symbol, from, to)) |candles| { log.debug("{s}: candles from TwelveData", .{symbol}); return .{ .candles = candles, .provider = .twelvedata }; } else |err| { if (err == error.RateLimited) { self.rateLimitBackoff(); if (td.fetchCandles(self.allocator, symbol, from, to)) |candles| { log.debug("{s}: candles from TwelveData (after rate limit retry)", .{symbol}); return .{ .candles = candles, .provider = .twelvedata }; } else |_| {} } } } else |_| {} // Fallback: Yahoo (if not already tried as preferred) if (preferred != .yahoo) { if (self.getProvider(Yahoo)) |yh| { if (yh.fetchCandles(self.allocator, symbol, from, to)) |candles| { log.debug("{s}: candles from Yahoo (fallback)", .{symbol}); return .{ .candles = candles, .provider = .yahoo }; } else |_| {} } else |_| {} } return error.FetchFailed; } /// Fetch daily candles for a symbol (10+ years for trailing returns). /// Checks cache first; fetches from TwelveData if stale/missing. /// Uses incremental updates: when the cache is stale, only fetches /// candles newer than the last cached date rather than re-fetching /// the entire history. pub fn getCandles(self: *DataService, symbol: []const u8) DataError!FetchResult(Candle) { var s = self.store(); const today = fmt.todayDate(); // Check candle metadata for freshness (tiny file, no candle deserialization) const meta_result = s.readCandleMeta(symbol); if (meta_result) |mr| { const m = mr.meta; if (s.isCandleMetaFresh(symbol)) { // Fresh — deserialize candles and return log.debug("{s}: candles fresh in local cache", .{symbol}); if (s.read(Candle, symbol, null, .any)) |r| return .{ .data = r.data, .source = .cached, .timestamp = mr.created }; } // Stale — try server sync before incremental fetch if (self.syncCandlesFromServer(symbol)) { if (s.isCandleMetaFresh(symbol)) { log.debug("{s}: candles synced from server and fresh", .{symbol}); if (s.read(Candle, symbol, null, .any)) |r| return .{ .data = r.data, .source = .cached, .timestamp = std.time.timestamp() }; } log.debug("{s}: candles synced from server but stale, falling through to incremental fetch", .{symbol}); // Server data also stale — fall through to incremental fetch } // Stale — try incremental update using last_date from meta const fetch_from = m.last_date.addDays(1); // If last cached date is today or later, just refresh the TTL (meta only) if (!fetch_from.lessThan(today)) { s.updateCandleMeta(symbol, m.last_close, m.last_date); if (s.read(Candle, symbol, null, .any)) |r| return .{ .data = r.data, .source = .cached, .timestamp = std.time.timestamp() }; } else { // Incremental fetch from day after last cached candle const result = self.fetchCandlesFromProviders(symbol, fetch_from, today, m.provider) catch { // All providers failed — return stale data if (s.read(Candle, symbol, null, .any)) |r| return .{ .data = r.data, .source = .cached, .timestamp = mr.created }; return DataError.FetchFailed; }; const new_candles = result.candles; if (new_candles.len == 0) { // No new candles (weekend/holiday) — refresh TTL only (meta rewrite) self.allocator.free(new_candles); s.updateCandleMetaWithProvider(symbol, m.last_close, m.last_date, result.provider); if (s.read(Candle, symbol, null, .any)) |r| return .{ .data = r.data, .source = .cached, .timestamp = std.time.timestamp() }; } else { // Append new candles to existing file + update meta s.appendCandles(symbol, new_candles); // Load the full (now-updated) file for the caller if (s.read(Candle, symbol, null, .any)) |r| { self.allocator.free(new_candles); return .{ .data = r.data, .source = .fetched, .timestamp = std.time.timestamp() }; } // Append failed or file unreadable — just return new candles return .{ .data = new_candles, .source = .fetched, .timestamp = std.time.timestamp() }; } } } // No usable cache — try server sync first if (self.syncCandlesFromServer(symbol)) { if (s.isCandleMetaFresh(symbol)) { log.debug("{s}: candles synced from server and fresh (no prior cache)", .{symbol}); if (s.read(Candle, symbol, null, .any)) |r| return .{ .data = r.data, .source = .cached, .timestamp = std.time.timestamp() }; } log.debug("{s}: candles synced from server but stale, falling through to full fetch", .{symbol}); // Server data also stale — fall through to full fetch } // No usable cache — full fetch (~10 years, plus buffer for leap years) log.debug("{s}: fetching full candle history from provider", .{symbol}); const from = today.addDays(-3700); const result = self.fetchCandlesFromProviders(symbol, from, today, .twelvedata) catch { s.writeNegative(symbol, .candles_daily); return DataError.FetchFailed; }; if (result.candles.len > 0) { s.cacheCandles(symbol, result.candles); // Record which provider sourced this data const last = result.candles[result.candles.len - 1]; s.updateCandleMetaWithProvider(symbol, last.close, last.date, result.provider); } return .{ .data = result.candles, .source = .fetched, .timestamp = std.time.timestamp() }; } /// Fetch dividend history for a symbol. pub fn getDividends(self: *DataService, symbol: []const u8) DataError!FetchResult(Dividend) { return self.fetchCached(Dividend, symbol, dividendPostProcess); } /// Fetch split history for a symbol. pub fn getSplits(self: *DataService, symbol: []const u8) DataError!FetchResult(Split) { return self.fetchCached(Split, symbol, null); } /// Fetch options chain for a symbol (all expirations, no API key needed). pub fn getOptions(self: *DataService, symbol: []const u8) DataError!FetchResult(OptionsChain) { return self.fetchCached(OptionsChain, symbol, null); } /// Fetch earnings history for a symbol (5 years back, 1 year forward). /// Checks cache first; fetches from Finnhub if stale/missing. /// Smart refresh: even if cache is fresh, re-fetches when a past earnings /// date has no actual results yet (i.e. results just came out). pub fn getEarnings(self: *DataService, symbol: []const u8) DataError!FetchResult(EarningsEvent) { // Mutual funds (5-letter tickers ending in X) don't have quarterly earnings. if (isMutualFund(symbol)) { return .{ .data = &.{}, .source = .cached, .timestamp = std.time.timestamp() }; } var s = self.store(); const today = fmt.todayDate(); if (s.read(EarningsEvent, symbol, earningsPostProcess, .fresh_only)) |cached| { // Check if any past/today earnings event is still missing actual results. // If so, the announcement likely just happened — force a refresh. const needs_refresh = for (cached.data) |ev| { if (ev.actual == null and !today.lessThan(ev.date)) break true; } else false; if (!needs_refresh) { log.debug("{s}: earnings fresh in local cache", .{symbol}); return .{ .data = cached.data, .source = .cached, .timestamp = cached.timestamp }; } // Stale: free cached events and re-fetch below self.allocator.free(cached.data); } // Try server sync before hitting Finnhub if (self.syncFromServer(symbol, .earnings)) { if (s.read(EarningsEvent, symbol, earningsPostProcess, .fresh_only)) |cached| { log.debug("{s}: earnings synced from server and fresh", .{symbol}); return .{ .data = cached.data, .source = .cached, .timestamp = cached.timestamp }; } log.debug("{s}: earnings synced from server but stale, falling through to provider", .{symbol}); } log.debug("{s}: fetching earnings from provider", .{symbol}); var fh = try self.getProvider(Finnhub); const from = today.subtractYears(5); const to = today.addDays(365); const fetched = fh.fetchEarnings(self.allocator, symbol, from, to) catch |err| blk: { if (err == error.RateLimited) { self.rateLimitBackoff(); break :blk fh.fetchEarnings(self.allocator, symbol, from, to) catch { return DataError.FetchFailed; }; } s.writeNegative(symbol, .earnings); return DataError.FetchFailed; }; s.write(EarningsEvent, symbol, fetched, cache.Ttl.earnings); return .{ .data = fetched, .source = .fetched, .timestamp = std.time.timestamp() }; } /// Fetch ETF profile for a symbol. /// Checks cache first; fetches from Alpha Vantage if stale/missing. pub fn getEtfProfile(self: *DataService, symbol: []const u8) DataError!FetchResult(EtfProfile) { var s = self.store(); if (s.read(EtfProfile, symbol, null, .fresh_only)) |cached| return .{ .data = cached.data, .source = .cached, .timestamp = cached.timestamp }; var av = try self.getProvider(AlphaVantage); const fetched = av.fetchEtfProfile(self.allocator, symbol) catch |err| blk: { if (err == error.RateLimited) { self.rateLimitBackoff(); break :blk av.fetchEtfProfile(self.allocator, symbol) catch { return DataError.FetchFailed; }; } s.writeNegative(symbol, .etf_profile); return DataError.FetchFailed; }; s.write(EtfProfile, symbol, fetched, cache.Ttl.etf_profile); return .{ .data = fetched, .source = .fetched, .timestamp = std.time.timestamp() }; } /// Fetch a real-time quote for a symbol. /// Yahoo Finance is primary (free, no API key, no 15-min delay). /// Falls back to TwelveData if Yahoo fails. pub fn getQuote(self: *DataService, symbol: []const u8) DataError!Quote { // Primary: Yahoo Finance (free, real-time) if (self.getProvider(Yahoo)) |yh| { if (yh.fetchQuote(self.allocator, symbol)) |quote| { log.debug("{s}: quote from Yahoo", .{symbol}); return quote; } else |_| {} } else |_| {} // Fallback: TwelveData (requires API key, may be 15-min delayed) var td = try self.getProvider(TwelveData); log.debug("{s}: quote fallback to TwelveData", .{symbol}); return td.fetchQuote(self.allocator, symbol) catch return DataError.FetchFailed; } /// Fetch company overview (sector, industry, country, market cap) from Alpha Vantage. /// No cache -- always fetches fresh. Caller must free the returned string fields. pub fn getCompanyOverview(self: *DataService, symbol: []const u8) DataError!CompanyOverview { var av = try self.getProvider(AlphaVantage); return av.fetchCompanyOverview(self.allocator, symbol) catch return DataError.FetchFailed; } /// Compute trailing returns for a symbol (fetches candles + dividends). /// Returns both as-of-date and month-end trailing returns. /// As-of-date: end = latest close. Matches Morningstar "Trailing Returns" page. /// Month-end: end = last business day of prior month. Matches Morningstar "Performance" page. pub fn getTrailingReturns(self: *DataService, symbol: []const u8) DataError!struct { asof_price: performance.TrailingReturns, asof_total: ?performance.TrailingReturns, me_price: performance.TrailingReturns, me_total: ?performance.TrailingReturns, candles: []Candle, dividends: ?[]Dividend, source: Source, timestamp: i64, } { const candle_result = try self.getCandles(symbol); const c = candle_result.data; if (c.len == 0) return DataError.FetchFailed; const today = fmt.todayDate(); // As-of-date (end = last candle) const asof_price = performance.trailingReturns(c); // Month-end (end = last business day of prior month) const me_price = performance.trailingReturnsMonthEnd(c, today); // Try to get dividends (non-fatal if unavailable) var divs: ?[]Dividend = null; var asof_total: ?performance.TrailingReturns = null; var me_total: ?performance.TrailingReturns = null; if (self.getDividends(symbol)) |div_result| { divs = div_result.data; asof_total = performance.trailingReturnsWithDividends(c, div_result.data); me_total = performance.trailingReturnsMonthEndWithDividends(c, div_result.data, today); } else |_| {} return .{ .asof_price = asof_price, .asof_total = asof_total, .me_price = me_price, .me_total = me_total, .candles = c, .dividends = divs, .source = candle_result.source, .timestamp = candle_result.timestamp, }; } /// Check if candle data is fresh in cache without full deserialization. pub fn isCandleCacheFresh(self: *DataService, symbol: []const u8) bool { var s = self.store(); return s.isCandleMetaFresh(symbol); } /// Read only the latest close price from cached candles (no full deserialization). /// Returns null if no cached data exists. pub fn getCachedLastClose(self: *DataService, symbol: []const u8) ?f64 { var s = self.store(); return s.readLastClose(symbol); } /// Estimate wait time (in seconds) before the next TwelveData API call can proceed. /// Returns 0 if a request can be made immediately. Returns null if no API key. pub fn estimateWaitSeconds(self: *DataService) ?u64 { if (self.td) |*td| { const ns = td.rate_limiter.estimateWaitNs(); return if (ns == 0) 0 else @max(1, ns / std.time.ns_per_s); } return null; } /// Read candles from cache only (no network fetch). Used by TUI for display. /// Returns null if no cached data exists or if the entry is a negative cache (fetch_failed). pub fn getCachedCandles(self: *DataService, symbol: []const u8) ?[]Candle { var s = self.store(); if (s.isNegative(symbol, .candles_daily)) return null; const result = s.read(Candle, symbol, null, .any) orelse return null; return result.data; } /// Read dividends from cache only (no network fetch). pub fn getCachedDividends(self: *DataService, symbol: []const u8) ?[]Dividend { var s = self.store(); const result = s.read(Dividend, symbol, dividendPostProcess, .any) orelse return null; return result.data; } /// Read earnings from cache only (no network fetch). pub fn getCachedEarnings(self: *DataService, symbol: []const u8) ?[]EarningsEvent { var s = self.store(); const result = s.read(EarningsEvent, symbol, earningsPostProcess, .any) orelse return null; return result.data; } /// Read options from cache only (no network fetch). pub fn getCachedOptions(self: *DataService, symbol: []const u8) ?[]OptionsChain { var s = self.store(); const result = s.read(OptionsChain, symbol, null, .any) orelse return null; return result.data; } // ── Portfolio price loading ────────────────────────────────── /// Result of loading prices for a set of symbols. pub const PriceLoadResult = struct { /// Number of symbols whose price came from fresh cache. cached_count: usize, /// Number of symbols successfully fetched from API. fetched_count: usize, /// Number of symbols where API fetch failed. fail_count: usize, /// Number of failed symbols that fell back to stale cache. stale_count: usize, /// Latest candle date seen across all symbols. latest_date: ?Date, }; /// Status emitted for each symbol during price loading. pub const SymbolStatus = enum { /// Price resolved from fresh cache. cached, /// About to attempt an API fetch (emitted before the network call). fetching, /// Price fetched successfully from API. fetched, /// API fetch failed but stale cached price was used. failed_used_stale, /// API fetch failed and no cached price exists. failed, }; /// Callback for progress reporting during price loading. /// `context` is an opaque pointer to caller-owned state. pub const ProgressCallback = struct { context: *anyopaque, on_progress: *const fn (ctx: *anyopaque, index: usize, total: usize, symbol: []const u8, status: SymbolStatus) void, fn emit(self: ProgressCallback, index: usize, total: usize, symbol: []const u8, status: SymbolStatus) void { self.on_progress(self.context, index, total, symbol, status); } }; /// Load current prices for a list of symbols into `prices`. /// /// For each symbol the resolution order is: /// 1. Fresh cache -> use cached last close (fast, no deserialization) /// 2. API fetch -> use latest candle close /// 3. Stale cache -> use last close from expired cache entry /// /// If `force_refresh` is true, cache is invalidated before checking freshness. /// If `progress` is provided, it is called for each symbol with the outcome. pub fn loadPrices( self: *DataService, symbols: []const []const u8, prices: *std.StringHashMap(f64), force_refresh: bool, progress: ?ProgressCallback, ) PriceLoadResult { var result = PriceLoadResult{ .cached_count = 0, .fetched_count = 0, .fail_count = 0, .stale_count = 0, .latest_date = null, }; const total = symbols.len; for (symbols, 0..) |sym, i| { if (force_refresh) { self.invalidate(sym, .candles_daily); } // 1. Fresh cache — fast path (no full deserialization) if (!force_refresh and self.isCandleCacheFresh(sym)) { if (self.getCachedLastClose(sym)) |close| { prices.put(sym, close) catch {}; } result.cached_count += 1; if (progress) |p| p.emit(i, total, sym, .cached); continue; } // About to fetch — notify caller (so it can show rate-limit waits etc.) if (progress) |p| p.emit(i, total, sym, .fetching); // 2. Try API fetch if (self.getCandles(sym)) |candle_result| { defer self.allocator.free(candle_result.data); if (candle_result.data.len > 0) { const last = candle_result.data[candle_result.data.len - 1]; prices.put(sym, last.close) catch {}; if (result.latest_date == null or last.date.days > result.latest_date.?.days) { result.latest_date = last.date; } } result.fetched_count += 1; if (progress) |p| p.emit(i, total, sym, .fetched); continue; } else |_| {} // 3. Fetch failed — fall back to stale cache result.fail_count += 1; if (self.getCachedLastClose(sym)) |close| { prices.put(sym, close) catch {}; result.stale_count += 1; if (progress) |p| p.emit(i, total, sym, .failed_used_stale); } else { if (progress) |p| p.emit(i, total, sym, .failed); } } return result; } // ── CUSIP Resolution ────────────────────────────────────────── /// Look up multiple CUSIPs in a single batch request via OpenFIGI. /// Results array is parallel to the input cusips array (same length, same order). /// Caller owns the returned slice and all strings within each CusipResult. pub fn lookupCusips(self: *DataService, cusips: []const []const u8) DataError![]CusipResult { return OpenFigi.lookupCusips(self.allocator, cusips, self.config.openfigi_key) catch return DataError.FetchFailed; } /// Look up a CUSIP via OpenFIGI API. Returns the ticker if found, null otherwise. /// Results are cached in {cache_dir}/cusip_tickers.srf. /// Caller owns the returned string. pub fn lookupCusip(self: *DataService, cusip: []const u8) ?[]const u8 { // Check local cache first if (self.getCachedCusipTicker(cusip)) |t| return t; // Try OpenFIGI const result = OpenFigi.lookupCusip(self.allocator, cusip, self.config.openfigi_key) catch return null; defer { if (result.name) |n| self.allocator.free(n); if (result.security_type) |s| self.allocator.free(s); } if (result.ticker) |ticker| { // Cache the mapping self.cacheCusipTicker(cusip, ticker); return ticker; // caller takes ownership } return null; } /// A single CUSIP-to-ticker mapping record in the cache file. const CusipEntry = struct { cusip: []const u8 = "", ticker: []const u8 = "", }; /// Read a cached CUSIP->ticker mapping. Returns null if not cached. /// Caller owns the returned string. fn getCachedCusipTicker(self: *DataService, cusip: []const u8) ?[]const u8 { const path = std.fs.path.join(self.allocator, &.{ self.config.cache_dir, "cusip_tickers.srf" }) catch return null; defer self.allocator.free(path); const data = std.fs.cwd().readFileAlloc(self.allocator, path, 64 * 1024) catch return null; defer self.allocator.free(data); var reader = std.Io.Reader.fixed(data); var it = srf.iterator(&reader, self.allocator, .{ .alloc_strings = false }) catch return null; defer it.deinit(); while (it.next() catch return null) |fields| { const entry = fields.to(CusipEntry) catch continue; if (std.mem.eql(u8, entry.cusip, cusip) and entry.ticker.len > 0) { return self.allocator.dupe(u8, entry.ticker) catch null; } } return null; } /// Append a CUSIP->ticker mapping to the cache file. pub fn cacheCusipTicker(self: *DataService, cusip: []const u8, ticker: []const u8) void { const path = std.fs.path.join(self.allocator, &.{ self.config.cache_dir, "cusip_tickers.srf" }) catch return; defer self.allocator.free(path); // Ensure cache dir exists if (std.fs.path.dirnamePosix(path)) |dir| { std.fs.cwd().makePath(dir) catch {}; } // Open existing (append) or create new (with header) var emit_directives = false; const file = std.fs.cwd().openFile(path, .{ .mode = .write_only }) catch blk: { emit_directives = true; break :blk std.fs.cwd().createFile(path, .{}) catch return; }; defer file.close(); if (!emit_directives) file.seekFromEnd(0) catch {}; const entry = [_]CusipEntry{.{ .cusip = cusip, .ticker = ticker }}; var buf: [256]u8 = undefined; var writer = file.writer(&buf); writer.interface.print("{f}", .{srf.fmtFrom(CusipEntry, self.allocator, &entry, .{ .emit_directives = emit_directives })}) catch return; writer.interface.flush() catch {}; } // ── Utility ────────────────────────────────────────────────── /// Sleep before retrying after a rate limit error. /// Uses the provider's rate limiter if available, otherwise a fixed 10s backoff. fn rateLimitBackoff(self: *DataService) void { if (self.td) |*td| { td.rate_limiter.backoff(); } else { std.Thread.sleep(10 * std.time.ns_per_s); } } // ── Server sync ────────────────────────────────────────────── /// Try to sync a cache file from the configured zfin-server. /// Returns true if the file was successfully synced, false on any error. /// Silently returns false if no server is configured. fn syncFromServer(self: *DataService, symbol: []const u8, data_type: cache.DataType) bool { const server_url = self.config.server_url orelse return false; const endpoint = switch (data_type) { .candles_daily => "/candles", .candles_meta => "/candles_meta", .dividends => "/dividends", .earnings => "/earnings", .options => "/options", .splits => return false, // not served .etf_profile => return false, // not served .meta => return false, }; const full_url = std.fmt.allocPrint(self.allocator, "{s}/{s}{s}", .{ server_url, symbol, endpoint }) catch return false; defer self.allocator.free(full_url); log.debug("{s}: syncing {s} from server", .{ symbol, @tagName(data_type) }); var client = http.Client.init(self.allocator); defer client.deinit(); var response = client.get(full_url) catch |err| { log.debug("{s}: server sync failed for {s}: {s}", .{ symbol, @tagName(data_type), @errorName(err) }); return false; }; defer response.deinit(); // Write to local cache var s = self.store(); s.writeRaw(symbol, data_type, response.body) catch |err| { log.debug("{s}: failed to write synced {s} to cache: {s}", .{ symbol, @tagName(data_type), @errorName(err) }); return false; }; log.debug("{s}: synced {s} from server ({d} bytes)", .{ symbol, @tagName(data_type), response.body.len }); return true; } /// Sync candle data (both daily and meta) from the server. fn syncCandlesFromServer(self: *DataService, symbol: []const u8) bool { const daily = self.syncFromServer(symbol, .candles_daily); const meta = self.syncFromServer(symbol, .candles_meta); return daily and meta; } /// Mutual funds use 5-letter tickers ending in X (e.g. FDSCX, VSTCX, FAGIX). /// These don't have quarterly earnings on Finnhub. fn isMutualFund(symbol: []const u8) bool { return symbol.len == 5 and symbol[4] == 'X'; } };