From 7144f60d10842d75bd877a6edd070d6346fff733 Mon Sep 17 00:00:00 2001
From: Emil Lerch
Date: Fri, 20 Mar 2026 09:48:03 -0700
Subject: [PATCH] rework the data load

---
 src/commands/common.zig    | 129 +++++++++++++
 src/commands/portfolio.zig |  51 ++----
 src/service.zig            | 374 +++++++++++++++++++++++++++++++++++++
 src/tui.zig                |  35 +---
 4 files changed, 526 insertions(+), 63 deletions(-)

diff --git a/src/commands/common.zig b/src/commands/common.zig
index 59f29e9..953c340 100644
--- a/src/commands/common.zig
+++ b/src/commands/common.zig
@@ -123,6 +123,135 @@ pub const LoadProgress = struct {
     }
 };
 
+/// Aggregate progress callback for parallel loading operations.
+/// Displays a single updating line with progress bar.
+pub const AggregateProgress = struct {
+    color: bool,
+    last_phase: ?zfin.DataService.AggregateProgressCallback.Phase = null,
+
+    fn onProgress(ctx: *anyopaque, completed: usize, total: usize, phase: zfin.DataService.AggregateProgressCallback.Phase) void {
+        const self: *AggregateProgress = @ptrCast(@alignCast(ctx));
+
+        // Capture the phase being left before recording the new one, so the
+        // branches below can detect transitions.
+        const prev_phase = self.last_phase;
+        const phase_changed = prev_phase == null or prev_phase.? != phase;
+        self.last_phase = phase;
+
+        var buf: [256]u8 = undefined;
+        var writer = std.fs.File.stderr().writer(&buf);
+        const out = &writer.interface;
+
+        switch (phase) {
+            .cache_check => {
+                // Brief phase, no output needed
+            },
+            .server_sync => {
+                // Single updating line with carriage return
+                if (self.color) fmt.ansiSetFg(out, CLR_MUTED[0], CLR_MUTED[1], CLR_MUTED[2]) catch {};
+                out.print("\r Syncing from server... [{d}/{d}]", .{ completed, total }) catch {};
+                if (self.color) fmt.ansiReset(out) catch {};
+                out.flush() catch {};
+            },
+            .provider_fetch => {
+                if (phase_changed) {
+                    // Clear the server sync line and print newline
+                    out.print("\r\x1b[K", .{}) catch {}; // clear line
+                    if (self.color) fmt.ansiSetFg(out, CLR_MUTED[0], CLR_MUTED[1], CLR_MUTED[2]) catch {};
+                    out.print(" Synced {d} from server, fetching remaining from providers...\n", .{completed}) catch {};
+                    if (self.color) fmt.ansiReset(out) catch {};
+                    out.flush() catch {};
+                }
+            },
+            .complete => {
+                // Clear a leftover in-place progress line. This must test the
+                // previous phase: last_phase already holds .complete here.
+                if (prev_phase) |prev| {
+                    if (prev == .server_sync or prev == .cache_check) {
+                        out.print("\r\x1b[K", .{}) catch {}; // clear line
+                        out.flush() catch {};
+                    }
+                }
+            },
+        }
+    }
+
+    pub fn callback(self: *AggregateProgress) zfin.DataService.AggregateProgressCallback {
+        return .{
+            .context = @ptrCast(self),
+            .on_progress = onProgress,
+        };
+    }
+};
+
+/// Unified price loading for both CLI and TUI.
+/// Handles parallel server sync when ZFIN_SERVER is configured,
+/// with sequential provider fallback for failures.
+/// Prints a one-line summary to stderr and returns the load result; the
+/// caller must deinit `result.prices` or take ownership of it.
+pub fn loadPortfolioPrices(
+    svc: *zfin.DataService,
+    portfolio_syms: ?[]const []const u8,
+    watch_syms: []const []const u8,
+    force_refresh: bool,
+    color: bool,
+) zfin.DataService.LoadAllResult {
+    var aggregate = AggregateProgress{ .color = color };
+    var symbol_progress = LoadProgress{
+        .svc = svc,
+        .color = color,
+        .index_offset = 0,
+        .grand_total = (if (portfolio_syms) |ps| ps.len else 0) + watch_syms.len,
+    };
+
+    const result = svc.loadAllPrices(
+        portfolio_syms,
+        watch_syms,
+        .{ .force_refresh = force_refresh, .color = color },
+        aggregate.callback(),
+        symbol_progress.callback(),
+    );
+
+    // Print summary
+    const total = symbol_progress.grand_total;
+    const from_cache = result.cached_count;
+    const from_server = result.server_synced_count;
+    const from_provider = result.provider_fetched_count;
+    const failed = result.failed_count;
+    const stale = result.stale_count;
+
+    var buf: [256]u8 = undefined;
+    var writer = std.fs.File.stderr().writer(&buf);
+    const out = &writer.interface;
+
+    if (from_cache == total) {
+        if (color) fmt.ansiSetFg(out, CLR_MUTED[0], CLR_MUTED[1], CLR_MUTED[2]) catch {};
+        out.print(" Loaded {d} symbols from cache\n", .{total}) catch {};
+        if (color) fmt.ansiReset(out) catch {};
+    } else if (failed > 0) {
+        if (color) fmt.ansiSetFg(out, CLR_MUTED[0], CLR_MUTED[1], CLR_MUTED[2]) catch {};
+        if (stale > 0) {
+            out.print(" Loaded {d} symbols ({d} cached, {d} server, {d} provider, {d} failed — {d} using stale)\n", .{ total, from_cache, from_server, from_provider, failed, stale }) catch {};
+        } else {
+            out.print(" Loaded {d} symbols ({d} cached, {d} server, {d} provider, {d} failed)\n", .{ total, from_cache, from_server, from_provider, failed }) catch {};
+        }
+        if (color) fmt.ansiReset(out) catch {};
+    } else {
+        if (color) fmt.ansiSetFg(out, CLR_MUTED[0], CLR_MUTED[1], CLR_MUTED[2]) catch {};
+        if (from_server > 0 and from_provider > 0) {
+            out.print(" Loaded {d} symbols ({d} cached, {d} server, {d} provider)\n", .{ total, from_cache, from_server, from_provider }) catch {};
+        } else if (from_server > 0) {
+            out.print(" Loaded {d} symbols ({d} cached, {d} server)\n", .{ total, from_cache, from_server }) catch {};
+        } else {
+            out.print(" Loaded {d} symbols ({d} cached, {d} fetched)\n", .{ total, from_cache, from_provider }) catch {};
+        }
+        if (color) fmt.ansiReset(out) catch {};
+    }
+    out.flush() catch {};
+
+    return result;
+}
+
 // ── Portfolio data pipeline ──────────────────────────────────
 
 /// Result of the shared portfolio data pipeline. Caller must call deinit().
diff --git a/src/commands/portfolio.zig b/src/commands/portfolio.zig
index 4cb18ea..77440ac 100644
--- a/src/commands/portfolio.zig
+++ b/src/commands/portfolio.zig
@@ -56,45 +56,22 @@ pub fn run(allocator: std.mem.Allocator, svc: *zfin.DataService, file_path: []co
     const all_syms_count = syms.len + watch_syms.items.len;
 
     if (all_syms_count > 0) {
+        // Use consolidated parallel loader
+        var load_result = cli.loadPortfolioPrices(
+            svc,
+            syms,
+            watch_syms.items,
+            force_refresh,
+            color,
+        );
+        defer load_result.deinit(); // Free the prices hashmap after we copy
 
-        // Progress callback for per-symbol output
-        var progress_ctx = cli.LoadProgress{
-            .svc = svc,
-            .color = color,
-            .index_offset = 0,
-            .grand_total = all_syms_count,
-        };
-
-        // Load prices for stock/ETF positions
-        const load_result = svc.loadPrices(syms, &prices, force_refresh, progress_ctx.callback());
-        fail_count = load_result.fail_count;
-
-        // Fetch watch symbol candles (for watchlist display, not portfolio value)
-        progress_ctx.index_offset = syms.len;
-        _ = svc.loadPrices(watch_syms.items, &prices, force_refresh, progress_ctx.callback());
-
-        // Summary line
-        {
-            const cached_count = load_result.cached_count;
-            const fetched_count = load_result.fetched_count;
-            var msg_buf: [256]u8 = undefined;
-            if (cached_count == all_syms_count) {
-                const msg = std.fmt.bufPrint(&msg_buf, "All {d} symbols loaded from cache\n", .{all_syms_count}) catch "Loaded from cache\n";
-                try cli.stderrPrint(msg);
-            } else if (fail_count > 0) {
-                const stale = load_result.stale_count;
-                if (stale > 0) {
-                    const msg = std.fmt.bufPrint(&msg_buf, "Loaded {d} symbols ({d} cached, {d} fetched, {d} failed — {d} using stale cache)\n", .{ all_syms_count, cached_count, fetched_count, fail_count, stale }) catch "Done loading\n";
-                    try cli.stderrPrint(msg);
-                } else {
-                    const msg = std.fmt.bufPrint(&msg_buf, "Loaded {d} symbols ({d} cached, {d} fetched, {d} failed)\n", .{ all_syms_count, cached_count, fetched_count, fail_count }) catch "Done loading\n";
-                    try cli.stderrPrint(msg);
-                }
-            } else {
-                const msg = std.fmt.bufPrint(&msg_buf, "Loaded {d} symbols ({d} cached, {d} fetched)\n", .{ all_syms_count, cached_count, fetched_count }) catch "Done loading\n";
-                try cli.stderrPrint(msg);
-            }
+        // Transfer prices to our local map
+        var it = load_result.prices.iterator();
+        while (it.next()) |entry| {
+            prices.put(entry.key_ptr.*, entry.value_ptr.*) catch {};
         }
+        fail_count = load_result.failed_count;
     }
 
     // Build portfolio summary, candle map, and historical snapshots
diff --git a/src/service.zig b/src/service.zig
index 18012ff..356b150 100644
--- a/src/service.zig
+++ b/src/service.zig
@@ -748,6 +748,380 @@ pub const DataService = struct {
         return result;
     }
 
+    // ── Consolidated Price Loading (Parallel Server + Sequential Provider) ──
+
+    /// Configuration for loadAllPrices.
+    pub const LoadAllConfig = struct {
+        force_refresh: bool = false,
+        color: bool = true,
+        /// Maximum concurrent server sync requests. 0 = auto (8).
+        max_concurrent: usize = 0,
+    };
+
+    /// Result of loadAllPrices operation.
+    /// Keys in `prices` are the symbol slices passed in by the caller (they are
+    /// not copied), so those slices must outlive this result.
+    pub const LoadAllResult = struct {
+        prices: std.StringHashMap(f64),
+        /// Number of symbols resolved from fresh local cache.
+        cached_count: usize,
+        /// Number of symbols synced from server.
+        server_synced_count: usize,
+        /// Number of symbols fetched from providers (rate-limited APIs).
+        provider_fetched_count: usize,
+        /// Number of failed symbols that fell back to a stale cached price.
+        stale_count: usize,
+        /// Number of symbols that could not be refreshed (includes `stale_count`).
+        failed_count: usize,
+        /// Latest candle date seen.
+        latest_date: ?Date,
+
+        /// Free the prices hashmap. Call this if you don't transfer ownership.
+        pub fn deinit(self: *LoadAllResult) void {
+            self.prices.deinit();
+        }
+    };
+
+    /// Progress callback for aggregate (parallel) progress reporting.
+    /// Called periodically during parallel operations with current counts.
+    pub const AggregateProgressCallback = struct {
+        context: *anyopaque,
+        on_progress: *const fn (ctx: *anyopaque, completed: usize, total: usize, phase: Phase) void,
+
+        pub const Phase = enum {
+            /// Checking local cache
+            cache_check,
+            /// Syncing from ZFIN_SERVER
+            server_sync,
+            /// Fetching from rate-limited providers
+            provider_fetch,
+            /// Done
+            complete,
+        };
+
+        fn emit(self: AggregateProgressCallback, completed: usize, total: usize, phase: Phase) void {
+            self.on_progress(self.context, completed, total, phase);
+        }
+    };
+
+    /// Thread-safe counter for parallel progress tracking.
+    const AtomicCounter = struct {
+        value: std.atomic.Value(usize) = std.atomic.Value(usize).init(0),
+
+        fn increment(self: *AtomicCounter) usize {
+            return self.value.fetchAdd(1, .monotonic);
+        }
+
+        fn load(self: *const AtomicCounter) usize {
+            return self.value.load(.monotonic);
+        }
+    };
+
+    /// Per-symbol result from parallel server sync.
+    const ServerSyncResult = struct {
+        symbol: []const u8,
+        success: bool,
+    };
+
+    /// Load prices for portfolio and watchlist symbols with automatic parallelization.
+    ///
+    /// When ZFIN_SERVER is configured:
+    /// 1. Check local cache (fast, parallel-safe)
+    /// 2. Parallel sync from server for cache misses
+    /// 3. Sequential provider fallback for server failures
+    ///
+    /// When ZFIN_SERVER is not configured:
+    /// Falls back to sequential loading with per-symbol progress.
+    ///
+    /// Progress is reported via `aggregate_progress` for parallel phases
+    /// and `symbol_progress` for sequential provider fallback.
+    pub fn loadAllPrices(
+        self: *DataService,
+        portfolio_syms: ?[]const []const u8,
+        watch_syms: []const []const u8,
+        config: LoadAllConfig,
+        aggregate_progress: ?AggregateProgressCallback,
+        symbol_progress: ?ProgressCallback,
+    ) LoadAllResult {
+        var result = LoadAllResult{
+            .prices = std.StringHashMap(f64).init(self.allocator),
+            .cached_count = 0,
+            .server_synced_count = 0,
+            .provider_fetched_count = 0,
+            .stale_count = 0,
+            .failed_count = 0,
+            .latest_date = null,
+        };
+
+        // Combine all symbols
+        const portfolio_count = if (portfolio_syms) |ps| ps.len else 0;
+        const watch_count = watch_syms.len;
+        const total_count = portfolio_count + watch_count;
+
+        if (total_count == 0) return result;
+
+        // Build combined symbol list
+        var all_symbols = std.ArrayList([]const u8).initCapacity(self.allocator, total_count) catch return result;
+        defer all_symbols.deinit(self.allocator);
+
+        // Capacity for total_count entries was reserved above, so these cannot fail.
+        if (portfolio_syms) |ps| all_symbols.appendSliceAssumeCapacity(ps);
+        all_symbols.appendSliceAssumeCapacity(watch_syms);
+
+        // Invalidate cache if force refresh
+        if (config.force_refresh) {
+            for (all_symbols.items) |sym| {
+                self.invalidate(sym, .candles_daily);
+            }
+        }
+
+        // Phase 1: Check local cache (fast path)
+        var needs_fetch: std.ArrayList([]const u8) = .empty;
+        defer needs_fetch.deinit(self.allocator);
+
+        if (aggregate_progress) |p| p.emit(0, total_count, .cache_check);
+
+        for (all_symbols.items) |sym| {
+            if (!config.force_refresh and self.isCandleCacheFresh(sym)) {
+                if (self.getCachedLastClose(sym)) |close| {
+                    result.prices.put(sym, close) catch {};
+                    self.updateLatestDate(&result, sym);
+                }
+                result.cached_count += 1;
+            } else {
+                self.enqueueSymbol(&needs_fetch, &result, sym);
+            }
+        }
+
+        if (aggregate_progress) |p| p.emit(result.cached_count, total_count, .cache_check);
+
+        if (needs_fetch.items.len == 0) {
+            if (aggregate_progress) |p| p.emit(total_count, total_count, .complete);
+            return result;
+        }
+
+        // Phase 2: Server sync (parallel if server configured)
+        var server_failures: std.ArrayList([]const u8) = .empty;
+        defer server_failures.deinit(self.allocator);
+
+        if (self.config.server_url != null) {
+            self.parallelServerSync(
+                needs_fetch.items,
+                &result,
+                &server_failures,
+                config,
+                aggregate_progress,
+                total_count,
+            );
+        } else {
+            // No server — all need provider fetch
+            for (needs_fetch.items) |sym| {
+                self.enqueueSymbol(&server_failures, &result, sym);
+            }
+        }
+
+        // Phase 3: Sequential provider fallback for server failures
+        if (server_failures.items.len > 0) {
+            if (aggregate_progress) |p| p.emit(
+                result.cached_count + result.server_synced_count,
+                total_count,
+                .provider_fetch,
+            );
+
+            self.sequentialProviderFetch(
+                server_failures.items,
+                &result,
+                symbol_progress,
+                total_count - server_failures.items.len, // offset for progress display
+            );
+        }
+
+        if (aggregate_progress) |p| p.emit(total_count, total_count, .complete);
+        return result;
+    }
+
+    /// Append `sym` to `queue`. If the queue cannot grow (out of memory), count
+    /// the symbol as failed so it is not silently dropped from the totals.
+    fn enqueueSymbol(
+        self: *DataService,
+        queue: *std.ArrayList([]const u8),
+        result: *LoadAllResult,
+        sym: []const u8,
+    ) void {
+        queue.append(self.allocator, sym) catch {
+            result.failed_count += 1;
+        };
+    }
+
+    /// Sync `symbols` from the server using up to `config.max_concurrent`
+    /// worker threads (8 when it is 0). Workers claim indices from a shared
+    /// atomic counter. Symbols that sync and can then be read from the cache are
+    /// added to `result`; all others are appended to `failures`.
+    fn parallelServerSync(
+        self: *DataService,
+        symbols: []const []const u8,
+        result: *LoadAllResult,
+        failures: *std.ArrayList([]const u8),
+        config: LoadAllConfig,
+        aggregate_progress: ?AggregateProgressCallback,
+        total_count: usize,
+    ) void {
+        const max_threads = if (config.max_concurrent > 0) config.max_concurrent else 8;
+        const thread_count = @min(symbols.len, max_threads);
+
+        if (aggregate_progress) |p| p.emit(result.cached_count, total_count, .server_sync);
+
+        // Shared state for worker threads
+        var completed = AtomicCounter{};
+        var next_index = AtomicCounter{};
+        const sync_results = self.allocator.alloc(ServerSyncResult, symbols.len) catch {
+            // Allocation failed — fall back to marking all as failures
+            for (symbols) |sym| self.enqueueSymbol(failures, result, sym);
+            return;
+        };
+        defer self.allocator.free(sync_results);
+
+        // Initialize results
+        for (sync_results, 0..) |*sr, i| {
+            sr.* = .{ .symbol = symbols[i], .success = false };
+        }
+
+        // Allocate thread handles
+        const threads = self.allocator.alloc(std.Thread, thread_count) catch {
+            for (symbols) |sym| self.enqueueSymbol(failures, result, sym);
+            return;
+        };
+        defer self.allocator.free(threads);
+
+        const WorkerContext = struct {
+            svc: *DataService,
+            symbols: []const []const u8,
+            results: []ServerSyncResult,
+            next_index: *AtomicCounter,
+            completed: *AtomicCounter,
+        };
+
+        var ctx = WorkerContext{
+            .svc = self,
+            .symbols = symbols,
+            .results = sync_results,
+            .next_index = &next_index,
+            .completed = &completed,
+        };
+
+        const worker = struct {
+            fn run(wctx: *WorkerContext) void {
+                while (true) {
+                    const idx = wctx.next_index.increment();
+                    if (idx >= wctx.symbols.len) break;
+
+                    const sym = wctx.symbols[idx];
+                    const success = wctx.svc.syncCandlesFromServer(sym);
+                    wctx.results[idx].success = success;
+                    _ = wctx.completed.increment();
+                }
+            }
+        };
+
+        // Start threads. Handles are packed at the front of `threads` so that
+        // threads[0..spawned] holds only workers that actually started.
+        var spawned: usize = 0;
+        for (0..thread_count) |_| {
+            threads[spawned] = std.Thread.spawn(.{}, worker.run, .{&ctx}) catch continue;
+            spawned += 1;
+        }
+
+        // No worker could be started: drain the queue on this thread so every
+        // symbol is still attempted and the wait loop below terminates.
+        if (spawned == 0) worker.run(&ctx);
+
+        // Progress reporting while waiting
+        if (aggregate_progress) |p| {
+            while (completed.load() < symbols.len) {
+                std.Thread.sleep(50 * std.time.ns_per_ms);
+                p.emit(result.cached_count + completed.load(), total_count, .server_sync);
+            }
+        }
+
+        // Wait for all threads
+        for (threads[0..spawned]) |t| {
+            t.join();
+        }
+
+        // Process results
+        for (sync_results) |sr| {
+            if (sr.success) {
+                // Server sync succeeded — read from cache
+                if (self.getCachedLastClose(sr.symbol)) |close| {
+                    result.prices.put(sr.symbol, close) catch {};
+                    self.updateLatestDate(result, sr.symbol);
+                    result.server_synced_count += 1;
+                } else {
+                    // Sync said success but can't read cache — treat as failure
+                    self.enqueueSymbol(failures, result, sr.symbol);
+                }
+            } else {
+                self.enqueueSymbol(failures, result, sr.symbol);
+            }
+        }
+    }
+
+    /// Fetch `symbols` one at a time via `getCandles`, falling back to the last
+    /// cached close when a fetch fails. `index_offset` shifts the index reported
+    /// to `progress` so numbering continues after already-resolved symbols.
+    fn sequentialProviderFetch(
+        self: *DataService,
+        symbols: []const []const u8,
+        result: *LoadAllResult,
+        progress: ?ProgressCallback,
+        index_offset: usize,
+    ) void {
+        const total = index_offset + symbols.len;
+
+        for (symbols, 0..) |sym, i| {
+            const display_idx = index_offset + i;
+
+            // Notify: about to fetch
+            if (progress) |p| p.emit(display_idx, total, sym, .fetching);
+
+            // Try provider fetch
+            if (self.getCandles(sym)) |candle_result| {
+                defer self.allocator.free(candle_result.data);
+                if (candle_result.data.len > 0) {
+                    const last = candle_result.data[candle_result.data.len - 1];
+                    result.prices.put(sym, last.close) catch {};
+                    if (result.latest_date == null or last.date.days > result.latest_date.?.days) {
+                        result.latest_date = last.date;
+                    }
+                }
+                result.provider_fetched_count += 1;
+                if (progress) |p| p.emit(display_idx, total, sym, .fetched);
+                continue;
+            } else |_| {}
+
+            // Provider failed — try stale cache
+            result.failed_count += 1;
+            if (self.getCachedLastClose(sym)) |close| {
+                result.prices.put(sym, close) catch {};
+                result.stale_count += 1;
+                if (progress) |p| p.emit(display_idx, total, sym, .failed_used_stale);
+            } else {
+                if (progress) |p| p.emit(display_idx, total, sym, .failed);
+            }
+        }
+    }
+
+    /// Update latest_date in result from cached candle metadata.
+    fn updateLatestDate(self: *DataService, result: *LoadAllResult, symbol: []const u8) void {
+        var s = self.store();
+        if (s.readCandleMeta(symbol)) |cm| {
+            const d = cm.meta.last_date;
+            if (result.latest_date == null or d.days > result.latest_date.?.days) {
+                result.latest_date = d;
+            }
+        }
+    }
+
     // ── CUSIP Resolution ──────────────────────────────────────────
 
     /// Look up multiple CUSIPs in a single batch request via OpenFIGI.
diff --git a/src/tui.zig b/src/tui.zig
index 75c511d..8aa34ea 100644
--- a/src/tui.zig
+++ b/src/tui.zig
@@ -1781,32 +1781,15 @@ pub fn run(allocator: std.mem.Allocator, config: zfin.Config, args: []const []co
         const total_count = stock_count + watch_syms.items.len;
 
         if (total_count > 0) {
-            var prices = std.StringHashMap(f64).init(allocator);
-
-            var progress = cli.LoadProgress{
-                .svc = svc,
-                .color = true,
-                .index_offset = 0,
-                .grand_total = total_count,
-            };
-
-            if (syms) |ss| {
-                const result = svc.loadPrices(ss, &prices, false, progress.callback());
-                progress.index_offset = stock_count;
-
-                if (result.fetched_count > 0 or result.fail_count > 0) {
-                    var msg_buf: [256]u8 = undefined;
-                    const msg = std.fmt.bufPrint(&msg_buf, "Loaded {d} symbols ({d} cached, {d} fetched, {d} failed)\n", .{ ss.len, result.cached_count, result.fetched_count, result.fail_count }) catch "Done loading\n";
-                    cli.stderrPrint(msg) catch {};
-                }
-            }
-
-            // Load watchlist prices
-            if (watch_syms.items.len > 0) {
-                _ = svc.loadPrices(watch_syms.items, &prices, false, progress.callback());
-            }
-
-            app_inst.prefetched_prices = prices;
+            // Use consolidated parallel loader
+            const load_result = cli.loadPortfolioPrices(
+                svc,
+                syms,
+                watch_syms.items,
+                false, // force_refresh
+                true, // color
+            );
+            app_inst.prefetched_prices = load_result.prices;
         }
     }
 