//! DataService -- unified data access layer for zfin.
//!
//! Encapsulates the "check cache -> fresh? return -> else fetch from provider -> cache -> return"
//! pattern that was previously duplicated between CLI and TUI. Both frontends should use this
//! as their sole data source.
//!
//! Provider selection is internal: each data type routes to the appropriate provider
//! based on available API keys. Callers never need to know which provider was used.
const std = @import("std");
const log = std.log.scoped(.service);
const Date = @import("models/date.zig").Date;
const Candle = @import("models/candle.zig").Candle;
const Dividend = @import("models/dividend.zig").Dividend;
const Split = @import("models/split.zig").Split;
const OptionsChain = @import("models/option.zig").OptionsChain;
const EarningsEvent = @import("models/earnings.zig").EarningsEvent;
const Quote = @import("models/quote.zig").Quote;
const EtfProfile = @import("models/etf_profile.zig").EtfProfile;
const Config = @import("Config.zig");
const cache = @import("cache/store.zig");
const srf = @import("srf");
const analysis = @import("analytics/analysis.zig");
const transaction_log = @import("models/transaction_log.zig");
const TwelveData = @import("providers/twelvedata.zig").TwelveData;
const Polygon = @import("providers/polygon.zig").Polygon;
const Fmp = @import("providers/fmp.zig").Fmp;
const Cboe = @import("providers/cboe.zig").Cboe;
const AlphaVantage = @import("providers/alphavantage.zig").AlphaVantage;
const alphavantage = @import("providers/alphavantage.zig");
const OpenFigi = @import("providers/openfigi.zig");
const Yahoo = @import("providers/yahoo.zig").Yahoo;
const Tiingo = @import("providers/tiingo.zig").Tiingo;
const fmt = @import("format.zig");
const performance = @import("analytics/performance.zig");
const http = @import("net/http.zig");
const atomic = @import("atomic.zig");
pub const DataError = error{
NoApiKey,
FetchFailed,
CacheError,
ParseError,
OutOfMemory,
/// Transient provider failure (server error, connection issue).
/// Caller should stop and retry later.
TransientError,
/// Provider auth failure (bad API key). Entire refresh should stop.
AuthError,
};
/// Re-exported provider types needed by commands via DataService.
pub const CompanyOverview = alphavantage.CompanyOverview;
/// Result of a CUSIP-to-ticker lookup (provider-agnostic).
pub const CusipResult = OpenFigi.FigiResult;
/// Indicates whether the returned data came from cache or was freshly fetched.
pub const Source = enum {
cached,
fetched,
};
/// Generic result type for all fetch operations: data payload + provenance metadata.
///
/// `data` is owned by `allocator` — call `result.deinit()` to release
/// it (both the outer slice/struct and any nested owned fields). This
/// replaces the earlier "caller frees with whatever allocator they
/// happen to have" pattern, which was error-prone when the caller's
/// allocator (e.g. an arena) differed from the service's allocator.
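/// Typical caller pattern (sketch; assumes `svc: *DataService` is in scope
/// and the symbol is a placeholder):
///
///     const divs = try svc.getDividends("VTI");
///     defer divs.deinit(); // frees divs.data via the captured allocator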
pub fn FetchResult(comptime T: type) type {
return struct {
data: cache.Store.DataFor(T),
source: Source,
timestamp: i64,
/// Allocator that owns `data`. Populated by the service on
/// every return path; callers use it via `deinit` rather than
/// touching it directly.
allocator: std.mem.Allocator,
/// Free `data` and any nested owned fields.
///
/// Dispatches at comptime:
/// - If `T` has a `freeSlice` helper (Dividend, OptionsChain),
/// call it — handles element deinit plus the outer slice.
/// - Else if `data` is a slice (Candle, Split, EarningsEvent),
/// do a simple slice free.
/// - Else if `T` has a `deinit` method (EtfProfile), call it
/// on the struct itself.
pub fn deinit(self: @This()) void {
const DT = @TypeOf(self.data);
if (@hasDecl(T, "freeSlice")) {
T.freeSlice(self.allocator, self.data);
} else if (@typeInfo(DT) == .pointer) {
self.allocator.free(self.data);
} else if (@hasDecl(T, "deinit")) {
self.data.deinit(self.allocator);
}
}
};
}
// ── PostProcess callbacks ────────────────────────────────────
// These are passed to Store.read to handle type-specific
// concerns: string duping (serialization plumbing) and domain transforms.
/// Dupe the currency string so it outlives the SRF iterator's backing buffer.
fn dividendPostProcess(div: *Dividend, allocator: std.mem.Allocator) anyerror!void {
if (div.currency) |c| {
div.currency = try allocator.dupe(u8, c);
}
}
/// Recompute surprise/surprise_percent from actual and estimate fields.
/// SRF only stores actual and estimate; surprise is derived.
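/// Example: actual = 2.10 and estimate = 2.00 give surprise = 0.10 and
/// surprise_percent = (0.10 / 2.00) * 100 = 5.0.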
fn earningsPostProcess(ev: *EarningsEvent, _: std.mem.Allocator) anyerror!void {
if (ev.actual != null and ev.estimate != null) {
ev.surprise = ev.actual.? - ev.estimate.?;
if (ev.estimate.? != 0) {
ev.surprise_percent = (ev.surprise.? / @abs(ev.estimate.?)) * 100.0;
}
}
}
pub const DataService = struct {
/// Thread-safe wrapper over the caller-provided base allocator.
///
/// Why this exists: `parallelServerSync` spawns worker threads that
/// each allocate through `DataService` — HTTP client init, TLS cert
/// bundle parsing, request/response buffers, and `Store.writeRaw`
/// path joins. The CLI's root allocator is an `ArenaAllocator`
/// (`src/main.zig`), which is NOT thread-safe. Unsynchronized
/// concurrent allocs from workers corrupt the arena's free list.
/// Symptoms seen in the wild:
///
/// thread N panic: reached unreachable code
/// std/mem/Allocator.zig:147 grow
/// std/hash_map.zig:1296 addCertsFromFile
/// std/crypto/Certificate/Bundle.zig:206 request
/// std/http/Client.zig:1789 request
/// src/net/http.zig:43 syncFromServer
///
/// and bare segfaults mid-heap on whatever pointer the arena
/// scrambled that run.
///
/// The wrapper serializes every allocation with a mutex. Cost is
/// one lock acquire/release per alloc — negligible next to the I/O
/// these allocations feed (HTTP requests, cache writes). The
/// alternative (threading per-worker arenas through every
/// transitive callsite) was rejected as error-prone.
///
/// DO NOT add an "unwrap" method or store the child allocator
/// directly. The point is that internal callers don't need to
/// know whether they're running under threads — every path goes
/// through the lock by construction.
thread_safe: std.heap.ThreadSafeAllocator,
config: Config,
// Lazily initialized providers (null until first use)
td: ?TwelveData = null,
pg: ?Polygon = null,
fmp: ?Fmp = null,
cboe: ?Cboe = null,
av: ?AlphaVantage = null,
yh: ?Yahoo = null,
tg: ?Tiingo = null,
pub fn init(base_allocator: std.mem.Allocator, config: Config) DataService {
const self = DataService{
.thread_safe = .{ .child_allocator = base_allocator },
.config = config,
};
self.logMissingKeys();
return self;
}
/// Return the thread-safe allocator. Always go through this, never
/// access the child allocator directly — see the doc-comment on
/// `thread_safe` for why.
///
/// Safe to call from any method that holds `*DataService`. The
/// returned `std.mem.Allocator` embeds `&self.thread_safe`, which
/// is stable for as long as `self` is.
pub fn allocator(self: *DataService) std.mem.Allocator {
return self.thread_safe.allocator();
}
/// Log warnings for missing API keys so users know which features are unavailable.
fn logMissingKeys(self: DataService) void {
// Primary candle provider
if (self.config.tiingo_key == null) {
log.warn("TIINGO_API_KEY not set — candle data will fall back to TwelveData/Yahoo", .{});
}
// Dividend/split data
if (self.config.polygon_key == null) {
log.warn("POLYGON_API_KEY not set — dividend and split data unavailable", .{});
}
// Earnings data
if (self.config.fmp_key == null) {
log.warn("FMP_API_KEY not set — earnings data unavailable", .{});
}
// ETF profiles
if (self.config.alphavantage_key == null) {
log.warn("ALPHAVANTAGE_API_KEY not set — ETF profiles unavailable", .{});
}
// Candle fallback
if (self.config.twelvedata_key == null and self.config.tiingo_key == null) {
log.warn("TWELVEDATA_API_KEY not set — no candle fallback if Yahoo fails", .{});
}
// CUSIP lookups
if (self.config.openfigi_key == null) {
log.info("OPENFIGI_API_KEY not set — CUSIP lookups will use anonymous rate limits", .{});
}
}
pub fn deinit(self: *DataService) void {
if (self.td) |*td| td.deinit();
if (self.pg) |*pg| pg.deinit();
if (self.fmp) |*fmp| fmp.deinit();
if (self.cboe) |*c| c.deinit();
if (self.av) |*av| av.deinit();
if (self.yh) |*yh| yh.deinit();
if (self.tg) |*tg| tg.deinit();
}
// ── Provider accessor ──────────────────────────────────────────
fn getProvider(self: *DataService, comptime T: type) DataError!*T {
const field_name = comptime providerField(T);
if (@field(self, field_name)) |*p| return p;
if (T == Cboe or T == Yahoo) {
// CBOE and Yahoo have no API key
@field(self, field_name) = T.init(self.allocator());
} else {
// Derive the Config field name by lower-casing the type name and
// appending "_key", e.g. AlphaVantage -> alphavantage_key.
const config_key = comptime blk: {
const full = @typeName(T);
var start: usize = 0;
for (full, 0..) |c, i| {
if (c == '.') start = i + 1;
}
const short = full[start..];
var buf: [short.len + 4]u8 = undefined;
_ = std.ascii.lowerString(buf[0..short.len], short);
@memcpy(buf[short.len..][0..4], "_key");
break :blk buf[0 .. short.len + 4];
};
const key = @field(self.config, config_key) orelse return DataError.NoApiKey;
@field(self, field_name) = T.init(self.allocator(), key);
}
return &@field(self, field_name).?;
}
fn providerField(comptime T: type) []const u8 {
inline for (std.meta.fields(DataService)) |f| {
if (f.type == ?T) return f.name;
}
@compileError("unknown provider type");
}
// ── Cache helper ─────────────────────────────────────────────
fn store(self: *DataService) cache.Store {
return cache.Store.init(self.allocator(), self.config.cache_dir);
}
/// Generic fetch-or-cache for simple data types (dividends, splits, options).
/// Checks cache first; on miss, fetches from the appropriate provider,
/// writes to cache, and returns. On permanent fetch failure, writes a negative
/// cache entry. Rate limit failures are retried once.
fn fetchCached(
self: *DataService,
comptime T: type,
symbol: []const u8,
comptime postProcess: ?*const fn (*T, std.mem.Allocator) anyerror!void,
) DataError!FetchResult(T) {
var s = self.store();
const data_type = comptime cache.Store.dataTypeFor(T);
if (s.read(T, symbol, postProcess, .fresh_only)) |cached| {
log.debug("{s}: {s} fresh in local cache", .{ symbol, @tagName(data_type) });
return .{ .data = cached.data, .source = .cached, .timestamp = cached.timestamp, .allocator = self.allocator() };
}
// Try server sync before hitting providers
if (self.syncFromServer(symbol, data_type)) {
if (s.read(T, symbol, postProcess, .fresh_only)) |cached| {
log.debug("{s}: {s} synced from server and fresh", .{ symbol, @tagName(data_type) });
return .{ .data = cached.data, .source = .cached, .timestamp = cached.timestamp, .allocator = self.allocator() };
}
log.debug("{s}: {s} synced from server but stale, falling through to provider", .{ symbol, @tagName(data_type) });
}
log.debug("{s}: fetching {s} from provider", .{ symbol, @tagName(data_type) });
const fetched = self.fetchFromProvider(T, symbol) catch |err| {
if (err == error.RateLimited) {
// Wait and retry once
self.rateLimitBackoff();
const retried = self.fetchFromProvider(T, symbol) catch {
return DataError.FetchFailed;
};
s.write(T, symbol, retried, data_type.ttl());
return .{ .data = retried, .source = .fetched, .timestamp = std.time.timestamp(), .allocator = self.allocator() };
}
s.writeNegative(symbol, data_type);
return DataError.FetchFailed;
};
s.write(T, symbol, fetched, data_type.ttl());
return .{ .data = fetched, .source = .fetched, .timestamp = std.time.timestamp(), .allocator = self.allocator() };
}
/// Dispatch a fetch to the correct provider based on model type.
fn fetchFromProvider(self: *DataService, comptime T: type, symbol: []const u8) !cache.Store.DataFor(T) {
return switch (T) {
Dividend => {
var pg = try self.getProvider(Polygon);
return pg.fetchDividends(self.allocator(), symbol, null, null);
},
Split => {
var pg = try self.getProvider(Polygon);
return pg.fetchSplits(self.allocator(), symbol);
},
OptionsChain => {
var cboe = try self.getProvider(Cboe);
return cboe.fetchOptionsChain(self.allocator(), symbol);
},
else => @compileError("unsupported type for fetchFromProvider"),
};
}
/// Invalidate cached data for a symbol so the next get* call forces a fresh fetch.
pub fn invalidate(self: *DataService, symbol: []const u8, data_type: cache.DataType) void {
var s = self.store();
s.clearData(symbol, data_type);
// Also clear candle metadata when invalidating candle data
if (data_type == .candles_daily) {
s.clearData(symbol, .candles_meta);
}
}
// ── Public data methods ──────────────────────────────────────
/// Fetch candles from providers with error classification.
///
/// Error handling:
/// - ServerError/RateLimited/RequestFailed from Tiingo → TransientError (stop refresh, retry later)
/// - NotFound/ParseError/InvalidResponse from Tiingo → try Yahoo (symbol-level issue)
/// - Unauthorized → TransientError (config problem, stop refresh)
///
/// The `preferred` param controls incremental fetch consistency: use the same
/// provider that sourced the existing cache data.
fn fetchCandlesFromProviders(
self: *DataService,
symbol: []const u8,
from: Date,
to: Date,
preferred: cache.Store.CandleProvider,
) (DataError || error{NotFound})!struct { candles: []Candle, provider: cache.Store.CandleProvider } {
// If preferred is Yahoo (degraded symbol), try Yahoo first
if (preferred == .yahoo) {
if (self.getProvider(Yahoo)) |yh| {
if (yh.fetchCandles(self.allocator(), symbol, from, to)) |candles| {
log.debug("{s}: candles from Yahoo (preferred)", .{symbol});
return .{ .candles = candles, .provider = .yahoo };
} else |err| {
log.warn("{s}: Yahoo (preferred) failed: {s}", .{ symbol, @errorName(err) });
}
} else |_| {}
}
// Primary: Tiingo
if (self.getProvider(Tiingo)) |tg| {
if (tg.fetchCandles(self.allocator(), symbol, from, to)) |candles| {
log.debug("{s}: candles from Tiingo", .{symbol});
return .{ .candles = candles, .provider = .tiingo };
} else |err| {
log.warn("{s}: Tiingo failed: {s}", .{ symbol, @errorName(err) });
if (err == error.Unauthorized) {
log.err("{s}: Tiingo auth failed — check TIINGO_API_KEY", .{symbol});
return DataError.AuthError;
}
if (err == error.RateLimited) {
// Rate limited: back off and retry — this is expected, not a failure
log.info("{s}: Tiingo rate limited, backing off", .{symbol});
self.rateLimitBackoff();
if (tg.fetchCandles(self.allocator(), symbol, from, to)) |candles| {
log.debug("{s}: candles from Tiingo (after rate limit backoff)", .{symbol});
return .{ .candles = candles, .provider = .tiingo };
} else |retry_err| {
log.warn("{s}: Tiingo retry after backoff failed: {s}", .{ symbol, @errorName(retry_err) });
if (retry_err == error.RateLimited) {
// Still rate limited after backoff — one more try
self.rateLimitBackoff();
if (tg.fetchCandles(self.allocator(), symbol, from, to)) |candles| {
log.debug("{s}: candles from Tiingo (after second backoff)", .{symbol});
return .{ .candles = candles, .provider = .tiingo };
} else |_| {}
}
// Exhausted rate limit retries — treat as transient
return DataError.TransientError;
}
}
if (isTransientError(err)) {
// Server error or connection failure — stop, don't fall back
return DataError.TransientError;
}
// NotFound, ParseError, InvalidResponse — symbol-level issue, try Yahoo
log.info("{s}: Tiingo does not have this symbol, trying Yahoo", .{symbol});
}
} else |_| {
log.warn("{s}: Tiingo provider not available (no API key?)", .{symbol});
}
// Fallback: Yahoo (symbol not on Tiingo)
if (preferred != .yahoo) {
if (self.getProvider(Yahoo)) |yh| {
if (yh.fetchCandles(self.allocator(), symbol, from, to)) |candles| {
log.info("{s}: candles from Yahoo (Tiingo fallback)", .{symbol});
return .{ .candles = candles, .provider = .yahoo };
} else |err| {
log.warn("{s}: Yahoo fallback also failed: {s}", .{ symbol, @errorName(err) });
}
} else |_| {
log.warn("{s}: Yahoo provider not available", .{symbol});
}
}
return DataError.FetchFailed;
}
/// Classify whether a provider error is transient (provider is down).
/// ServerError = HTTP 5xx, RequestFailed = connection/network failure.
/// Note: RateLimited and Unauthorized are handled separately.
fn isTransientError(err: anyerror) bool {
return err == error.ServerError or
err == error.RequestFailed;
}
/// Fetch daily candles for a symbol (10+ years for trailing returns).
/// Checks cache first; fetches from Tiingo (primary) or Yahoo (fallback) if stale/missing.
/// Uses incremental updates: when the cache is stale, only fetches
/// candles newer than the last cached date rather than re-fetching
/// the entire history.
pub fn getCandles(self: *DataService, symbol: []const u8) DataError!FetchResult(Candle) {
var s = self.store();
const today = fmt.todayDate();
// Check candle metadata for freshness (tiny file, no candle deserialization)
const meta_result = s.readCandleMeta(symbol);
if (meta_result) |mr| {
const m = mr.meta;
// If cached data is from TwelveData (deprecated for candles due to
// unreliable adj_close), skip cache and fall through to full re-fetch.
if (m.provider == .twelvedata) {
log.debug("{s}: cached candles from TwelveData — forcing full re-fetch", .{symbol});
} else if (s.isCandleMetaFresh(symbol)) {
// Fresh — deserialize candles and return
log.debug("{s}: candles fresh in local cache", .{symbol});
if (s.read(Candle, symbol, null, .any)) |r|
return .{ .data = r.data, .source = .cached, .timestamp = mr.created, .allocator = self.allocator() };
} else {
// Stale — try server sync before incremental fetch
if (self.syncCandlesFromServer(symbol)) {
if (s.isCandleMetaFresh(symbol)) {
log.debug("{s}: candles synced from server and fresh", .{symbol});
if (s.read(Candle, symbol, null, .any)) |r|
return .{ .data = r.data, .source = .cached, .timestamp = std.time.timestamp(), .allocator = self.allocator() };
}
log.debug("{s}: candles synced from server but stale, falling through to incremental fetch", .{symbol});
}
// Stale — try incremental update using last_date from meta
const fetch_from = m.last_date.addDays(1);
// If the next uncached day is today or later (cache already covers through at least yesterday), just refresh the TTL (meta only)
if (!fetch_from.lessThan(today)) {
s.updateCandleMeta(symbol, m.last_close, m.last_date, m.provider, m.fail_count);
if (s.read(Candle, symbol, null, .any)) |r|
return .{ .data = r.data, .source = .cached, .timestamp = std.time.timestamp(), .allocator = self.allocator() };
} else {
// Incremental fetch from day after last cached candle
const result = self.fetchCandlesFromProviders(symbol, fetch_from, today, m.provider) catch |err| {
if (err == DataError.TransientError) {
// Increment fail_count for this symbol
const new_fail_count = m.fail_count +| 1; // saturating add
log.warn("{s}: transient failure (fail_count now {d})", .{ symbol, new_fail_count });
s.updateCandleMeta(symbol, m.last_close, m.last_date, m.provider, new_fail_count);
// If degraded (fail_count >= 3), return stale data rather than failing
if (new_fail_count >= 3) {
log.warn("{s}: degraded after {d} consecutive failures, returning stale data", .{ symbol, new_fail_count });
if (s.read(Candle, symbol, null, .any)) |r|
return .{ .data = r.data, .source = .cached, .timestamp = mr.created, .allocator = self.allocator() };
}
return DataError.TransientError;
}
// Non-transient failure — return stale data if available
if (s.read(Candle, symbol, null, .any)) |r|
return .{ .data = r.data, .source = .cached, .timestamp = mr.created, .allocator = self.allocator() };
return DataError.FetchFailed;
};
const new_candles = result.candles;
if (new_candles.len == 0) {
// No new candles (weekend/holiday) — refresh TTL, reset fail_count
self.allocator().free(new_candles);
s.updateCandleMeta(symbol, m.last_close, m.last_date, result.provider, 0);
if (s.read(Candle, symbol, null, .any)) |r|
return .{ .data = r.data, .source = .cached, .timestamp = std.time.timestamp(), .allocator = self.allocator() };
} else {
// Append new candles to existing file + update meta, reset fail_count
s.appendCandles(symbol, new_candles, result.provider, 0);
if (s.read(Candle, symbol, null, .any)) |r| {
self.allocator().free(new_candles);
return .{ .data = r.data, .source = .fetched, .timestamp = std.time.timestamp(), .allocator = self.allocator() };
}
return .{ .data = new_candles, .source = .fetched, .timestamp = std.time.timestamp(), .allocator = self.allocator() };
}
}
}
}
// No usable cache — try server sync first
if (self.syncCandlesFromServer(symbol)) {
if (s.isCandleMetaFresh(symbol)) {
log.debug("{s}: candles synced from server and fresh (no prior cache)", .{symbol});
if (s.read(Candle, symbol, null, .any)) |r|
return .{ .data = r.data, .source = .cached, .timestamp = std.time.timestamp(), .allocator = self.allocator() };
}
log.debug("{s}: candles synced from server but stale, falling through to full fetch", .{symbol});
}
// No usable cache — full fetch (~10 years, plus buffer for leap years)
log.debug("{s}: fetching full candle history from provider", .{symbol});
const from = today.addDays(-3700);
const result = self.fetchCandlesFromProviders(symbol, from, today, .tiingo) catch |err| {
if (err == DataError.TransientError) {
// On a fresh fetch, increment fail_count if we have meta
if (meta_result) |mr| {
const new_fail_count = mr.meta.fail_count +| 1;
s.updateCandleMeta(symbol, mr.meta.last_close, mr.meta.last_date, mr.meta.provider, new_fail_count);
}
return DataError.TransientError;
}
s.writeNegative(symbol, .candles_daily);
return DataError.FetchFailed;
};
if (result.candles.len > 0) {
s.cacheCandles(symbol, result.candles, result.provider, 0); // reset fail_count on success
}
return .{ .data = result.candles, .source = .fetched, .timestamp = std.time.timestamp(), .allocator = self.allocator() };
}
/// Fetch dividend history for a symbol.
pub fn getDividends(self: *DataService, symbol: []const u8) DataError!FetchResult(Dividend) {
return self.fetchCached(Dividend, symbol, dividendPostProcess);
}
/// Fetch split history for a symbol.
pub fn getSplits(self: *DataService, symbol: []const u8) DataError!FetchResult(Split) {
return self.fetchCached(Split, symbol, null);
}
/// Fetch options chain for a symbol (all expirations, no API key needed).
pub fn getOptions(self: *DataService, symbol: []const u8) DataError!FetchResult(OptionsChain) {
return self.fetchCached(OptionsChain, symbol, null);
}
/// Fetch earnings history for a symbol.
/// Checks cache first; fetches from FMP if stale/missing.
/// Smart refresh: even if cache is fresh, re-fetches when a past earnings
/// date has no actual results yet (i.e. results just came out).
pub fn getEarnings(self: *DataService, symbol: []const u8) DataError!FetchResult(EarningsEvent) {
// Mutual funds (5-letter tickers ending in X) don't have quarterly earnings.
if (isMutualFund(symbol)) {
return .{ .data = &.{}, .source = .cached, .timestamp = std.time.timestamp(), .allocator = self.allocator() };
}
var s = self.store();
const today = fmt.todayDate();
if (s.read(EarningsEvent, symbol, earningsPostProcess, .fresh_only)) |cached| {
// Check if any past/today earnings event is still missing actual results.
// If so, the announcement likely just happened — force a refresh.
const needs_refresh = for (cached.data) |ev| {
if (ev.actual == null and !today.lessThan(ev.date)) break true;
} else false;
if (!needs_refresh) {
log.debug("{s}: earnings fresh in local cache", .{symbol});
return .{ .data = cached.data, .source = .cached, .timestamp = cached.timestamp, .allocator = self.allocator() };
}
// Stale: free cached events and re-fetch below
self.allocator().free(cached.data);
}
// Try server sync before hitting FMP
if (self.syncFromServer(symbol, .earnings)) {
if (s.read(EarningsEvent, symbol, earningsPostProcess, .fresh_only)) |cached| {
log.debug("{s}: earnings synced from server and fresh", .{symbol});
return .{ .data = cached.data, .source = .cached, .timestamp = cached.timestamp, .allocator = self.allocator() };
}
log.debug("{s}: earnings synced from server but stale, falling through to provider", .{symbol});
}
log.debug("{s}: fetching earnings from provider", .{symbol});
var fmp = try self.getProvider(Fmp);
const fetched = fmp.fetchEarnings(self.allocator(), symbol) catch |err| blk: {
if (err == error.RateLimited) {
self.rateLimitBackoff();
break :blk fmp.fetchEarnings(self.allocator(), symbol) catch {
return DataError.FetchFailed;
};
}
s.writeNegative(symbol, .earnings);
return DataError.FetchFailed;
};
s.write(EarningsEvent, symbol, fetched, cache.Ttl.earnings);
return .{ .data = fetched, .source = .fetched, .timestamp = std.time.timestamp(), .allocator = self.allocator() };
}
/// Fetch ETF profile for a symbol.
/// Checks cache first; fetches from Alpha Vantage if stale/missing.
pub fn getEtfProfile(self: *DataService, symbol: []const u8) DataError!FetchResult(EtfProfile) {
var s = self.store();
if (s.read(EtfProfile, symbol, null, .fresh_only)) |cached|
return .{ .data = cached.data, .source = .cached, .timestamp = cached.timestamp, .allocator = self.allocator() };
var av = try self.getProvider(AlphaVantage);
const fetched = av.fetchEtfProfile(self.allocator(), symbol) catch |err| blk: {
if (err == error.RateLimited) {
self.rateLimitBackoff();
break :blk av.fetchEtfProfile(self.allocator(), symbol) catch {
return DataError.FetchFailed;
};
}
s.writeNegative(symbol, .etf_profile);
return DataError.FetchFailed;
};
s.write(EtfProfile, symbol, fetched, cache.Ttl.etf_profile);
return .{ .data = fetched, .source = .fetched, .timestamp = std.time.timestamp(), .allocator = self.allocator() };
}
/// Fetch a real-time quote for a symbol.
/// Yahoo Finance is primary (free, no API key, no 15-min delay).
/// Falls back to TwelveData if Yahoo fails.
pub fn getQuote(self: *DataService, symbol: []const u8) DataError!Quote {
// Primary: Yahoo Finance (free, real-time)
if (self.getProvider(Yahoo)) |yh| {
if (yh.fetchQuote(self.allocator(), symbol)) |quote| {
log.debug("{s}: quote from Yahoo", .{symbol});
return quote;
} else |_| {}
} else |_| {}
// Fallback: TwelveData (requires API key, may be 15-min delayed)
var td = try self.getProvider(TwelveData);
log.debug("{s}: quote fallback to TwelveData", .{symbol});
return td.fetchQuote(self.allocator(), symbol) catch
return DataError.FetchFailed;
}
/// Fetch company overview (sector, industry, country, market cap) from Alpha Vantage.
/// No cache -- always fetches fresh. Caller must free the returned string fields.
pub fn getCompanyOverview(self: *DataService, symbol: []const u8) DataError!CompanyOverview {
var av = try self.getProvider(AlphaVantage);
return av.fetchCompanyOverview(self.allocator(), symbol) catch
return DataError.FetchFailed;
}
/// Compute trailing returns for a symbol (fetches candles + dividends).
/// Returns both as-of-date and month-end trailing returns.
/// As-of-date: end = latest close. Matches Morningstar "Trailing Returns" page.
/// Month-end: end = last business day of prior month. Matches Morningstar "Performance" page.
pub fn getTrailingReturns(self: *DataService, symbol: []const u8) DataError!struct {
asof_price: performance.TrailingReturns,
asof_total: ?performance.TrailingReturns,
me_price: performance.TrailingReturns,
me_total: ?performance.TrailingReturns,
candles: []Candle,
dividends: ?[]Dividend,
source: Source,
timestamp: i64,
} {
const candle_result = try self.getCandles(symbol);
const c = candle_result.data;
if (c.len == 0) return DataError.FetchFailed;
const today = fmt.todayDate();
// As-of-date (end = last candle)
const asof_price = performance.trailingReturns(c);
// Month-end (end = last business day of prior month)
const me_price = performance.trailingReturnsMonthEnd(c, today);
// Try to get dividends (non-fatal if unavailable)
var divs: ?[]Dividend = null;
var asof_total: ?performance.TrailingReturns = null;
var me_total: ?performance.TrailingReturns = null;
if (self.getDividends(symbol)) |div_result| {
divs = div_result.data;
const asof_div = performance.trailingReturnsWithDividends(c, div_result.data);
const me_div = performance.trailingReturnsMonthEndWithDividends(c, div_result.data, today);
// Dividend reinvestment is preferred (compounds correctly).
// adj_close fills gaps where dividend data is insufficient
// (e.g. stable-NAV funds with short candle history).
asof_total = performance.withDividendFallback(asof_div, asof_price);
me_total = performance.withDividendFallback(me_div, me_price);
} else |_| {}
return .{
.asof_price = asof_price,
.asof_total = asof_total,
.me_price = me_price,
.me_total = me_total,
.candles = c,
.dividends = divs,
.source = candle_result.source,
.timestamp = candle_result.timestamp,
};
}
/// Check if candle data is fresh in cache without full deserialization.
pub fn isCandleCacheFresh(self: *DataService, symbol: []const u8) bool {
var s = self.store();
return s.isCandleMetaFresh(symbol);
}
/// Read only the latest close price from cached candles (no full deserialization).
/// Returns null if no cached data exists.
pub fn getCachedLastClose(self: *DataService, symbol: []const u8) ?f64 {
var s = self.store();
return s.readLastClose(symbol);
}
/// Read the latest cached candle date for `symbol` without deserializing
/// the full candle history. Returns null if no cached metadata exists.
///
/// Callers should pair this with `isCandleCacheFresh` before trusting
/// the date: a stale cache entry can return a date from days or weeks
/// ago, which is fine for diagnostics but wrong for anything that
/// needs "the current market date".
pub fn getCachedLastDate(self: *DataService, symbol: []const u8) ?Date {
var s = self.store();
const mr = s.readCandleMeta(symbol) orelse return null;
return mr.meta.last_date;
}
/// Estimate wait time (in seconds) before the next TwelveData API call can proceed.
/// Returns 0 if a request can be made immediately. Returns null if the TwelveData
/// provider has not been initialized yet (no API key, or no TwelveData call made so far).
pub fn estimateWaitSeconds(self: *DataService) ?u64 {
if (self.td) |*td| {
const ns = td.rate_limiter.estimateWaitNs();
return if (ns == 0) 0 else @max(1, ns / std.time.ns_per_s);
}
return null;
}
/// Read candles from cache only (no network fetch). Used by TUI for display.
/// Returns null if no cached data exists or if the entry is a negative cache (fetch_failed).
///
/// Returns a `FetchResult(Candle)` so the caller can `result.deinit()`
/// without needing to know the service's internal allocator.
pub fn getCachedCandles(self: *DataService, symbol: []const u8) ?FetchResult(Candle) {
var s = self.store();
if (s.isNegative(symbol, .candles_daily)) return null;
const result = s.read(Candle, symbol, null, .any) orelse return null;
return .{ .data = result.data, .source = .cached, .timestamp = result.timestamp, .allocator = self.allocator() };
}
/// Read dividends from cache only (no network fetch).
pub fn getCachedDividends(self: *DataService, symbol: []const u8) ?[]Dividend {
var s = self.store();
const result = s.read(Dividend, symbol, dividendPostProcess, .any) orelse return null;
return result.data;
}
/// Read earnings from cache only (no network fetch).
pub fn getCachedEarnings(self: *DataService, symbol: []const u8) ?[]EarningsEvent {
var s = self.store();
const result = s.read(EarningsEvent, symbol, earningsPostProcess, .any) orelse return null;
return result.data;
}
/// Read options from cache only (no network fetch).
pub fn getCachedOptions(self: *DataService, symbol: []const u8) ?[]OptionsChain {
var s = self.store();
const result = s.read(OptionsChain, symbol, null, .any) orelse return null;
return result.data;
}
// ── Portfolio price loading ──────────────────────────────────
/// Result of loading prices for a set of symbols.
pub const PriceLoadResult = struct {
/// Number of symbols whose price came from fresh cache.
cached_count: usize,
/// Number of symbols successfully fetched from API.
fetched_count: usize,
/// Number of symbols where API fetch failed.
fail_count: usize,
/// Number of failed symbols that fell back to stale cache.
stale_count: usize,
/// Latest candle date seen across all symbols.
latest_date: ?Date,
};
/// Status emitted for each symbol during price loading.
pub const SymbolStatus = enum {
/// Price resolved from fresh cache.
cached,
/// About to attempt an API fetch (emitted before the network call).
fetching,
/// Price fetched successfully from API.
fetched,
/// API fetch failed but stale cached price was used.
failed_used_stale,
/// API fetch failed and no cached price exists.
failed,
};
/// Callback for progress reporting during price loading.
/// `context` is an opaque pointer to caller-owned state.
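/// Example wiring (sketch; the caller-side `Spinner` type is hypothetical):
///
///     const Spinner = struct {
///         ticks: usize = 0,
///         fn onProgress(ctx: *anyopaque, index: usize, total: usize, symbol: []const u8, status: DataService.SymbolStatus) void {
///             _ = index;
///             _ = total;
///             _ = symbol;
///             _ = status;
///             const spinner: *@This() = @ptrCast(@alignCast(ctx));
///             spinner.ticks += 1;
///         }
///     };
///     var spinner = Spinner{};
///     const cb = DataService.ProgressCallback{ .context = &spinner, .on_progress = Spinner.onProgress };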
pub const ProgressCallback = struct {
context: *anyopaque,
on_progress: *const fn (ctx: *anyopaque, index: usize, total: usize, symbol: []const u8, status: SymbolStatus) void,
fn emit(self: ProgressCallback, index: usize, total: usize, symbol: []const u8, status: SymbolStatus) void {
self.on_progress(self.context, index, total, symbol, status);
}
};
/// Load current prices for a list of symbols into `prices`.
///
/// For each symbol the resolution order is:
/// 1. Fresh cache -> use cached last close (fast, no deserialization)
/// 2. API fetch -> use latest candle close
/// 3. Stale cache -> use last close from expired cache entry
///
/// If `force_refresh` is true, cache is invalidated before checking freshness.
/// If `progress` is provided, it is called for each symbol with the outcome.
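/// Typical usage (sketch; assumes `svc: *DataService` and an allocator
/// `alloc` in scope):
///
///     var prices = std.StringHashMap(f64).init(alloc);
///     defer prices.deinit();
///     const syms = [_][]const u8{ "VTI", "VXUS" };
///     const stats = svc.loadPrices(&syms, &prices, false, null);
///     _ = stats.fetched_count;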
pub fn loadPrices(
self: *DataService,
symbols: []const []const u8,
prices: *std.StringHashMap(f64),
force_refresh: bool,
progress: ?ProgressCallback,
) PriceLoadResult {
var result = PriceLoadResult{
.cached_count = 0,
.fetched_count = 0,
.fail_count = 0,
.stale_count = 0,
.latest_date = null,
};
const total = symbols.len;
for (symbols, 0..) |sym, i| {
if (force_refresh) {
self.invalidate(sym, .candles_daily);
}
// 1. Fresh cache — fast path (no full deserialization)
if (!force_refresh and self.isCandleCacheFresh(sym)) {
if (self.getCachedLastClose(sym)) |close| {
prices.put(sym, close) catch {};
}
result.cached_count += 1;
if (progress) |p| p.emit(i, total, sym, .cached);
continue;
}
// About to fetch — notify caller (so it can show rate-limit waits etc.)
if (progress) |p| p.emit(i, total, sym, .fetching);
// 2. Try API fetch
if (self.getCandles(sym)) |candle_result| {
defer self.allocator().free(candle_result.data);
if (candle_result.data.len > 0) {
const last = candle_result.data[candle_result.data.len - 1];
prices.put(sym, last.close) catch {};
if (result.latest_date == null or last.date.days > result.latest_date.?.days) {
result.latest_date = last.date;
}
}
result.fetched_count += 1;
if (progress) |p| p.emit(i, total, sym, .fetched);
continue;
} else |_| {}
// 3. Fetch failed — fall back to stale cache
result.fail_count += 1;
if (self.getCachedLastClose(sym)) |close| {
prices.put(sym, close) catch {};
result.stale_count += 1;
if (progress) |p| p.emit(i, total, sym, .failed_used_stale);
} else {
if (progress) |p| p.emit(i, total, sym, .failed);
}
}
return result;
}
// ── Consolidated Price Loading (Parallel Server + Sequential Provider) ──
/// Configuration for loadAllPrices.
pub const LoadAllConfig = struct {
force_refresh: bool = false,
color: bool = true,
/// Maximum concurrent server sync requests. 0 = auto (8).
max_concurrent: usize = 0,
};
/// Result of loadAllPrices operation.
pub const LoadAllResult = struct {
prices: std.StringHashMap(f64),
/// Number of symbols resolved from fresh local cache.
cached_count: usize,
/// Number of symbols synced from server.
server_synced_count: usize,
/// Number of symbols fetched from providers (rate-limited APIs).
provider_fetched_count: usize,
/// Number of symbols that failed all sources but used stale cache.
stale_count: usize,
/// Number of symbols that failed completely (no data).
failed_count: usize,
/// Latest candle date seen.
latest_date: ?Date,
/// Free the prices hashmap. Call this if you don't transfer ownership.
pub fn deinit(self: *LoadAllResult) void {
self.prices.deinit();
}
};
/// Progress callback for aggregate (parallel) progress reporting.
/// Called periodically during parallel operations with current counts.
pub const AggregateProgressCallback = struct {
context: *anyopaque,
on_progress: *const fn (ctx: *anyopaque, completed: usize, total: usize, phase: Phase) void,
pub const Phase = enum {
/// Checking local cache
cache_check,
/// Syncing from ZFIN_SERVER
server_sync,
/// Fetching from rate-limited providers
provider_fetch,
/// Done
complete,
};
fn emit(self: AggregateProgressCallback, completed: usize, total: usize, phase: Phase) void {
self.on_progress(self.context, completed, total, phase);
}
};
/// Thread-safe counter for parallel progress tracking.
const AtomicCounter = struct {
value: std.atomic.Value(usize) = std.atomic.Value(usize).init(0),
fn increment(self: *AtomicCounter) usize {
return self.value.fetchAdd(1, .monotonic);
}
fn load(self: *const AtomicCounter) usize {
return self.value.load(.monotonic);
}
};
/// Per-symbol result from parallel server sync.
const ServerSyncResult = struct {
symbol: []const u8,
success: bool,
};
/// Load prices for portfolio and watchlist symbols with automatic parallelization.
///
/// When ZFIN_SERVER is configured:
/// 1. Check local cache (fast, parallel-safe)
/// 2. Parallel sync from server for cache misses
/// 3. Sequential provider fallback for server failures
///
/// When ZFIN_SERVER is not configured:
/// Falls back to sequential loading with per-symbol progress.
///
/// Progress is reported via `aggregate_progress` for parallel phases
/// and `symbol_progress` for sequential provider fallback.
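/// Typical usage (sketch; symbol slices are caller-owned):
///
///     var res = svc.loadAllPrices(portfolio_syms, watch_syms, .{}, null, null);
///     defer res.deinit();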
pub fn loadAllPrices(
self: *DataService,
portfolio_syms: ?[]const []const u8,
watch_syms: []const []const u8,
config: LoadAllConfig,
aggregate_progress: ?AggregateProgressCallback,
symbol_progress: ?ProgressCallback,
) LoadAllResult {
var result = LoadAllResult{
.prices = std.StringHashMap(f64).init(self.allocator()),
.cached_count = 0,
.server_synced_count = 0,
.provider_fetched_count = 0,
.stale_count = 0,
.failed_count = 0,
.latest_date = null,
};
// Combine all symbols
const portfolio_count = if (portfolio_syms) |ps| ps.len else 0;
const watch_count = watch_syms.len;
const total_count = portfolio_count + watch_count;
if (total_count == 0) return result;
// Build combined symbol list
var all_symbols = std.ArrayList([]const u8).initCapacity(self.allocator(), total_count) catch return result;
defer all_symbols.deinit(self.allocator());
if (portfolio_syms) |ps| {
for (ps) |sym| all_symbols.append(self.allocator(), sym) catch {};
}
for (watch_syms) |sym| all_symbols.append(self.allocator(), sym) catch {};
// Invalidate cache if force refresh
if (config.force_refresh) {
for (all_symbols.items) |sym| {
self.invalidate(sym, .candles_daily);
}
}
// Phase 1: Check local cache (fast path)
var needs_fetch: std.ArrayList([]const u8) = .empty;
defer needs_fetch.deinit(self.allocator());
if (aggregate_progress) |p| p.emit(0, total_count, .cache_check);
for (all_symbols.items) |sym| {
if (!config.force_refresh and self.isCandleCacheFresh(sym)) {
if (self.getCachedLastClose(sym)) |close| {
result.prices.put(sym, close) catch {};
self.updateLatestDate(&result, sym);
}
result.cached_count += 1;
} else {
needs_fetch.append(self.allocator(), sym) catch {};
}
}
if (aggregate_progress) |p| p.emit(result.cached_count, total_count, .cache_check);
if (needs_fetch.items.len == 0) {
if (aggregate_progress) |p| p.emit(total_count, total_count, .complete);
return result;
}
// Phase 2: Server sync (parallel if server configured)
var server_failures: std.ArrayList([]const u8) = .empty;
defer server_failures.deinit(self.allocator());
if (self.config.server_url != null) {
self.parallelServerSync(
needs_fetch.items,
&result,
&server_failures,
config,
aggregate_progress,
total_count,
);
} else {
// No server — all need provider fetch
for (needs_fetch.items) |sym| {
server_failures.append(self.allocator(), sym) catch {};
}
}
// Phase 3: Sequential provider fallback for server failures
if (server_failures.items.len > 0) {
if (aggregate_progress) |p| p.emit(
result.cached_count + result.server_synced_count,
total_count,
.provider_fetch,
);
self.sequentialProviderFetch(
server_failures.items,
&result,
symbol_progress,
total_count - server_failures.items.len, // offset for progress display
);
}
if (aggregate_progress) |p| p.emit(total_count, total_count, .complete);
return result;
}
/// Parallel server sync using thread pool.
fn parallelServerSync(
self: *DataService,
symbols: []const []const u8,
result: *LoadAllResult,
failures: *std.ArrayList([]const u8),
config: LoadAllConfig,
aggregate_progress: ?AggregateProgressCallback,
total_count: usize,
) void {
const max_threads = if (config.max_concurrent > 0) config.max_concurrent else 8;
const thread_count = @min(symbols.len, max_threads);
if (aggregate_progress) |p| p.emit(result.cached_count, total_count, .server_sync);
// Shared state for worker threads
var completed = AtomicCounter{};
var next_index = AtomicCounter{};
const sync_results = self.allocator().alloc(ServerSyncResult, symbols.len) catch {
// Allocation failed — fall back to marking all as failures
for (symbols) |sym| failures.append(self.allocator(), sym) catch {};
return;
};
defer self.allocator().free(sync_results);
// Initialize results
for (sync_results, 0..) |*sr, i| {
sr.* = .{ .symbol = symbols[i], .success = false };
}
// Spawn worker threads
var threads = self.allocator().alloc(std.Thread, thread_count) catch {
for (symbols) |sym| failures.append(self.allocator(), sym) catch {};
return;
};
defer self.allocator().free(threads);
const WorkerContext = struct {
svc: *DataService,
symbols: []const []const u8,
results: []ServerSyncResult,
next_index: *AtomicCounter,
completed: *AtomicCounter,
};
var ctx = WorkerContext{
.svc = self,
.symbols = symbols,
.results = sync_results,
.next_index = &next_index,
.completed = &completed,
};
const worker = struct {
fn run(wctx: *WorkerContext) void {
while (true) {
const idx = wctx.next_index.increment();
if (idx >= wctx.symbols.len) break;
const sym = wctx.symbols[idx];
const success = wctx.svc.syncCandlesFromServer(sym);
wctx.results[idx].success = success;
_ = wctx.completed.increment();
}
}
};
// Start threads
var spawned: usize = 0;
for (threads) |*t| {
t.* = std.Thread.spawn(.{}, worker.run, .{&ctx}) catch continue;
spawned += 1;
}
// Progress reporting while waiting
if (aggregate_progress) |p| {
while (completed.load() < symbols.len) {
std.Thread.sleep(50 * std.time.ns_per_ms);
p.emit(result.cached_count + completed.load(), total_count, .server_sync);
}
}
// Wait for all threads
for (threads[0..spawned]) |t| {
t.join();
}
// Process results
for (sync_results) |sr| {
if (sr.success) {
// Server sync succeeded — read from cache
if (self.getCachedLastClose(sr.symbol)) |close| {
result.prices.put(sr.symbol, close) catch {};
self.updateLatestDate(result, sr.symbol);
result.server_synced_count += 1;
} else {
// Sync said success but can't read cache — treat as failure
failures.append(self.allocator(), sr.symbol) catch {};
}
} else {
failures.append(self.allocator(), sr.symbol) catch {};
}
}
}
/// Sequential provider fetch for symbols that failed server sync.
fn sequentialProviderFetch(
self: *DataService,
symbols: []const []const u8,
result: *LoadAllResult,
progress: ?ProgressCallback,
index_offset: usize,
) void {
const total = index_offset + symbols.len;
for (symbols, 0..) |sym, i| {
const display_idx = index_offset + i;
// Notify: about to fetch
if (progress) |p| p.emit(display_idx, total, sym, .fetching);
// Try provider fetch
if (self.getCandles(sym)) |candle_result| {
defer self.allocator().free(candle_result.data);
if (candle_result.data.len > 0) {
const last = candle_result.data[candle_result.data.len - 1];
result.prices.put(sym, last.close) catch {};
if (result.latest_date == null or last.date.days > result.latest_date.?.days) {
result.latest_date = last.date;
}
}
result.provider_fetched_count += 1;
if (progress) |p| p.emit(display_idx, total, sym, .fetched);
continue;
} else |_| {}
// Provider failed — try stale cache
result.failed_count += 1;
if (self.getCachedLastClose(sym)) |close| {
result.prices.put(sym, close) catch {};
result.stale_count += 1;
if (progress) |p| p.emit(display_idx, total, sym, .failed_used_stale);
} else {
if (progress) |p| p.emit(display_idx, total, sym, .failed);
}
}
}
/// Update latest_date in result from cached candle metadata.
fn updateLatestDate(self: *DataService, result: *LoadAllResult, symbol: []const u8) void {
var s = self.store();
if (s.readCandleMeta(symbol)) |cm| {
const d = cm.meta.last_date;
if (result.latest_date == null or d.days > result.latest_date.?.days) {
result.latest_date = d;
}
}
}
// ── CUSIP Resolution ──────────────────────────────────────────
/// Look up multiple CUSIPs in a single batch request via OpenFIGI.
/// Results array is parallel to the input cusips array (same length, same order).
/// Caller owns the returned slice and all strings within each CusipResult.
pub fn lookupCusips(self: *DataService, cusips: []const []const u8) DataError![]CusipResult {
return OpenFigi.lookupCusips(self.allocator(), cusips, self.config.openfigi_key) catch
return DataError.FetchFailed;
}
/// Look up a CUSIP via OpenFIGI API. Returns the ticker if found, null otherwise.
/// Results are cached in {cache_dir}/cusip_tickers.srf.
/// Caller owns the returned string.
pub fn lookupCusip(self: *DataService, cusip: []const u8) ?[]const u8 {
// Check local cache first
if (self.getCachedCusipTicker(cusip)) |t| return t;
// Try OpenFIGI
const result = OpenFigi.lookupCusip(self.allocator(), cusip, self.config.openfigi_key) catch return null;
defer {
if (result.name) |n| self.allocator().free(n);
if (result.security_type) |s| self.allocator().free(s);
}
if (result.ticker) |ticker| {
// Cache the mapping
self.cacheCusipTicker(cusip, ticker);
return ticker; // caller takes ownership
}
return null;
}
/// A single CUSIP-to-ticker mapping record in the cache file.
const CusipEntry = struct {
cusip: []const u8 = "",
ticker: []const u8 = "",
};
/// Read a cached CUSIP->ticker mapping. Returns null if not cached.
/// Caller owns the returned string.
fn getCachedCusipTicker(self: *DataService, cusip: []const u8) ?[]const u8 {
const path = std.fs.path.join(self.allocator(), &.{ self.config.cache_dir, "cusip_tickers.srf" }) catch return null;
defer self.allocator().free(path);
const data = std.fs.cwd().readFileAlloc(self.allocator(), path, 64 * 1024) catch return null;
defer self.allocator().free(data);
var reader = std.Io.Reader.fixed(data);
var it = srf.iterator(&reader, self.allocator(), .{ .alloc_strings = false }) catch return null;
defer it.deinit();
while (it.next() catch return null) |fields| {
const entry = fields.to(CusipEntry) catch continue;
if (std.mem.eql(u8, entry.cusip, cusip) and entry.ticker.len > 0) {
return self.allocator().dupe(u8, entry.ticker) catch null;
}
}
return null;
}
/// Append a CUSIP->ticker mapping to the cache file.
///
/// Implemented as read-append-atomic-write (rather than a direct
/// open-for-append) so a concurrent reader never sees a file with a
/// valid header plus partial trailing record. See `cache/store.zig
/// appendRaw` for the same pattern and rationale.
pub fn cacheCusipTicker(self: *DataService, cusip: []const u8, ticker: []const u8) void {
const path = std.fs.path.join(self.allocator(), &.{ self.config.cache_dir, "cusip_tickers.srf" }) catch return;
defer self.allocator().free(path);
// Ensure cache dir exists
if (std.fs.path.dirnamePosix(path)) |dir| {
std.fs.cwd().makePath(dir) catch {};
}
// Read existing cache if present.
const existing = std.fs.cwd().readFileAlloc(self.allocator(), path, 4 * 1024 * 1024) catch |err| switch (err) {
error.FileNotFound => @as([]u8, &.{}),
else => return,
};
const owns_existing = existing.len > 0;
defer if (owns_existing) self.allocator().free(existing);
// Serialize the new entry (with `#!srfv1` directives only if the
// cache file doesn't exist yet).
const emit_directives = !owns_existing;
const entry = [_]CusipEntry{.{ .cusip = cusip, .ticker = ticker }};
var aw: std.Io.Writer.Allocating = .init(self.allocator());
defer aw.deinit();
aw.writer.print("{f}", .{srf.fmtFrom(CusipEntry, self.allocator(), &entry, .{ .emit_directives = emit_directives })}) catch return;
const encoded = aw.writer.buffered();
if (encoded.len == 0) return;
// Concat existing + new, then atomic-write.
const combined = self.allocator().alloc(u8, existing.len + encoded.len) catch return;
defer self.allocator().free(combined);
@memcpy(combined[0..existing.len], existing);
@memcpy(combined[existing.len..], encoded);
atomic.writeFileAtomic(self.allocator(), path, combined) catch {};
}
// ── Utility ──────────────────────────────────────────────────
/// Sleep before retrying after a rate limit error.
/// Uses the provider's rate limiter if available, otherwise a fixed 10s backoff.
fn rateLimitBackoff(self: *DataService) void {
if (self.td) |*td| {
td.rate_limiter.backoff();
} else {
std.Thread.sleep(10 * std.time.ns_per_s);
}
}
// ── Server sync ──────────────────────────────────────────────
/// Try to sync a cache file from the configured zfin-server.
/// Returns true if the file was successfully synced, false on any error.
/// Silently returns false if no server is configured.
fn syncFromServer(self: *DataService, symbol: []const u8, data_type: cache.DataType) bool {
const server_url = self.config.server_url orelse return false;
const endpoint = switch (data_type) {
.candles_daily => "/candles",
.candles_meta => "/candles_meta",
.dividends => "/dividends",
.earnings => "/earnings",
.options => "/options",
.splits => return false, // not served
.etf_profile => return false, // not served
.meta => return false,
};
const full_url = std.fmt.allocPrint(self.allocator(), "{s}/{s}{s}", .{ server_url, symbol, endpoint }) catch return false;
defer self.allocator().free(full_url);
log.debug("{s}: syncing {s} from server", .{ symbol, @tagName(data_type) });
var client = http.Client.init(self.allocator());
defer client.deinit();
var response = client.get(full_url) catch |err| {
log.debug("{s}: server sync failed for {s}: {s}", .{ symbol, @tagName(data_type), @errorName(err) });
return false;
};
defer response.deinit();
// Validate the response body looks like a complete SRF file before
// writing it to cache. This guards against HTTP body truncation
// (TCP reset, Content-Length mismatch, proxy that flushed a
// partial response, etc.) — torn bodies get written atomically
// to the cache otherwise, producing the classic SRF parse error
// on the next read:
// error(srf): custom parse of value YYYY-MM failed : InvalidDateFormat
//
// When the check rejects a body, archive the raw bytes + context
// under `{cache_dir}/_torn/` so the next time this recurs we
// have ammunition for root-cause analysis. The log line is kept
// at debug level on purpose — user explicitly asked that routine
// rejections not be noisy in production runs. The `.meta`
// sidecar on disk is the durable signal.
if (!cache.Store.looksCompleteSrf(response.body)) {
cache.Store.archiveTornBody(
self.allocator(),
self.config.cache_dir,
symbol,
data_type,
response.body,
.{
.failure_reason = .looks_complete_srf_failed,
.http_status = @intFromEnum(response.status),
.server_url = full_url,
},
) catch |err| {
log.debug(
"{s}: failed to archive torn {s} body: {s}",
.{ symbol, @tagName(data_type), @errorName(err) },
);
};
log.debug(
"{s}: rejecting torn {s} server response ({d} bytes) — archived under _torn/, not writing to cache",
.{ symbol, @tagName(data_type), response.body.len },
);
return false;
}
// Write to local cache
var s = self.store();
s.writeRaw(symbol, data_type, response.body) catch |err| {
log.debug("{s}: failed to write synced {s} to cache: {s}", .{ symbol, @tagName(data_type), @errorName(err) });
return false;
};
log.debug("{s}: synced {s} from server ({d} bytes)", .{ symbol, @tagName(data_type), response.body.len });
return true;
}
/// Sync candle data (both daily and meta) from the server.
fn syncCandlesFromServer(self: *DataService, symbol: []const u8) bool {
const daily = self.syncFromServer(symbol, .candles_daily);
const meta = self.syncFromServer(symbol, .candles_meta);
return daily and meta;
}
/// Mutual funds use 5-letter tickers ending in X (e.g. FDSCX, VSTCX, FAGIX).
/// These don't have quarterly earnings — skip the fetch rather than
/// round-tripping to the provider just to get an empty response.
fn isMutualFund(symbol: []const u8) bool {
return symbol.len == 5 and symbol[4] == 'X';
}
// ── User config files ─────────────────────────────────────────
/// Load and parse accounts.srf from the same directory as the given portfolio path.
/// Returns null if the file doesn't exist or can't be parsed.
/// Caller owns the returned AccountMap and must call deinit().
pub fn loadAccountMap(self: *DataService, portfolio_path: []const u8) ?analysis.AccountMap {
const dir_end = if (std.mem.lastIndexOfScalar(u8, portfolio_path, std.fs.path.sep)) |idx| idx + 1 else 0;
const acct_path = std.fmt.allocPrint(self.allocator(), "{s}accounts.srf", .{portfolio_path[0..dir_end]}) catch return null;
defer self.allocator().free(acct_path);
const data = std.fs.cwd().readFileAlloc(self.allocator(), acct_path, 1024 * 1024) catch return null;
defer self.allocator().free(data);
return analysis.parseAccountsFile(self.allocator(), data) catch null;
}
/// Load and parse `transaction_log.srf` from the same directory as
/// the given portfolio path. Returns null if the file doesn't
/// exist or can't be parsed — the contributions pipeline falls
/// back to the pre-transaction-log behavior (no transfer netting)
/// when null is returned.
///
/// Caller owns the returned `TransactionLog` and must call
/// `deinit()`.
pub fn loadTransferLog(self: *DataService, portfolio_path: []const u8) ?transaction_log.TransactionLog {
const dir_end = if (std.mem.lastIndexOfScalar(u8, portfolio_path, std.fs.path.sep)) |idx| idx + 1 else 0;
const path = std.fmt.allocPrint(self.allocator(), "{s}transaction_log.srf", .{portfolio_path[0..dir_end]}) catch return null;
defer self.allocator().free(path);
const data = std.fs.cwd().readFileAlloc(self.allocator(), path, 1024 * 1024) catch return null;
defer self.allocator().free(data);
return transaction_log.parseTransactionLogFile(self.allocator(), data) catch null;
}
};
// ── Tests ─────────────────────────────────────────────────────────
test "isMutualFund identifies mutual funds" {
// Standard mutual fund tickers (5 letters ending in X)
try std.testing.expect(DataService.isMutualFund("FDSCX"));
try std.testing.expect(DataService.isMutualFund("VSTCX"));
try std.testing.expect(DataService.isMutualFund("FAGIX"));
try std.testing.expect(DataService.isMutualFund("VFINX"));
// Not mutual funds
try std.testing.expect(!DataService.isMutualFund("AAPL"));
try std.testing.expect(!DataService.isMutualFund("VTI"));
try std.testing.expect(!DataService.isMutualFund("SPY"));
try std.testing.expect(!DataService.isMutualFund("GOOGL"));
try std.testing.expect(!DataService.isMutualFund("")); // empty
try std.testing.expect(!DataService.isMutualFund("X")); // too short
try std.testing.expect(!DataService.isMutualFund("FDSCA")); // 5 letters but not ending in X
try std.testing.expect(!DataService.isMutualFund("FDSCXA")); // 6 letters ending in A
}
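test "isTransientError classifies provider failures" {
    // Covers the classification documented on isTransientError: HTTP 5xx and
    // connection failures are transient; rate limits and symbol-level errors
    // are handled elsewhere and must not be classified as transient.
    try std.testing.expect(DataService.isTransientError(error.ServerError));
    try std.testing.expect(DataService.isTransientError(error.RequestFailed));
    try std.testing.expect(!DataService.isTransientError(error.RateLimited));
    try std.testing.expect(!DataService.isTransientError(error.NotFound));
}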
test "DataService init/deinit lifecycle" {
const allocator = std.testing.allocator;
const config = Config{
.cache_dir = "/tmp/zfin-test-cache",
};
var svc = DataService.init(allocator, config);
defer svc.deinit();
// Should be able to access config
try std.testing.expectEqualStrings("/tmp/zfin-test-cache", svc.config.cache_dir);
// Providers should be null (lazy init)
try std.testing.expect(svc.td == null);
try std.testing.expect(svc.pg == null);
try std.testing.expect(svc.fmp == null);
try std.testing.expect(svc.yh == null);
try std.testing.expect(svc.tg == null);
}
test "DataService store helper creates valid store" {
const allocator = std.testing.allocator;
const config = Config{
.cache_dir = "/tmp/zfin-test-cache",
};
var svc = DataService.init(allocator, config);
defer svc.deinit();
const s = svc.store();
try std.testing.expectEqualStrings("/tmp/zfin-test-cache", s.cache_dir);
}
test "DataService getProvider returns NoApiKey without key" {
const allocator = std.testing.allocator;
const config = Config{
.cache_dir = "/tmp/zfin-test-cache",
// No API keys set
};
var svc = DataService.init(allocator, config);
defer svc.deinit();
// TwelveData requires API key
const td_result = svc.getProvider(TwelveData);
try std.testing.expectError(DataError.NoApiKey, td_result);
// Polygon requires API key
const pg_result = svc.getProvider(Polygon);
try std.testing.expectError(DataError.NoApiKey, pg_result);
// Yahoo doesn't require API key
const yh_result = svc.getProvider(Yahoo);
try std.testing.expect(yh_result != error.NoApiKey);
}
test "DataService getProvider initializes provider with key" {
const allocator = std.testing.allocator;
const config = Config{
.cache_dir = "/tmp/zfin-test-cache",
.tiingo_key = "test-tiingo-key",
};
var svc = DataService.init(allocator, config);
defer svc.deinit();
// First call initializes
const tg1 = try svc.getProvider(Tiingo);
try std.testing.expect(svc.tg != null);
// Second call returns same instance
const tg2 = try svc.getProvider(Tiingo);
try std.testing.expect(tg1 == tg2);
}
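test "DataService providerField maps provider types to struct fields" {
    // Sketch of the comptime field lookup used by getProvider: each provider
    // type resolves to its lazily-initialized optional field on DataService.
    try std.testing.expectEqualStrings("tg", comptime DataService.providerField(Tiingo));
    try std.testing.expectEqualStrings("pg", comptime DataService.providerField(Polygon));
    try std.testing.expectEqualStrings("av", comptime DataService.providerField(AlphaVantage));
}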
test "DataService PriceLoadResult default values" {
const result = DataService.PriceLoadResult{
.cached_count = 0,
.fetched_count = 0,
.fail_count = 0,
.stale_count = 0,
.latest_date = null,
};
try std.testing.expectEqual(@as(usize, 0), result.cached_count);
try std.testing.expect(result.latest_date == null);
}
test "DataService LoadAllResult default values" {
const allocator = std.testing.allocator;
var result = DataService.LoadAllResult{
.prices = std.StringHashMap(f64).init(allocator),
.cached_count = 0,
.server_synced_count = 0,
.provider_fetched_count = 0,
.stale_count = 0,
.failed_count = 0,
.latest_date = null,
};
defer result.deinit();
try std.testing.expectEqual(@as(usize, 0), result.prices.count());
}
test "FetchResult type construction" {
// Verify FetchResult works for different types
const candle_result = FetchResult(Candle){
.data = &.{},
.source = .cached,
.timestamp = 0,
.allocator = std.testing.allocator,
};
try std.testing.expect(candle_result.source == .cached);
const div_result = FetchResult(Dividend){
.data = &.{},
.source = .fetched,
.timestamp = 12345,
.allocator = std.testing.allocator,
};
try std.testing.expect(div_result.source == .fetched);
try std.testing.expectEqual(@as(i64, 12345), div_result.timestamp);
}
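test "FetchResult deinit frees owned slice data" {
    // Sketch of the ownership contract: slice data allocated on behalf of the
    // service is released through deinit (std.testing.allocator flags leaks).
    const alloc = std.testing.allocator;
    const data = try alloc.alloc(Candle, 2);
    const result = FetchResult(Candle){
        .data = data,
        .source = .fetched,
        .timestamp = 0,
        .allocator = alloc,
    };
    result.deinit();
}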