move to thread-safe allocator to address issues on parallel fetch

Author: Emil Lerch, 2026-04-24 06:20:23 -07:00
parent 11a282e2db
commit f99ec545ad
Signed by: lobo, GPG key ID A7B62D657EF764F8


@@ -91,7 +91,37 @@ fn earningsPostProcess(ev: *EarningsEvent, _: std.mem.Allocator) anyerror!void {
}
pub const DataService = struct {
-allocator: std.mem.Allocator,
+/// Thread-safe wrapper over the caller-provided base allocator.
+///
+/// Why this exists: `parallelServerSync` spawns worker threads that
+/// each allocate through `DataService`: HTTP client init, TLS cert
+/// bundle parsing, request/response buffers, and `Store.writeRaw`
+/// path joins. The CLI's root allocator is an `ArenaAllocator`
+/// (`src/main.zig`), which is NOT thread-safe. Unsynchronized
+/// concurrent allocs from workers corrupt the arena's free list.
+/// Symptoms seen in the wild:
+///
+/// thread N panic: reached unreachable code
+/// std/mem/Allocator.zig:147 grow
+/// std/hash_map.zig:1296 addCertsFromFile
+/// std/crypto/Certificate/Bundle.zig:206 request
+/// std/http/Client.zig:1789 request
+/// src/net/http.zig:43 syncFromServer
+///
+/// and bare segfaults mid-heap on whatever pointer the arena
+/// scrambled that run.
+///
+/// The wrapper serializes every allocation with a mutex. Cost is
+/// one lock acquire/release per alloc, negligible next to the I/O
+/// these allocations feed (HTTP requests, cache writes). The
+/// alternative (threading per-worker arenas through every
+/// transitive callsite) was rejected as error-prone.
+///
+/// DO NOT add an "unwrap" method or store the child allocator
+/// directly. The point is that internal callers don't need to
+/// know whether they're running under threads; every path goes
+/// through the lock by construction.
+thread_safe: std.heap.ThreadSafeAllocator,
config: Config,
// Lazily initialized providers (null until first use)
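
For context on `std.heap.ThreadSafeAllocator`: it holds a mutex plus a `child_allocator` and routes every alloc/resize/free through the lock. A minimal standalone sketch of the pattern this commit adopts, using only std APIs (not code from this repo):

    const std = @import("std");

    fn worker(alloc: std.mem.Allocator) void {
        // Concurrent allocations that would corrupt a bare arena are
        // serialized by the wrapper's mutex.
        const buf = alloc.alloc(u8, 1024) catch return;
        alloc.free(buf);
    }

    pub fn main() !void {
        // An arena, like the CLI's root allocator, is not thread-safe by itself.
        var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator);
        defer arena.deinit();

        var tsa = std.heap.ThreadSafeAllocator{ .child_allocator = arena.allocator() };
        const alloc = tsa.allocator();

        var threads: [4]std.Thread = undefined;
        for (&threads) |*t| t.* = try std.Thread.spawn(.{}, worker, .{alloc});
        for (threads) |t| t.join();
    }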
@@ -103,15 +133,26 @@ pub const DataService = struct {
yh: ?Yahoo = null,
tg: ?Tiingo = null,
-pub fn init(allocator: std.mem.Allocator, config: Config) DataService {
+pub fn init(base_allocator: std.mem.Allocator, config: Config) DataService {
const self = DataService{
-.allocator = allocator,
+.thread_safe = .{ .child_allocator = base_allocator },
.config = config,
};
self.logMissingKeys();
return self;
}
+/// Return the thread-safe allocator. Always go through this, never
+/// access the child allocator directly; see the doc-comment on
+/// `thread_safe` for why.
+///
+/// Safe to call from any method that holds `*DataService`. The
+/// returned `std.mem.Allocator` embeds `&self.thread_safe`, which
+/// is stable for as long as `self` is.
+pub fn allocator(self: *DataService) std.mem.Allocator {
+return self.thread_safe.allocator();
+}
/// Log warnings for missing API keys so users know which features are unavailable.
fn logMissingKeys(self: DataService) void {
// Primary candle provider
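
The stability note above is the one sharp edge: the `std.mem.Allocator` returned by `allocator()` carries `&self.thread_safe` as its context pointer, so a `DataService` must not be copied or relocated once anything holds that allocator. A hedged usage sketch (names from this diff; `config` assumed in scope):

    var svc = DataService.init(std.heap.page_allocator, config);
    const alloc = svc.allocator(); // context pointer is &svc.thread_safe

    // Fine: svc stays at this address for the lifetime of alloc.
    const buf = try alloc.alloc(u8, 64);
    defer alloc.free(buf);

    // Hazard: copying svc elsewhere and dropping the original would leave
    // alloc pointing into the dead copy's thread_safe field.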
@@ -157,7 +198,7 @@ pub const DataService = struct {
if (@field(self, field_name)) |*p| return p;
if (T == Cboe or T == Yahoo) {
// CBOE and Yahoo have no API key
-@field(self, field_name) = T.init(self.allocator);
+@field(self, field_name) = T.init(self.allocator());
} else {
// All we're doing here is lowercasing the type name, then
// appending _key to it, so AlphaVantage -> alphavantage_key
@@ -174,7 +215,7 @@ pub const DataService = struct {
break :blk buf[0 .. short.len + 4];
};
const key = @field(self.config, config_key) orelse return DataError.NoApiKey;
-@field(self, field_name) = T.init(self.allocator, key);
+@field(self, field_name) = T.init(self.allocator(), key);
}
return &@field(self, field_name).?;
}
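
The name-derivation trick the comment describes (lowercase the short type name, append `_key`) can be read in isolation. A hypothetical standalone version, not in the repo, equivalent in spirit to the inline block above:

    const std = @import("std");

    /// "AlphaVantage" (possibly namespace-qualified) -> "alphavantage_key".
    /// buf must have room for the short type name plus 4 bytes.
    fn configKeyName(comptime T: type, buf: []u8) []const u8 {
        const full = @typeName(T);
        const short = if (std.mem.lastIndexOfScalar(u8, full, '.')) |i|
            full[i + 1 ..]
        else
            full;
        for (short, 0..) |c, i| buf[i] = std.ascii.toLower(c);
        @memcpy(buf[short.len..][0..4], "_key");
        return buf[0 .. short.len + 4];
    }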
@@ -189,7 +230,7 @@ pub const DataService = struct {
// Cache helper
fn store(self: *DataService) cache.Store {
-return cache.Store.init(self.allocator, self.config.cache_dir);
+return cache.Store.init(self.allocator(), self.config.cache_dir);
}
/// Generic fetch-or-cache for simple data types (dividends, splits, options).
@@ -243,15 +284,15 @@ pub const DataService = struct {
return switch (T) {
Dividend => {
var pg = try self.getProvider(Polygon);
-return pg.fetchDividends(self.allocator, symbol, null, null);
+return pg.fetchDividends(self.allocator(), symbol, null, null);
},
Split => {
var pg = try self.getProvider(Polygon);
-return pg.fetchSplits(self.allocator, symbol);
+return pg.fetchSplits(self.allocator(), symbol);
},
OptionsChain => {
var cboe = try self.getProvider(Cboe);
-return cboe.fetchOptionsChain(self.allocator, symbol);
+return cboe.fetchOptionsChain(self.allocator(), symbol);
},
else => @compileError("unsupported type for fetchFromProvider"),
};
@@ -288,7 +329,7 @@ pub const DataService = struct {
// If preferred is Yahoo (degraded symbol), try Yahoo first
if (preferred == .yahoo) {
if (self.getProvider(Yahoo)) |yh| {
-if (yh.fetchCandles(self.allocator, symbol, from, to)) |candles| {
+if (yh.fetchCandles(self.allocator(), symbol, from, to)) |candles| {
log.debug("{s}: candles from Yahoo (preferred)", .{symbol});
return .{ .candles = candles, .provider = .yahoo };
} else |err| {
@@ -299,7 +340,7 @@ pub const DataService = struct {
// Primary: Tiingo
if (self.getProvider(Tiingo)) |tg| {
-if (tg.fetchCandles(self.allocator, symbol, from, to)) |candles| {
+if (tg.fetchCandles(self.allocator(), symbol, from, to)) |candles| {
log.debug("{s}: candles from Tiingo", .{symbol});
return .{ .candles = candles, .provider = .tiingo };
} else |err| {
@@ -314,7 +355,7 @@ pub const DataService = struct {
// Rate limited: back off and retry; this is expected, not a failure
log.info("{s}: Tiingo rate limited, backing off", .{symbol});
self.rateLimitBackoff();
-if (tg.fetchCandles(self.allocator, symbol, from, to)) |candles| {
+if (tg.fetchCandles(self.allocator(), symbol, from, to)) |candles| {
log.debug("{s}: candles from Tiingo (after rate limit backoff)", .{symbol});
return .{ .candles = candles, .provider = .tiingo };
} else |retry_err| {
@@ -322,7 +363,7 @@ pub const DataService = struct {
if (retry_err == error.RateLimited) {
// Still rate limited after backoff; one more try
self.rateLimitBackoff();
-if (tg.fetchCandles(self.allocator, symbol, from, to)) |candles| {
+if (tg.fetchCandles(self.allocator(), symbol, from, to)) |candles| {
log.debug("{s}: candles from Tiingo (after second backoff)", .{symbol});
return .{ .candles = candles, .provider = .tiingo };
} else |_| {}
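
The ladder above (fetch, back off on `error.RateLimited`, retry, back off once more, final retry) could be distilled into one helper. A sketch of that shape, hypothetical since the commit keeps the logic inline and the date parameter types aren't shown in this diff:

    fn candlesWithBackoff(
        self: *DataService,
        tg: *Tiingo,
        symbol: []const u8,
        from: anytype,
        to: @TypeOf(from),
    ) ![]Candle {
        var attempt: usize = 0;
        while (true) : (attempt += 1) {
            return tg.fetchCandles(self.allocator(), symbol, from, to) catch |err| {
                // Only rate limiting earns a retry, and only twice.
                if (err != error.RateLimited or attempt == 2) return err;
                self.rateLimitBackoff();
                continue;
            };
        }
    }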
@@ -347,7 +388,7 @@ pub const DataService = struct {
// Fallback: Yahoo (symbol not on Tiingo)
if (preferred != .yahoo) {
if (self.getProvider(Yahoo)) |yh| {
-if (yh.fetchCandles(self.allocator, symbol, from, to)) |candles| {
+if (yh.fetchCandles(self.allocator(), symbol, from, to)) |candles| {
log.info("{s}: candles from Yahoo (Tiingo fallback)", .{symbol});
return .{ .candles = candles, .provider = .yahoo };
} else |err| {
@@ -437,7 +478,7 @@ pub const DataService = struct {
if (new_candles.len == 0) {
// No new candles (weekend/holiday); refresh TTL, reset fail_count
-self.allocator.free(new_candles);
+self.allocator().free(new_candles);
s.updateCandleMeta(symbol, m.last_close, m.last_date, result.provider, 0);
if (s.read(Candle, symbol, null, .any)) |r|
return .{ .data = r.data, .source = .cached, .timestamp = std.time.timestamp() };
@@ -445,7 +486,7 @@ pub const DataService = struct {
// Append new candles to existing file + update meta, reset fail_count
s.appendCandles(symbol, new_candles, result.provider, 0);
if (s.read(Candle, symbol, null, .any)) |r| {
-self.allocator.free(new_candles);
+self.allocator().free(new_candles);
return .{ .data = r.data, .source = .fetched, .timestamp = std.time.timestamp() };
}
return .{ .data = new_candles, .source = .fetched, .timestamp = std.time.timestamp() };
@@ -528,7 +569,7 @@ pub const DataService = struct {
return .{ .data = cached.data, .source = .cached, .timestamp = cached.timestamp };
}
// Stale: free cached events and re-fetch below
-self.allocator.free(cached.data);
+self.allocator().free(cached.data);
}
// Try server sync before hitting Finnhub
@@ -545,10 +586,10 @@ pub const DataService = struct {
const from = today.subtractYears(5);
const to = today.addDays(365);
-const fetched = fh.fetchEarnings(self.allocator, symbol, from, to) catch |err| blk: {
+const fetched = fh.fetchEarnings(self.allocator(), symbol, from, to) catch |err| blk: {
if (err == error.RateLimited) {
self.rateLimitBackoff();
-break :blk fh.fetchEarnings(self.allocator, symbol, from, to) catch {
+break :blk fh.fetchEarnings(self.allocator(), symbol, from, to) catch {
return DataError.FetchFailed;
};
}
@@ -570,10 +611,10 @@ pub const DataService = struct {
return .{ .data = cached.data, .source = .cached, .timestamp = cached.timestamp };
var av = try self.getProvider(AlphaVantage);
-const fetched = av.fetchEtfProfile(self.allocator, symbol) catch |err| blk: {
+const fetched = av.fetchEtfProfile(self.allocator(), symbol) catch |err| blk: {
if (err == error.RateLimited) {
self.rateLimitBackoff();
-break :blk av.fetchEtfProfile(self.allocator, symbol) catch {
+break :blk av.fetchEtfProfile(self.allocator(), symbol) catch {
return DataError.FetchFailed;
};
}
@@ -592,7 +633,7 @@ pub const DataService = struct {
pub fn getQuote(self: *DataService, symbol: []const u8) DataError!Quote {
// Primary: Yahoo Finance (free, real-time)
if (self.getProvider(Yahoo)) |yh| {
-if (yh.fetchQuote(self.allocator, symbol)) |quote| {
+if (yh.fetchQuote(self.allocator(), symbol)) |quote| {
log.debug("{s}: quote from Yahoo", .{symbol});
return quote;
} else |_| {}
@@ -601,7 +642,7 @@ pub const DataService = struct {
// Fallback: TwelveData (requires API key, may be 15-min delayed)
var td = try self.getProvider(TwelveData);
log.debug("{s}: quote fallback to TwelveData", .{symbol});
-return td.fetchQuote(self.allocator, symbol) catch
+return td.fetchQuote(self.allocator(), symbol) catch
return DataError.FetchFailed;
}
@@ -609,7 +650,7 @@ pub const DataService = struct {
/// No cache -- always fetches fresh. Caller must free the returned string fields.
pub fn getCompanyOverview(self: *DataService, symbol: []const u8) DataError!CompanyOverview {
var av = try self.getProvider(AlphaVantage);
-return av.fetchCompanyOverview(self.allocator, symbol) catch
+return av.fetchCompanyOverview(self.allocator(), symbol) catch
return DataError.FetchFailed;
}
@@ -818,7 +859,7 @@ pub const DataService = struct {
// 2. Try API fetch
if (self.getCandles(sym)) |candle_result| {
-defer self.allocator.free(candle_result.data);
+defer self.allocator().free(candle_result.data);
if (candle_result.data.len > 0) {
const last = candle_result.data[candle_result.data.len - 1];
prices.put(sym, last.close) catch {};
@@ -939,7 +980,7 @@ pub const DataService = struct {
symbol_progress: ?ProgressCallback,
) LoadAllResult {
var result = LoadAllResult{
-.prices = std.StringHashMap(f64).init(self.allocator),
+.prices = std.StringHashMap(f64).init(self.allocator()),
.cached_count = 0,
.server_synced_count = 0,
.provider_fetched_count = 0,
@@ -956,13 +997,13 @@ pub const DataService = struct {
if (total_count == 0) return result;
// Build combined symbol list
-var all_symbols = std.ArrayList([]const u8).initCapacity(self.allocator, total_count) catch return result;
-defer all_symbols.deinit(self.allocator);
+var all_symbols = std.ArrayList([]const u8).initCapacity(self.allocator(), total_count) catch return result;
+defer all_symbols.deinit(self.allocator());
if (portfolio_syms) |ps| {
-for (ps) |sym| all_symbols.append(self.allocator, sym) catch {};
+for (ps) |sym| all_symbols.append(self.allocator(), sym) catch {};
}
-for (watch_syms) |sym| all_symbols.append(self.allocator, sym) catch {};
+for (watch_syms) |sym| all_symbols.append(self.allocator(), sym) catch {};
// Invalidate cache if force refresh
if (config.force_refresh) {
@@ -973,7 +1014,7 @@ pub const DataService = struct {
// Phase 1: Check local cache (fast path)
var needs_fetch: std.ArrayList([]const u8) = .empty;
-defer needs_fetch.deinit(self.allocator);
+defer needs_fetch.deinit(self.allocator());
if (aggregate_progress) |p| p.emit(0, total_count, .cache_check);
@@ -985,7 +1026,7 @@ pub const DataService = struct {
}
result.cached_count += 1;
} else {
-needs_fetch.append(self.allocator, sym) catch {};
+needs_fetch.append(self.allocator(), sym) catch {};
}
}
@@ -998,7 +1039,7 @@ pub const DataService = struct {
// Phase 2: Server sync (parallel if server configured)
var server_failures: std.ArrayList([]const u8) = .empty;
-defer server_failures.deinit(self.allocator);
+defer server_failures.deinit(self.allocator());
if (self.config.server_url != null) {
self.parallelServerSync(
@@ -1012,7 +1053,7 @@ pub const DataService = struct {
} else {
// No server; all need provider fetch
for (needs_fetch.items) |sym| {
-server_failures.append(self.allocator, sym) catch {};
+server_failures.append(self.allocator(), sym) catch {};
}
}
@@ -1054,12 +1095,12 @@ pub const DataService = struct {
// Shared state for worker threads
var completed = AtomicCounter{};
var next_index = AtomicCounter{};
-const sync_results = self.allocator.alloc(ServerSyncResult, symbols.len) catch {
+const sync_results = self.allocator().alloc(ServerSyncResult, symbols.len) catch {
// Allocation failed; fall back to marking all as failures
-for (symbols) |sym| failures.append(self.allocator, sym) catch {};
+for (symbols) |sym| failures.append(self.allocator(), sym) catch {};
return;
};
-defer self.allocator.free(sync_results);
+defer self.allocator().free(sync_results);
// Initialize results
for (sync_results, 0..) |*sr, i| {
@@ -1067,11 +1108,11 @@ pub const DataService = struct {
}
// Spawn worker threads
-var threads = self.allocator.alloc(std.Thread, thread_count) catch {
-for (symbols) |sym| failures.append(self.allocator, sym) catch {};
+var threads = self.allocator().alloc(std.Thread, thread_count) catch {
+for (symbols) |sym| failures.append(self.allocator(), sym) catch {};
return;
};
-defer self.allocator.free(threads);
+defer self.allocator().free(threads);
const WorkerContext = struct {
svc: *DataService,
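
The `next_index`/`completed` pair implies a claim-by-increment work queue: each worker atomically takes a ticket, and indices past the end mean the queue is drained. A sketch with plain std atomics, assuming `AtomicCounter` wraps `std.atomic.Value(usize)` (its definition isn't part of this diff):

    const std = @import("std");

    fn workerLoop(
        next_index: *std.atomic.Value(usize),
        completed: *std.atomic.Value(usize),
        symbols: []const []const u8,
    ) void {
        while (true) {
            // Claim the next unprocessed symbol; monotonic suffices for a ticket.
            const i = next_index.fetchAdd(1, .monotonic);
            if (i >= symbols.len) return; // all work claimed
            // ... per-symbol server sync happens here in the real code ...
            _ = completed.fetchAdd(1, .monotonic);
        }
    }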
@@ -1133,10 +1174,10 @@ pub const DataService = struct {
result.server_synced_count += 1;
} else {
// Sync said success but can't read cache; treat as failure
-failures.append(self.allocator, sr.symbol) catch {};
+failures.append(self.allocator(), sr.symbol) catch {};
}
} else {
-failures.append(self.allocator, sr.symbol) catch {};
+failures.append(self.allocator(), sr.symbol) catch {};
}
}
}
@@ -1159,7 +1200,7 @@ pub const DataService = struct {
// Try provider fetch
if (self.getCandles(sym)) |candle_result| {
-defer self.allocator.free(candle_result.data);
+defer self.allocator().free(candle_result.data);
if (candle_result.data.len > 0) {
const last = candle_result.data[candle_result.data.len - 1];
result.prices.put(sym, last.close) catch {};
@@ -1201,7 +1242,7 @@ pub const DataService = struct {
/// Results array is parallel to the input cusips array (same length, same order).
/// Caller owns the returned slice and all strings within each CusipResult.
pub fn lookupCusips(self: *DataService, cusips: []const []const u8) DataError![]CusipResult {
-return OpenFigi.lookupCusips(self.allocator, cusips, self.config.openfigi_key) catch
+return OpenFigi.lookupCusips(self.allocator(), cusips, self.config.openfigi_key) catch
return DataError.FetchFailed;
}
@@ -1213,10 +1254,10 @@ pub const DataService = struct {
if (self.getCachedCusipTicker(cusip)) |t| return t;
// Try OpenFIGI
-const result = OpenFigi.lookupCusip(self.allocator, cusip, self.config.openfigi_key) catch return null;
+const result = OpenFigi.lookupCusip(self.allocator(), cusip, self.config.openfigi_key) catch return null;
defer {
-if (result.name) |n| self.allocator.free(n);
-if (result.security_type) |s| self.allocator.free(s);
+if (result.name) |n| self.allocator().free(n);
+if (result.security_type) |s| self.allocator().free(s);
}
if (result.ticker) |ticker| {
@@ -1237,20 +1278,20 @@ pub const DataService = struct {
/// Read a cached CUSIP->ticker mapping. Returns null if not cached.
/// Caller owns the returned string.
fn getCachedCusipTicker(self: *DataService, cusip: []const u8) ?[]const u8 {
-const path = std.fs.path.join(self.allocator, &.{ self.config.cache_dir, "cusip_tickers.srf" }) catch return null;
-defer self.allocator.free(path);
+const path = std.fs.path.join(self.allocator(), &.{ self.config.cache_dir, "cusip_tickers.srf" }) catch return null;
+defer self.allocator().free(path);
-const data = std.fs.cwd().readFileAlloc(self.allocator, path, 64 * 1024) catch return null;
-defer self.allocator.free(data);
+const data = std.fs.cwd().readFileAlloc(self.allocator(), path, 64 * 1024) catch return null;
+defer self.allocator().free(data);
var reader = std.Io.Reader.fixed(data);
-var it = srf.iterator(&reader, self.allocator, .{ .alloc_strings = false }) catch return null;
+var it = srf.iterator(&reader, self.allocator(), .{ .alloc_strings = false }) catch return null;
defer it.deinit();
while (it.next() catch return null) |fields| {
const entry = fields.to(CusipEntry) catch continue;
if (std.mem.eql(u8, entry.cusip, cusip) and entry.ticker.len > 0) {
-return self.allocator.dupe(u8, entry.ticker) catch null;
+return self.allocator().dupe(u8, entry.ticker) catch null;
}
}
return null;
@@ -1258,8 +1299,8 @@ pub const DataService = struct {
/// Append a CUSIP->ticker mapping to the cache file.
pub fn cacheCusipTicker(self: *DataService, cusip: []const u8, ticker: []const u8) void {
-const path = std.fs.path.join(self.allocator, &.{ self.config.cache_dir, "cusip_tickers.srf" }) catch return;
-defer self.allocator.free(path);
+const path = std.fs.path.join(self.allocator(), &.{ self.config.cache_dir, "cusip_tickers.srf" }) catch return;
+defer self.allocator().free(path);
// Ensure cache dir exists
if (std.fs.path.dirnamePosix(path)) |dir| {
@@ -1278,7 +1319,7 @@ pub const DataService = struct {
const entry = [_]CusipEntry{.{ .cusip = cusip, .ticker = ticker }};
var buf: [256]u8 = undefined;
var writer = file.writer(&buf);
-writer.interface.print("{f}", .{srf.fmtFrom(CusipEntry, self.allocator, &entry, .{ .emit_directives = emit_directives })}) catch return;
+writer.interface.print("{f}", .{srf.fmtFrom(CusipEntry, self.allocator(), &entry, .{ .emit_directives = emit_directives })}) catch return;
writer.interface.flush() catch {};
}
@@ -1312,12 +1353,12 @@ pub const DataService = struct {
.meta => return false,
};
-const full_url = std.fmt.allocPrint(self.allocator, "{s}/{s}{s}", .{ server_url, symbol, endpoint }) catch return false;
-defer self.allocator.free(full_url);
+const full_url = std.fmt.allocPrint(self.allocator(), "{s}/{s}{s}", .{ server_url, symbol, endpoint }) catch return false;
+defer self.allocator().free(full_url);
log.debug("{s}: syncing {s} from server", .{ symbol, @tagName(data_type) });
-var client = http.Client.init(self.allocator);
+var client = http.Client.init(self.allocator());
defer client.deinit();
var response = client.get(full_url) catch |err| {
@@ -1356,13 +1397,13 @@ pub const DataService = struct {
/// Caller owns the returned AccountMap and must call deinit().
pub fn loadAccountMap(self: *DataService, portfolio_path: []const u8) ?analysis.AccountMap {
const dir_end = if (std.mem.lastIndexOfScalar(u8, portfolio_path, std.fs.path.sep)) |idx| idx + 1 else 0;
-const acct_path = std.fmt.allocPrint(self.allocator, "{s}accounts.srf", .{portfolio_path[0..dir_end]}) catch return null;
-defer self.allocator.free(acct_path);
+const acct_path = std.fmt.allocPrint(self.allocator(), "{s}accounts.srf", .{portfolio_path[0..dir_end]}) catch return null;
+defer self.allocator().free(acct_path);
-const data = std.fs.cwd().readFileAlloc(self.allocator, acct_path, 1024 * 1024) catch return null;
-defer self.allocator.free(data);
+const data = std.fs.cwd().readFileAlloc(self.allocator(), acct_path, 1024 * 1024) catch return null;
+defer self.allocator().free(data);
-return analysis.parseAccountsFile(self.allocator, data) catch null;
+return analysis.parseAccountsFile(self.allocator(), data) catch null;
}
};