From f99ec545adf36765de6e9b31323d62ce5af2ceba Mon Sep 17 00:00:00 2001 From: Emil Lerch Date: Fri, 24 Apr 2026 06:20:23 -0700 Subject: [PATCH] move to thread safe allocator to address issues on parallel fetch --- src/service.zig | 169 ++++++++++++++++++++++++++++++------------------ 1 file changed, 105 insertions(+), 64 deletions(-) diff --git a/src/service.zig b/src/service.zig index cffb20e..de89fe3 100644 --- a/src/service.zig +++ b/src/service.zig @@ -91,7 +91,37 @@ fn earningsPostProcess(ev: *EarningsEvent, _: std.mem.Allocator) anyerror!void { } pub const DataService = struct { - allocator: std.mem.Allocator, + /// Thread-safe wrapper over the caller-provided base allocator. + /// + /// Why this exists: `parallelServerSync` spawns worker threads that + /// each allocate through `DataService` — HTTP client init, TLS cert + /// bundle parsing, request/response buffers, and `Store.writeRaw` + /// path joins. The CLI's root allocator is an `ArenaAllocator` + /// (`src/main.zig`), which is NOT thread-safe. Unsynchronized + /// concurrent allocs from workers corrupt the arena's free list. + /// Symptoms seen in the wild: + /// + /// thread N panic: reached unreachable code + /// std/mem/Allocator.zig:147 grow + /// std/hash_map.zig:1296 addCertsFromFile + /// std/crypto/Certificate/Bundle.zig:206 request + /// std/http/Client.zig:1789 request + /// src/net/http.zig:43 syncFromServer + /// + /// and bare segfaults mid-heap on whatever pointer the arena + /// scrambled that run. + /// + /// The wrapper serializes every allocation with a mutex. Cost is + /// one lock acquire/release per alloc — negligible next to the I/O + /// these allocations feed (HTTP requests, cache writes). The + /// alternative (threading per-worker arenas through every + /// transitive callsite) was rejected as error-prone. + /// + /// DO NOT add an "unwrap" method or store the child allocator + /// directly. The point is that internal callers don't need to + /// know whether they're running under threads — every path goes + /// through the lock by construction. + thread_safe: std.heap.ThreadSafeAllocator, config: Config, // Lazily initialized providers (null until first use) @@ -103,15 +133,26 @@ pub const DataService = struct { yh: ?Yahoo = null, tg: ?Tiingo = null, - pub fn init(allocator: std.mem.Allocator, config: Config) DataService { + pub fn init(base_allocator: std.mem.Allocator, config: Config) DataService { const self = DataService{ - .allocator = allocator, + .thread_safe = .{ .child_allocator = base_allocator }, .config = config, }; self.logMissingKeys(); return self; } + /// Return the thread-safe allocator. Always go through this, never + /// access the child allocator directly — see the doc-comment on + /// `thread_safe` for why. + /// + /// Safe to call from any method that holds `*DataService`. The + /// returned `std.mem.Allocator` embeds `&self.thread_safe`, which + /// is stable for as long as `self` is. + pub fn allocator(self: *DataService) std.mem.Allocator { + return self.thread_safe.allocator(); + } + /// Log warnings for missing API keys so users know which features are unavailable. fn logMissingKeys(self: DataService) void { // Primary candle provider @@ -157,7 +198,7 @@ pub const DataService = struct { if (@field(self, field_name)) |*p| return p; if (T == Cboe or T == Yahoo) { // CBOE and Yahoo have no API key - @field(self, field_name) = T.init(self.allocator); + @field(self, field_name) = T.init(self.allocator()); } else { // All we're doing here is lower casing the type name, then // appending _key to it, so AlphaVantage -> alphavantage_key @@ -174,7 +215,7 @@ pub const DataService = struct { break :blk buf[0 .. short.len + 4]; }; const key = @field(self.config, config_key) orelse return DataError.NoApiKey; - @field(self, field_name) = T.init(self.allocator, key); + @field(self, field_name) = T.init(self.allocator(), key); } return &@field(self, field_name).?; } @@ -189,7 +230,7 @@ pub const DataService = struct { // ── Cache helper ───────────────────────────────────────────── fn store(self: *DataService) cache.Store { - return cache.Store.init(self.allocator, self.config.cache_dir); + return cache.Store.init(self.allocator(), self.config.cache_dir); } /// Generic fetch-or-cache for simple data types (dividends, splits, options). @@ -243,15 +284,15 @@ pub const DataService = struct { return switch (T) { Dividend => { var pg = try self.getProvider(Polygon); - return pg.fetchDividends(self.allocator, symbol, null, null); + return pg.fetchDividends(self.allocator(), symbol, null, null); }, Split => { var pg = try self.getProvider(Polygon); - return pg.fetchSplits(self.allocator, symbol); + return pg.fetchSplits(self.allocator(), symbol); }, OptionsChain => { var cboe = try self.getProvider(Cboe); - return cboe.fetchOptionsChain(self.allocator, symbol); + return cboe.fetchOptionsChain(self.allocator(), symbol); }, else => @compileError("unsupported type for fetchFromProvider"), }; @@ -288,7 +329,7 @@ pub const DataService = struct { // If preferred is Yahoo (degraded symbol), try Yahoo first if (preferred == .yahoo) { if (self.getProvider(Yahoo)) |yh| { - if (yh.fetchCandles(self.allocator, symbol, from, to)) |candles| { + if (yh.fetchCandles(self.allocator(), symbol, from, to)) |candles| { log.debug("{s}: candles from Yahoo (preferred)", .{symbol}); return .{ .candles = candles, .provider = .yahoo }; } else |err| { @@ -299,7 +340,7 @@ pub const DataService = struct { // Primary: Tiingo if (self.getProvider(Tiingo)) |tg| { - if (tg.fetchCandles(self.allocator, symbol, from, to)) |candles| { + if (tg.fetchCandles(self.allocator(), symbol, from, to)) |candles| { log.debug("{s}: candles from Tiingo", .{symbol}); return .{ .candles = candles, .provider = .tiingo }; } else |err| { @@ -314,7 +355,7 @@ pub const DataService = struct { // Rate limited: back off and retry — this is expected, not a failure log.info("{s}: Tiingo rate limited, backing off", .{symbol}); self.rateLimitBackoff(); - if (tg.fetchCandles(self.allocator, symbol, from, to)) |candles| { + if (tg.fetchCandles(self.allocator(), symbol, from, to)) |candles| { log.debug("{s}: candles from Tiingo (after rate limit backoff)", .{symbol}); return .{ .candles = candles, .provider = .tiingo }; } else |retry_err| { @@ -322,7 +363,7 @@ pub const DataService = struct { if (retry_err == error.RateLimited) { // Still rate limited after backoff — one more try self.rateLimitBackoff(); - if (tg.fetchCandles(self.allocator, symbol, from, to)) |candles| { + if (tg.fetchCandles(self.allocator(), symbol, from, to)) |candles| { log.debug("{s}: candles from Tiingo (after second backoff)", .{symbol}); return .{ .candles = candles, .provider = .tiingo }; } else |_| {} @@ -347,7 +388,7 @@ pub const DataService = struct { // Fallback: Yahoo (symbol not on Tiingo) if (preferred != .yahoo) { if (self.getProvider(Yahoo)) |yh| { - if (yh.fetchCandles(self.allocator, symbol, from, to)) |candles| { + if (yh.fetchCandles(self.allocator(), symbol, from, to)) |candles| { log.info("{s}: candles from Yahoo (Tiingo fallback)", .{symbol}); return .{ .candles = candles, .provider = .yahoo }; } else |err| { @@ -437,7 +478,7 @@ pub const DataService = struct { if (new_candles.len == 0) { // No new candles (weekend/holiday) — refresh TTL, reset fail_count - self.allocator.free(new_candles); + self.allocator().free(new_candles); s.updateCandleMeta(symbol, m.last_close, m.last_date, result.provider, 0); if (s.read(Candle, symbol, null, .any)) |r| return .{ .data = r.data, .source = .cached, .timestamp = std.time.timestamp() }; @@ -445,7 +486,7 @@ pub const DataService = struct { // Append new candles to existing file + update meta, reset fail_count s.appendCandles(symbol, new_candles, result.provider, 0); if (s.read(Candle, symbol, null, .any)) |r| { - self.allocator.free(new_candles); + self.allocator().free(new_candles); return .{ .data = r.data, .source = .fetched, .timestamp = std.time.timestamp() }; } return .{ .data = new_candles, .source = .fetched, .timestamp = std.time.timestamp() }; @@ -528,7 +569,7 @@ pub const DataService = struct { return .{ .data = cached.data, .source = .cached, .timestamp = cached.timestamp }; } // Stale: free cached events and re-fetch below - self.allocator.free(cached.data); + self.allocator().free(cached.data); } // Try server sync before hitting Finnhub @@ -545,10 +586,10 @@ pub const DataService = struct { const from = today.subtractYears(5); const to = today.addDays(365); - const fetched = fh.fetchEarnings(self.allocator, symbol, from, to) catch |err| blk: { + const fetched = fh.fetchEarnings(self.allocator(), symbol, from, to) catch |err| blk: { if (err == error.RateLimited) { self.rateLimitBackoff(); - break :blk fh.fetchEarnings(self.allocator, symbol, from, to) catch { + break :blk fh.fetchEarnings(self.allocator(), symbol, from, to) catch { return DataError.FetchFailed; }; } @@ -570,10 +611,10 @@ pub const DataService = struct { return .{ .data = cached.data, .source = .cached, .timestamp = cached.timestamp }; var av = try self.getProvider(AlphaVantage); - const fetched = av.fetchEtfProfile(self.allocator, symbol) catch |err| blk: { + const fetched = av.fetchEtfProfile(self.allocator(), symbol) catch |err| blk: { if (err == error.RateLimited) { self.rateLimitBackoff(); - break :blk av.fetchEtfProfile(self.allocator, symbol) catch { + break :blk av.fetchEtfProfile(self.allocator(), symbol) catch { return DataError.FetchFailed; }; } @@ -592,7 +633,7 @@ pub const DataService = struct { pub fn getQuote(self: *DataService, symbol: []const u8) DataError!Quote { // Primary: Yahoo Finance (free, real-time) if (self.getProvider(Yahoo)) |yh| { - if (yh.fetchQuote(self.allocator, symbol)) |quote| { + if (yh.fetchQuote(self.allocator(), symbol)) |quote| { log.debug("{s}: quote from Yahoo", .{symbol}); return quote; } else |_| {} @@ -601,7 +642,7 @@ pub const DataService = struct { // Fallback: TwelveData (requires API key, may be 15-min delayed) var td = try self.getProvider(TwelveData); log.debug("{s}: quote fallback to TwelveData", .{symbol}); - return td.fetchQuote(self.allocator, symbol) catch + return td.fetchQuote(self.allocator(), symbol) catch return DataError.FetchFailed; } @@ -609,7 +650,7 @@ pub const DataService = struct { /// No cache -- always fetches fresh. Caller must free the returned string fields. pub fn getCompanyOverview(self: *DataService, symbol: []const u8) DataError!CompanyOverview { var av = try self.getProvider(AlphaVantage); - return av.fetchCompanyOverview(self.allocator, symbol) catch + return av.fetchCompanyOverview(self.allocator(), symbol) catch return DataError.FetchFailed; } @@ -818,7 +859,7 @@ pub const DataService = struct { // 2. Try API fetch if (self.getCandles(sym)) |candle_result| { - defer self.allocator.free(candle_result.data); + defer self.allocator().free(candle_result.data); if (candle_result.data.len > 0) { const last = candle_result.data[candle_result.data.len - 1]; prices.put(sym, last.close) catch {}; @@ -939,7 +980,7 @@ pub const DataService = struct { symbol_progress: ?ProgressCallback, ) LoadAllResult { var result = LoadAllResult{ - .prices = std.StringHashMap(f64).init(self.allocator), + .prices = std.StringHashMap(f64).init(self.allocator()), .cached_count = 0, .server_synced_count = 0, .provider_fetched_count = 0, @@ -956,13 +997,13 @@ pub const DataService = struct { if (total_count == 0) return result; // Build combined symbol list - var all_symbols = std.ArrayList([]const u8).initCapacity(self.allocator, total_count) catch return result; - defer all_symbols.deinit(self.allocator); + var all_symbols = std.ArrayList([]const u8).initCapacity(self.allocator(), total_count) catch return result; + defer all_symbols.deinit(self.allocator()); if (portfolio_syms) |ps| { - for (ps) |sym| all_symbols.append(self.allocator, sym) catch {}; + for (ps) |sym| all_symbols.append(self.allocator(), sym) catch {}; } - for (watch_syms) |sym| all_symbols.append(self.allocator, sym) catch {}; + for (watch_syms) |sym| all_symbols.append(self.allocator(), sym) catch {}; // Invalidate cache if force refresh if (config.force_refresh) { @@ -973,7 +1014,7 @@ pub const DataService = struct { // Phase 1: Check local cache (fast path) var needs_fetch: std.ArrayList([]const u8) = .empty; - defer needs_fetch.deinit(self.allocator); + defer needs_fetch.deinit(self.allocator()); if (aggregate_progress) |p| p.emit(0, total_count, .cache_check); @@ -985,7 +1026,7 @@ pub const DataService = struct { } result.cached_count += 1; } else { - needs_fetch.append(self.allocator, sym) catch {}; + needs_fetch.append(self.allocator(), sym) catch {}; } } @@ -998,7 +1039,7 @@ pub const DataService = struct { // Phase 2: Server sync (parallel if server configured) var server_failures: std.ArrayList([]const u8) = .empty; - defer server_failures.deinit(self.allocator); + defer server_failures.deinit(self.allocator()); if (self.config.server_url != null) { self.parallelServerSync( @@ -1012,7 +1053,7 @@ pub const DataService = struct { } else { // No server — all need provider fetch for (needs_fetch.items) |sym| { - server_failures.append(self.allocator, sym) catch {}; + server_failures.append(self.allocator(), sym) catch {}; } } @@ -1054,12 +1095,12 @@ pub const DataService = struct { // Shared state for worker threads var completed = AtomicCounter{}; var next_index = AtomicCounter{}; - const sync_results = self.allocator.alloc(ServerSyncResult, symbols.len) catch { + const sync_results = self.allocator().alloc(ServerSyncResult, symbols.len) catch { // Allocation failed — fall back to marking all as failures - for (symbols) |sym| failures.append(self.allocator, sym) catch {}; + for (symbols) |sym| failures.append(self.allocator(), sym) catch {}; return; }; - defer self.allocator.free(sync_results); + defer self.allocator().free(sync_results); // Initialize results for (sync_results, 0..) |*sr, i| { @@ -1067,11 +1108,11 @@ pub const DataService = struct { } // Spawn worker threads - var threads = self.allocator.alloc(std.Thread, thread_count) catch { - for (symbols) |sym| failures.append(self.allocator, sym) catch {}; + var threads = self.allocator().alloc(std.Thread, thread_count) catch { + for (symbols) |sym| failures.append(self.allocator(), sym) catch {}; return; }; - defer self.allocator.free(threads); + defer self.allocator().free(threads); const WorkerContext = struct { svc: *DataService, @@ -1133,10 +1174,10 @@ pub const DataService = struct { result.server_synced_count += 1; } else { // Sync said success but can't read cache — treat as failure - failures.append(self.allocator, sr.symbol) catch {}; + failures.append(self.allocator(), sr.symbol) catch {}; } } else { - failures.append(self.allocator, sr.symbol) catch {}; + failures.append(self.allocator(), sr.symbol) catch {}; } } } @@ -1159,7 +1200,7 @@ pub const DataService = struct { // Try provider fetch if (self.getCandles(sym)) |candle_result| { - defer self.allocator.free(candle_result.data); + defer self.allocator().free(candle_result.data); if (candle_result.data.len > 0) { const last = candle_result.data[candle_result.data.len - 1]; result.prices.put(sym, last.close) catch {}; @@ -1201,7 +1242,7 @@ pub const DataService = struct { /// Results array is parallel to the input cusips array (same length, same order). /// Caller owns the returned slice and all strings within each CusipResult. pub fn lookupCusips(self: *DataService, cusips: []const []const u8) DataError![]CusipResult { - return OpenFigi.lookupCusips(self.allocator, cusips, self.config.openfigi_key) catch + return OpenFigi.lookupCusips(self.allocator(), cusips, self.config.openfigi_key) catch return DataError.FetchFailed; } @@ -1213,10 +1254,10 @@ pub const DataService = struct { if (self.getCachedCusipTicker(cusip)) |t| return t; // Try OpenFIGI - const result = OpenFigi.lookupCusip(self.allocator, cusip, self.config.openfigi_key) catch return null; + const result = OpenFigi.lookupCusip(self.allocator(), cusip, self.config.openfigi_key) catch return null; defer { - if (result.name) |n| self.allocator.free(n); - if (result.security_type) |s| self.allocator.free(s); + if (result.name) |n| self.allocator().free(n); + if (result.security_type) |s| self.allocator().free(s); } if (result.ticker) |ticker| { @@ -1237,20 +1278,20 @@ pub const DataService = struct { /// Read a cached CUSIP->ticker mapping. Returns null if not cached. /// Caller owns the returned string. fn getCachedCusipTicker(self: *DataService, cusip: []const u8) ?[]const u8 { - const path = std.fs.path.join(self.allocator, &.{ self.config.cache_dir, "cusip_tickers.srf" }) catch return null; - defer self.allocator.free(path); + const path = std.fs.path.join(self.allocator(), &.{ self.config.cache_dir, "cusip_tickers.srf" }) catch return null; + defer self.allocator().free(path); - const data = std.fs.cwd().readFileAlloc(self.allocator, path, 64 * 1024) catch return null; - defer self.allocator.free(data); + const data = std.fs.cwd().readFileAlloc(self.allocator(), path, 64 * 1024) catch return null; + defer self.allocator().free(data); var reader = std.Io.Reader.fixed(data); - var it = srf.iterator(&reader, self.allocator, .{ .alloc_strings = false }) catch return null; + var it = srf.iterator(&reader, self.allocator(), .{ .alloc_strings = false }) catch return null; defer it.deinit(); while (it.next() catch return null) |fields| { const entry = fields.to(CusipEntry) catch continue; if (std.mem.eql(u8, entry.cusip, cusip) and entry.ticker.len > 0) { - return self.allocator.dupe(u8, entry.ticker) catch null; + return self.allocator().dupe(u8, entry.ticker) catch null; } } return null; @@ -1258,8 +1299,8 @@ pub const DataService = struct { /// Append a CUSIP->ticker mapping to the cache file. pub fn cacheCusipTicker(self: *DataService, cusip: []const u8, ticker: []const u8) void { - const path = std.fs.path.join(self.allocator, &.{ self.config.cache_dir, "cusip_tickers.srf" }) catch return; - defer self.allocator.free(path); + const path = std.fs.path.join(self.allocator(), &.{ self.config.cache_dir, "cusip_tickers.srf" }) catch return; + defer self.allocator().free(path); // Ensure cache dir exists if (std.fs.path.dirnamePosix(path)) |dir| { @@ -1278,7 +1319,7 @@ pub const DataService = struct { const entry = [_]CusipEntry{.{ .cusip = cusip, .ticker = ticker }}; var buf: [256]u8 = undefined; var writer = file.writer(&buf); - writer.interface.print("{f}", .{srf.fmtFrom(CusipEntry, self.allocator, &entry, .{ .emit_directives = emit_directives })}) catch return; + writer.interface.print("{f}", .{srf.fmtFrom(CusipEntry, self.allocator(), &entry, .{ .emit_directives = emit_directives })}) catch return; writer.interface.flush() catch {}; } @@ -1312,12 +1353,12 @@ pub const DataService = struct { .meta => return false, }; - const full_url = std.fmt.allocPrint(self.allocator, "{s}/{s}{s}", .{ server_url, symbol, endpoint }) catch return false; - defer self.allocator.free(full_url); + const full_url = std.fmt.allocPrint(self.allocator(), "{s}/{s}{s}", .{ server_url, symbol, endpoint }) catch return false; + defer self.allocator().free(full_url); log.debug("{s}: syncing {s} from server", .{ symbol, @tagName(data_type) }); - var client = http.Client.init(self.allocator); + var client = http.Client.init(self.allocator()); defer client.deinit(); var response = client.get(full_url) catch |err| { @@ -1356,13 +1397,13 @@ pub const DataService = struct { /// Caller owns the returned AccountMap and must call deinit(). pub fn loadAccountMap(self: *DataService, portfolio_path: []const u8) ?analysis.AccountMap { const dir_end = if (std.mem.lastIndexOfScalar(u8, portfolio_path, std.fs.path.sep)) |idx| idx + 1 else 0; - const acct_path = std.fmt.allocPrint(self.allocator, "{s}accounts.srf", .{portfolio_path[0..dir_end]}) catch return null; - defer self.allocator.free(acct_path); + const acct_path = std.fmt.allocPrint(self.allocator(), "{s}accounts.srf", .{portfolio_path[0..dir_end]}) catch return null; + defer self.allocator().free(acct_path); - const data = std.fs.cwd().readFileAlloc(self.allocator, acct_path, 1024 * 1024) catch return null; - defer self.allocator.free(data); + const data = std.fs.cwd().readFileAlloc(self.allocator(), acct_path, 1024 * 1024) catch return null; + defer self.allocator().free(data); - return analysis.parseAccountsFile(self.allocator, data) catch null; + return analysis.parseAccountsFile(self.allocator(), data) catch null; } };