more measured fallback and logging
All checks were successful
Generic zig build / build (push) Successful in 31s

This commit is contained in:
Emil Lerch 2026-03-24 18:57:22 -07:00
parent ee9d749a6f
commit ecadfb492d
Signed by: lobo
GPG key ID: A7B62D657EF764F8
2 changed files with 157 additions and 72 deletions

15
src/cache/store.zig vendored
View file

@ -215,7 +215,7 @@ pub const Store = struct {
/// Write a full set of candles to cache (no expiry historical facts don't expire). /// Write a full set of candles to cache (no expiry historical facts don't expire).
/// Also updates candle metadata. /// Also updates candle metadata.
pub fn cacheCandles(self: *Store, symbol: []const u8, candles: []const Candle, provider: CandleProvider) void { pub fn cacheCandles(self: *Store, symbol: []const u8, candles: []const Candle, provider: CandleProvider, fail_count: u8) void {
if (serializeCandles(self.allocator, candles, .{})) |srf_data| { if (serializeCandles(self.allocator, candles, .{})) |srf_data| {
defer self.allocator.free(srf_data); defer self.allocator.free(srf_data);
self.writeRaw(symbol, .candles_daily, srf_data) catch |err| { self.writeRaw(symbol, .candles_daily, srf_data) catch |err| {
@ -227,14 +227,14 @@ pub const Store = struct {
if (candles.len > 0) { if (candles.len > 0) {
const last = candles[candles.len - 1]; const last = candles[candles.len - 1];
self.updateCandleMeta(symbol, last.close, last.date, provider); self.updateCandleMeta(symbol, last.close, last.date, provider, fail_count);
} }
} }
/// Append new candle records to the existing cache file. /// Append new candle records to the existing cache file.
/// Falls back to a full rewrite if append fails (e.g. file doesn't exist). /// Falls back to a full rewrite if append fails (e.g. file doesn't exist).
/// Also updates candle metadata. /// Also updates candle metadata.
pub fn appendCandles(self: *Store, symbol: []const u8, new_candles: []const Candle, provider: CandleProvider) void { pub fn appendCandles(self: *Store, symbol: []const u8, new_candles: []const Candle, provider: CandleProvider, fail_count: u8) void {
if (new_candles.len == 0) return; if (new_candles.len == 0) return;
if (serializeCandles(self.allocator, new_candles, .{ .emit_directives = false })) |srf_data| { if (serializeCandles(self.allocator, new_candles, .{ .emit_directives = false })) |srf_data| {
@ -263,16 +263,17 @@ pub const Store = struct {
} }
const last = new_candles[new_candles.len - 1]; const last = new_candles[new_candles.len - 1];
self.updateCandleMeta(symbol, last.close, last.date, provider); self.updateCandleMeta(symbol, last.close, last.date, provider, fail_count);
} }
/// Write (or refresh) candle metadata with a specific provider source. /// Write (or refresh) candle metadata with a specific provider source.
pub fn updateCandleMeta(self: *Store, symbol: []const u8, last_close: f64, last_date: Date, provider: CandleProvider) void { pub fn updateCandleMeta(self: *Store, symbol: []const u8, last_close: f64, last_date: Date, provider: CandleProvider, fail_count: u8) void {
const expires = std.time.timestamp() + Ttl.candles_latest; const expires = std.time.timestamp() + Ttl.candles_latest;
const meta = CandleMeta{ const meta = CandleMeta{
.last_close = last_close, .last_close = last_close,
.last_date = last_date, .last_date = last_date,
.provider = provider, .provider = provider,
.fail_count = fail_count,
}; };
if (serializeCandleMeta(self.allocator, meta, .{ .expires = expires })) |meta_data| { if (serializeCandleMeta(self.allocator, meta, .{ .expires = expires })) |meta_data| {
defer self.allocator.free(meta_data); defer self.allocator.free(meta_data);
@ -395,6 +396,10 @@ pub const Store = struct {
/// Which provider sourced the candle data. Used during incremental refresh /// Which provider sourced the candle data. Used during incremental refresh
/// to go directly to the right provider instead of trying Tiingo first. /// to go directly to the right provider instead of trying Tiingo first.
provider: CandleProvider = .tiingo, provider: CandleProvider = .tiingo,
/// Consecutive transient failure count for the primary provider (Tiingo).
/// Incremented on ServerError; reset to 0 on success. When >= 3, the
/// symbol is degraded to a fallback provider until Tiingo recovers.
fail_count: u8 = 0,
}; };
pub const CandleProvider = enum { pub const CandleProvider = enum {

View file

@ -39,6 +39,11 @@ pub const DataError = error{
CacheError, CacheError,
ParseError, ParseError,
OutOfMemory, OutOfMemory,
/// Transient provider failure (server error, connection issue).
/// Caller should stop and retry later.
TransientError,
/// Provider auth failure (bad API key). Entire refresh should stop.
AuthError,
}; };
/// Re-exported provider types needed by commands via DataService. /// Re-exported provider types needed by commands via DataService.
@ -263,45 +268,104 @@ pub const DataService = struct {
// Public data methods // Public data methods
/// Fetch candles from providers with fallback logic. /// Fetch candles from providers with error classification.
/// Tries the provider recorded in meta (if any), then Tiingo (primary), then TwelveData, then Yahoo. ///
/// Returns the candles and which provider succeeded. /// Error handling:
/// - ServerError/RateLimited/RequestFailed from Tiingo TransientError (stop refresh, retry later)
/// - NotFound/ParseError/InvalidResponse from Tiingo try Yahoo (symbol-level issue)
/// - Unauthorized TransientError (config problem, stop refresh)
///
/// The `preferred` param controls incremental fetch consistency: use the same
/// provider that sourced the existing cache data.
fn fetchCandlesFromProviders( fn fetchCandlesFromProviders(
self: *DataService, self: *DataService,
symbol: []const u8, symbol: []const u8,
from: Date, from: Date,
to: Date, to: Date,
preferred: cache.Store.CandleProvider, preferred: cache.Store.CandleProvider,
) !struct { candles: []Candle, provider: cache.Store.CandleProvider } { ) (DataError || error{NotFound})!struct { candles: []Candle, provider: cache.Store.CandleProvider } {
// If preferred is Yahoo, try it first // If preferred is Yahoo (degraded symbol), try Yahoo first
if (preferred == .yahoo) { if (preferred == .yahoo) {
if (self.getProvider(Yahoo)) |yh| { if (self.getProvider(Yahoo)) |yh| {
if (yh.fetchCandles(self.allocator, symbol, from, to)) |candles| { if (yh.fetchCandles(self.allocator, symbol, from, to)) |candles| {
log.debug("{s}: candles from Yahoo (preferred)", .{symbol}); log.debug("{s}: candles from Yahoo (preferred)", .{symbol});
return .{ .candles = candles, .provider = .yahoo }; return .{ .candles = candles, .provider = .yahoo };
} else |_| {} } else |err| {
log.warn("{s}: Yahoo (preferred) failed: {s}", .{ symbol, @errorName(err) });
}
} else |_| {} } else |_| {}
} }
// Primary: Tiingo (1000 req/day, no per-minute limit, adj_close includes dividends) // Primary: Tiingo
if (self.getProvider(Tiingo)) |tg| { if (self.getProvider(Tiingo)) |tg| {
if (tg.fetchCandles(self.allocator, symbol, from, to)) |candles| { if (tg.fetchCandles(self.allocator, symbol, from, to)) |candles| {
log.debug("{s}: candles from Tiingo", .{symbol}); log.debug("{s}: candles from Tiingo", .{symbol});
return .{ .candles = candles, .provider = .tiingo }; return .{ .candles = candles, .provider = .tiingo };
} else |_| {} } else |err| {
} else |_| {} log.warn("{s}: Tiingo failed: {s}", .{ symbol, @errorName(err) });
// Fallback: Yahoo (if not already tried as preferred) if (err == error.Unauthorized) {
log.err("{s}: Tiingo auth failed — check TIINGO_API_KEY", .{symbol});
return DataError.AuthError;
}
if (err == error.RateLimited) {
// Rate limited: back off and retry this is expected, not a failure
log.info("{s}: Tiingo rate limited, backing off", .{symbol});
self.rateLimitBackoff();
if (tg.fetchCandles(self.allocator, symbol, from, to)) |candles| {
log.debug("{s}: candles from Tiingo (after rate limit backoff)", .{symbol});
return .{ .candles = candles, .provider = .tiingo };
} else |retry_err| {
log.warn("{s}: Tiingo retry after backoff failed: {s}", .{ symbol, @errorName(retry_err) });
if (retry_err == error.RateLimited) {
// Still rate limited after backoff one more try
self.rateLimitBackoff();
if (tg.fetchCandles(self.allocator, symbol, from, to)) |candles| {
log.debug("{s}: candles from Tiingo (after second backoff)", .{symbol});
return .{ .candles = candles, .provider = .tiingo };
} else |_| {}
}
// Exhausted rate limit retries treat as transient
return DataError.TransientError;
}
}
if (isTransientError(err)) {
// Server error or connection failure stop, don't fall back
return DataError.TransientError;
}
// NotFound, ParseError, InvalidResponse symbol-level issue, try Yahoo
log.info("{s}: Tiingo does not have this symbol, trying Yahoo", .{symbol});
}
} else |_| {
log.warn("{s}: Tiingo provider not available (no API key?)", .{symbol});
}
// Fallback: Yahoo (symbol not on Tiingo)
if (preferred != .yahoo) { if (preferred != .yahoo) {
if (self.getProvider(Yahoo)) |yh| { if (self.getProvider(Yahoo)) |yh| {
if (yh.fetchCandles(self.allocator, symbol, from, to)) |candles| { if (yh.fetchCandles(self.allocator, symbol, from, to)) |candles| {
log.debug("{s}: candles from Yahoo (fallback)", .{symbol}); log.info("{s}: candles from Yahoo (Tiingo fallback)", .{symbol});
return .{ .candles = candles, .provider = .yahoo }; return .{ .candles = candles, .provider = .yahoo };
} else |_| {} } else |err| {
} else |_| {} log.warn("{s}: Yahoo fallback also failed: {s}", .{ symbol, @errorName(err) });
}
} else |_| {
log.warn("{s}: Yahoo provider not available", .{symbol});
}
} }
return error.FetchFailed; return DataError.FetchFailed;
}
/// Classify whether a provider error is transient (provider is down).
/// ServerError = HTTP 5xx, RequestFailed = connection/network failure.
/// Note: RateLimited and Unauthorized are handled separately.
fn isTransientError(err: anyerror) bool {
return err == error.ServerError or
err == error.RequestFailed;
} }
/// Fetch daily candles for a symbol (10+ years for trailing returns). /// Fetch daily candles for a symbol (10+ years for trailing returns).
@ -315,22 +379,19 @@ pub const DataService = struct {
// Check candle metadata for freshness (tiny file, no candle deserialization) // Check candle metadata for freshness (tiny file, no candle deserialization)
const meta_result = s.readCandleMeta(symbol); const meta_result = s.readCandleMeta(symbol);
const is_twelvedata = if (meta_result) |mr| mr.meta.provider == .twelvedata else false; if (meta_result) |mr| {
const m = mr.meta;
// If cached data is from TwelveData (deprecated for candles due to // If cached data is from TwelveData (deprecated for candles due to
// unreliable adj_close), skip cache and fall through to full re-fetch. // unreliable adj_close), skip cache and fall through to full re-fetch.
if (is_twelvedata) if (m.provider == .twelvedata) {
log.debug("{s}: cached candles from TwelveData — forcing full re-fetch", .{symbol}); log.debug("{s}: cached candles from TwelveData — forcing full re-fetch", .{symbol});
} else if (s.isCandleMetaFresh(symbol)) {
if (!is_twelvedata) if (meta_result) |mr| {
const m = mr.meta;
if (s.isCandleMetaFresh(symbol)) {
// Fresh deserialize candles and return // Fresh deserialize candles and return
log.debug("{s}: candles fresh in local cache", .{symbol}); log.debug("{s}: candles fresh in local cache", .{symbol});
if (s.read(Candle, symbol, null, .any)) |r| if (s.read(Candle, symbol, null, .any)) |r|
return .{ .data = r.data, .source = .cached, .timestamp = mr.created }; return .{ .data = r.data, .source = .cached, .timestamp = mr.created };
} } else {
// Stale try server sync before incremental fetch // Stale try server sync before incremental fetch
if (self.syncCandlesFromServer(symbol)) { if (self.syncCandlesFromServer(symbol)) {
if (s.isCandleMetaFresh(symbol)) { if (s.isCandleMetaFresh(symbol)) {
@ -339,7 +400,6 @@ pub const DataService = struct {
return .{ .data = r.data, .source = .cached, .timestamp = std.time.timestamp() }; return .{ .data = r.data, .source = .cached, .timestamp = std.time.timestamp() };
} }
log.debug("{s}: candles synced from server but stale, falling through to incremental fetch", .{symbol}); log.debug("{s}: candles synced from server but stale, falling through to incremental fetch", .{symbol});
// Server data also stale fall through to incremental fetch
} }
// Stale try incremental update using last_date from meta // Stale try incremental update using last_date from meta
@ -347,13 +407,27 @@ pub const DataService = struct {
// If last cached date is today or later, just refresh the TTL (meta only) // If last cached date is today or later, just refresh the TTL (meta only)
if (!fetch_from.lessThan(today)) { if (!fetch_from.lessThan(today)) {
s.updateCandleMeta(symbol, m.last_close, m.last_date, m.provider); s.updateCandleMeta(symbol, m.last_close, m.last_date, m.provider, m.fail_count);
if (s.read(Candle, symbol, null, .any)) |r| if (s.read(Candle, symbol, null, .any)) |r|
return .{ .data = r.data, .source = .cached, .timestamp = std.time.timestamp() }; return .{ .data = r.data, .source = .cached, .timestamp = std.time.timestamp() };
} else { } else {
// Incremental fetch from day after last cached candle // Incremental fetch from day after last cached candle
const result = self.fetchCandlesFromProviders(symbol, fetch_from, today, m.provider) catch { const result = self.fetchCandlesFromProviders(symbol, fetch_from, today, m.provider) catch |err| {
// All providers failed return stale data if (err == DataError.TransientError) {
// Increment fail_count for this symbol
const new_fail_count = m.fail_count +| 1; // saturating add
log.warn("{s}: transient failure (fail_count now {d})", .{ symbol, new_fail_count });
s.updateCandleMeta(symbol, m.last_close, m.last_date, m.provider, new_fail_count);
// If degraded (fail_count >= 3), return stale data rather than failing
if (new_fail_count >= 3) {
log.warn("{s}: degraded after {d} consecutive failures, returning stale data", .{ symbol, new_fail_count });
if (s.read(Candle, symbol, null, .any)) |r|
return .{ .data = r.data, .source = .cached, .timestamp = mr.created };
}
return DataError.TransientError;
}
// Non-transient failure return stale data if available
if (s.read(Candle, symbol, null, .any)) |r| if (s.read(Candle, symbol, null, .any)) |r|
return .{ .data = r.data, .source = .cached, .timestamp = mr.created }; return .{ .data = r.data, .source = .cached, .timestamp = mr.created };
return DataError.FetchFailed; return DataError.FetchFailed;
@ -361,24 +435,23 @@ pub const DataService = struct {
const new_candles = result.candles; const new_candles = result.candles;
if (new_candles.len == 0) { if (new_candles.len == 0) {
// No new candles (weekend/holiday) refresh TTL only (meta rewrite) // No new candles (weekend/holiday) refresh TTL, reset fail_count
self.allocator.free(new_candles); self.allocator.free(new_candles);
s.updateCandleMeta(symbol, m.last_close, m.last_date, result.provider); s.updateCandleMeta(symbol, m.last_close, m.last_date, result.provider, 0);
if (s.read(Candle, symbol, null, .any)) |r| if (s.read(Candle, symbol, null, .any)) |r|
return .{ .data = r.data, .source = .cached, .timestamp = std.time.timestamp() }; return .{ .data = r.data, .source = .cached, .timestamp = std.time.timestamp() };
} else { } else {
// Append new candles to existing file + update meta // Append new candles to existing file + update meta, reset fail_count
s.appendCandles(symbol, new_candles, result.provider); s.appendCandles(symbol, new_candles, result.provider, 0);
// Load the full (now-updated) file for the caller
if (s.read(Candle, symbol, null, .any)) |r| { if (s.read(Candle, symbol, null, .any)) |r| {
self.allocator.free(new_candles); self.allocator.free(new_candles);
return .{ .data = r.data, .source = .fetched, .timestamp = std.time.timestamp() }; return .{ .data = r.data, .source = .fetched, .timestamp = std.time.timestamp() };
} }
// Append failed or file unreadable just return new candles
return .{ .data = new_candles, .source = .fetched, .timestamp = std.time.timestamp() }; return .{ .data = new_candles, .source = .fetched, .timestamp = std.time.timestamp() };
} }
} }
}; }
}
// No usable cache try server sync first // No usable cache try server sync first
if (self.syncCandlesFromServer(symbol)) { if (self.syncCandlesFromServer(symbol)) {
@ -388,20 +461,27 @@ pub const DataService = struct {
return .{ .data = r.data, .source = .cached, .timestamp = std.time.timestamp() }; return .{ .data = r.data, .source = .cached, .timestamp = std.time.timestamp() };
} }
log.debug("{s}: candles synced from server but stale, falling through to full fetch", .{symbol}); log.debug("{s}: candles synced from server but stale, falling through to full fetch", .{symbol});
// Server data also stale fall through to full fetch
} }
// No usable cache full fetch (~10 years, plus buffer for leap years) // No usable cache full fetch (~10 years, plus buffer for leap years)
log.debug("{s}: fetching full candle history from provider", .{symbol}); log.debug("{s}: fetching full candle history from provider", .{symbol});
const from = today.addDays(-3700); const from = today.addDays(-3700);
const result = self.fetchCandlesFromProviders(symbol, from, today, .tiingo) catch { const result = self.fetchCandlesFromProviders(symbol, from, today, .tiingo) catch |err| {
if (err == DataError.TransientError) {
// On a fresh fetch, increment fail_count if we have meta
if (meta_result) |mr| {
const new_fail_count = mr.meta.fail_count +| 1;
s.updateCandleMeta(symbol, mr.meta.last_close, mr.meta.last_date, mr.meta.provider, new_fail_count);
}
return DataError.TransientError;
}
s.writeNegative(symbol, .candles_daily); s.writeNegative(symbol, .candles_daily);
return DataError.FetchFailed; return DataError.FetchFailed;
}; };
if (result.candles.len > 0) { if (result.candles.len > 0) {
s.cacheCandles(symbol, result.candles, result.provider); s.cacheCandles(symbol, result.candles, result.provider, 0); // reset fail_count on success
} }
return .{ .data = result.candles, .source = .fetched, .timestamp = std.time.timestamp() }; return .{ .data = result.candles, .source = .fetched, .timestamp = std.time.timestamp() };