This commit is contained in:
parent
6d99349b62
commit
7144f60d10
4 changed files with 496 additions and 63 deletions
|
|
@ -123,6 +123,129 @@ pub const LoadProgress = struct {
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
/// Aggregate progress callback for parallel loading operations.
/// Displays a single updating line with progress bar.
pub const AggregateProgress = struct {
    /// Whether ANSI color escapes may be written to stderr.
    color: bool,
    /// Phase observed on the previous onProgress call; used to detect
    /// phase transitions so the in-place line can be cleared/finished.
    last_phase: ?zfin.DataService.AggregateProgressCallback.Phase = null,

    fn onProgress(ctx: *anyopaque, completed: usize, total: usize, phase: zfin.DataService.AggregateProgressCallback.Phase) void {
        const self: *AggregateProgress = @ptrCast(@alignCast(ctx));

        // Track phase transitions for newlines. Capture the previous phase
        // BEFORE overwriting it: the .complete branch below must inspect the
        // phase we were in, not the phase we just recorded.
        const prev_phase = self.last_phase;
        const phase_changed = prev_phase == null or prev_phase.? != phase;
        self.last_phase = phase;

        var buf: [256]u8 = undefined;
        var writer = std.fs.File.stderr().writer(&buf);
        const out = &writer.interface;

        switch (phase) {
            .cache_check => {
                // Brief phase, no output needed
            },
            .server_sync => {
                // Single updating line with carriage return
                if (self.color) fmt.ansiSetFg(out, CLR_MUTED[0], CLR_MUTED[1], CLR_MUTED[2]) catch {};
                out.print("\r Syncing from server... [{d}/{d}]", .{ completed, total }) catch {};
                if (self.color) fmt.ansiReset(out) catch {};
                out.flush() catch {};
            },
            .provider_fetch => {
                if (phase_changed) {
                    // Clear the server sync line and print newline
                    out.print("\r\x1b[K", .{}) catch {}; // clear line
                    if (self.color) fmt.ansiSetFg(out, CLR_MUTED[0], CLR_MUTED[1], CLR_MUTED[2]) catch {};
                    out.print(" Synced {d} from server, fetching remaining from providers...\n", .{completed}) catch {};
                    if (self.color) fmt.ansiReset(out) catch {};
                    out.flush() catch {};
                }
            },
            .complete => {
                // Final newline if we were on server_sync line.
                // BUGFIX: test prev_phase — the original tested
                // self.last_phase, which was already overwritten with
                // .complete above, so this condition could never be true
                // and the progress line was never cleared.
                if (prev_phase != null and
                    (prev_phase.? == .server_sync or prev_phase.? == .cache_check))
                {
                    out.print("\r\x1b[K", .{}) catch {}; // clear line
                    // BUGFIX: flush — without it the clear sequence stays in
                    // the local buffer and never reaches the terminal.
                    out.flush() catch {};
                }
            },
        }
    }

    /// Build the type-erased callback handed to DataService.loadAllPrices.
    pub fn callback(self: *AggregateProgress) zfin.DataService.AggregateProgressCallback {
        return .{
            .context = @ptrCast(self),
            .on_progress = onProgress,
        };
    }
};
|
||||||
|
|
||||||
|
/// Unified price loading for both CLI and TUI.
/// Handles parallel server sync when ZFIN_SERVER is configured,
/// with sequential provider fallback for failures.
/// Prints a one-line summary to stderr before returning.
/// Caller owns the returned result; per LoadAllResult's contract, call
/// `deinit()` on it unless ownership of the prices map is transferred.
pub fn loadPortfolioPrices(
    svc: *zfin.DataService,
    portfolio_syms: ?[]const []const u8,
    watch_syms: []const []const u8,
    force_refresh: bool,
    color: bool,
) zfin.DataService.LoadAllResult {
    // Aggregate callback renders the single updating line for the
    // parallel phases; the per-symbol callback covers the sequential
    // provider fallback.
    var aggregate = AggregateProgress{ .color = color };
    var symbol_progress = LoadProgress{
        .svc = svc,
        .color = color,
        .index_offset = 0,
        // Portfolio symbols are optional; watch symbols always count.
        .grand_total = (if (portfolio_syms) |ps| ps.len else 0) + watch_syms.len,
    };

    const result = svc.loadAllPrices(
        portfolio_syms,
        watch_syms,
        .{ .force_refresh = force_refresh, .color = color },
        aggregate.callback(),
        symbol_progress.callback(),
    );

    // Print summary
    const total = symbol_progress.grand_total;
    const from_cache = result.cached_count;
    const from_server = result.server_synced_count;
    const from_provider = result.provider_fetched_count;
    const failed = result.failed_count;
    const stale = result.stale_count;

    // Buffered stderr writer; every write is best-effort (errors ignored),
    // matching the progress callbacks above.
    var buf: [256]u8 = undefined;
    var writer = std.fs.File.stderr().writer(&buf);
    const out = &writer.interface;

    if (from_cache == total) {
        // Everything came from fresh local cache.
        if (color) fmt.ansiSetFg(out, CLR_MUTED[0], CLR_MUTED[1], CLR_MUTED[2]) catch {};
        out.print(" Loaded {d} symbols from cache\n", .{total}) catch {};
        if (color) fmt.ansiReset(out) catch {};
    } else if (failed > 0) {
        // Some symbols failed every source; mention stale-cache fallbacks
        // separately when any were used.
        if (color) fmt.ansiSetFg(out, CLR_MUTED[0], CLR_MUTED[1], CLR_MUTED[2]) catch {};
        if (stale > 0) {
            out.print(" Loaded {d} symbols ({d} cached, {d} server, {d} provider, {d} failed — {d} using stale)\n", .{ total, from_cache, from_server, from_provider, failed, stale }) catch {};
        } else {
            out.print(" Loaded {d} symbols ({d} cached, {d} server, {d} provider, {d} failed)\n", .{ total, from_cache, from_server, from_provider, failed }) catch {};
        }
        if (color) fmt.ansiReset(out) catch {};
    } else {
        // All loaded; tailor the breakdown to which sources contributed.
        if (color) fmt.ansiSetFg(out, CLR_MUTED[0], CLR_MUTED[1], CLR_MUTED[2]) catch {};
        if (from_server > 0 and from_provider > 0) {
            out.print(" Loaded {d} symbols ({d} cached, {d} server, {d} provider)\n", .{ total, from_cache, from_server, from_provider }) catch {};
        } else if (from_server > 0) {
            out.print(" Loaded {d} symbols ({d} cached, {d} server)\n", .{ total, from_cache, from_server }) catch {};
        } else {
            out.print(" Loaded {d} symbols ({d} cached, {d} fetched)\n", .{ total, from_cache, from_provider }) catch {};
        }
        if (color) fmt.ansiReset(out) catch {};
    }
    out.flush() catch {};

    return result;
}
|
||||||
|
|
||||||
// ── Portfolio data pipeline ──────────────────────────────────
|
// ── Portfolio data pipeline ──────────────────────────────────
|
||||||
|
|
||||||
/// Result of the shared portfolio data pipeline. Caller must call deinit().
|
/// Result of the shared portfolio data pipeline. Caller must call deinit().
|
||||||
|
|
|
||||||
|
|
@ -56,45 +56,22 @@ pub fn run(allocator: std.mem.Allocator, svc: *zfin.DataService, file_path: []co
|
||||||
const all_syms_count = syms.len + watch_syms.items.len;
|
const all_syms_count = syms.len + watch_syms.items.len;
|
||||||
|
|
||||||
if (all_syms_count > 0) {
|
if (all_syms_count > 0) {
|
||||||
|
// Use consolidated parallel loader
|
||||||
|
var load_result = cli.loadPortfolioPrices(
|
||||||
|
svc,
|
||||||
|
syms,
|
||||||
|
watch_syms.items,
|
||||||
|
force_refresh,
|
||||||
|
color,
|
||||||
|
);
|
||||||
|
defer load_result.deinit(); // Free the prices hashmap after we copy
|
||||||
|
|
||||||
// Progress callback for per-symbol output
|
// Transfer prices to our local map
|
||||||
var progress_ctx = cli.LoadProgress{
|
var it = load_result.prices.iterator();
|
||||||
.svc = svc,
|
while (it.next()) |entry| {
|
||||||
.color = color,
|
prices.put(entry.key_ptr.*, entry.value_ptr.*) catch {};
|
||||||
.index_offset = 0,
|
|
||||||
.grand_total = all_syms_count,
|
|
||||||
};
|
|
||||||
|
|
||||||
// Load prices for stock/ETF positions
|
|
||||||
const load_result = svc.loadPrices(syms, &prices, force_refresh, progress_ctx.callback());
|
|
||||||
fail_count = load_result.fail_count;
|
|
||||||
|
|
||||||
// Fetch watch symbol candles (for watchlist display, not portfolio value)
|
|
||||||
progress_ctx.index_offset = syms.len;
|
|
||||||
_ = svc.loadPrices(watch_syms.items, &prices, force_refresh, progress_ctx.callback());
|
|
||||||
|
|
||||||
// Summary line
|
|
||||||
{
|
|
||||||
const cached_count = load_result.cached_count;
|
|
||||||
const fetched_count = load_result.fetched_count;
|
|
||||||
var msg_buf: [256]u8 = undefined;
|
|
||||||
if (cached_count == all_syms_count) {
|
|
||||||
const msg = std.fmt.bufPrint(&msg_buf, "All {d} symbols loaded from cache\n", .{all_syms_count}) catch "Loaded from cache\n";
|
|
||||||
try cli.stderrPrint(msg);
|
|
||||||
} else if (fail_count > 0) {
|
|
||||||
const stale = load_result.stale_count;
|
|
||||||
if (stale > 0) {
|
|
||||||
const msg = std.fmt.bufPrint(&msg_buf, "Loaded {d} symbols ({d} cached, {d} fetched, {d} failed — {d} using stale cache)\n", .{ all_syms_count, cached_count, fetched_count, fail_count, stale }) catch "Done loading\n";
|
|
||||||
try cli.stderrPrint(msg);
|
|
||||||
} else {
|
|
||||||
const msg = std.fmt.bufPrint(&msg_buf, "Loaded {d} symbols ({d} cached, {d} fetched, {d} failed)\n", .{ all_syms_count, cached_count, fetched_count, fail_count }) catch "Done loading\n";
|
|
||||||
try cli.stderrPrint(msg);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
const msg = std.fmt.bufPrint(&msg_buf, "Loaded {d} symbols ({d} cached, {d} fetched)\n", .{ all_syms_count, cached_count, fetched_count }) catch "Done loading\n";
|
|
||||||
try cli.stderrPrint(msg);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
fail_count = load_result.failed_count;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Build portfolio summary, candle map, and historical snapshots
|
// Build portfolio summary, candle map, and historical snapshots
|
||||||
|
|
|
||||||
350
src/service.zig
350
src/service.zig
|
|
@ -748,6 +748,356 @@ pub const DataService = struct {
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ── Consolidated Price Loading (Parallel Server + Sequential Provider) ──
|
||||||
|
|
||||||
|
/// Configuration for loadAllPrices.
pub const LoadAllConfig = struct {
    /// When true, the daily-candle cache entry for every requested symbol
    /// is invalidated up front, forcing a refetch.
    force_refresh: bool = false,
    /// Whether output may use ANSI color.
    // NOTE(review): not read by loadAllPrices itself in this file —
    // presumably consumed by progress/display callers; confirm.
    color: bool = true,
    /// Maximum concurrent server sync requests. 0 = auto (8).
    max_concurrent: usize = 0,
};
|
||||||
|
|
||||||
|
/// Result of loadAllPrices operation.
pub const LoadAllResult = struct {
    /// Last-close price per symbol. Keys alias the caller-provided symbol
    /// slices (they are not duplicated), so they must outlive this map.
    prices: std.StringHashMap(f64),
    /// Number of symbols resolved from fresh local cache.
    cached_count: usize,
    /// Number of symbols synced from server.
    server_synced_count: usize,
    /// Number of symbols fetched from providers (rate-limited APIs).
    provider_fetched_count: usize,
    /// Number of symbols that failed all sources but used stale cache.
    /// These symbols are also included in failed_count.
    stale_count: usize,
    /// Number of symbols that failed completely (no data).
    failed_count: usize,
    /// Latest candle date seen.
    latest_date: ?Date,

    /// Free the prices hashmap. Call this if you don't transfer ownership.
    pub fn deinit(self: *LoadAllResult) void {
        self.prices.deinit();
    }
};
|
||||||
|
|
||||||
|
/// Progress callback for aggregate (parallel) progress reporting.
/// Called periodically during parallel operations with current counts.
pub const AggregateProgressCallback = struct {
    /// Opaque pointer handed back to on_progress; typically the reporting
    /// struct itself (see AggregateProgress.callback).
    context: *anyopaque,
    /// Invoked with (context, completed, total, phase).
    on_progress: *const fn (ctx: *anyopaque, completed: usize, total: usize, phase: Phase) void,

    pub const Phase = enum {
        /// Checking local cache
        cache_check,
        /// Syncing from ZFIN_SERVER
        server_sync,
        /// Fetching from rate-limited providers
        provider_fetch,
        /// Done
        complete,
    };

    /// Convenience wrapper that forwards to on_progress with the stored
    /// context pointer.
    fn emit(self: AggregateProgressCallback, completed: usize, total: usize, phase: Phase) void {
        self.on_progress(self.context, completed, total, phase);
    }
};
|
||||||
|
|
||||||
|
/// Thread-safe counter for parallel progress tracking.
const AtomicCounter = struct {
    value: std.atomic.Value(usize) = std.atomic.Value(usize).init(0),

    /// Atomically adds one and returns the PREVIOUS value
    /// (fetchAdd semantics) — used both as a work-index dispenser
    /// and a completion counter.
    fn increment(self: *AtomicCounter) usize {
        return self.value.fetchAdd(1, .monotonic);
    }

    /// Current value. Monotonic ordering: suitable for progress polling,
    /// not for synchronizing access to other data.
    fn load(self: *const AtomicCounter) usize {
        return self.value.load(.monotonic);
    }
};
|
||||||
|
|
||||||
|
/// Per-symbol result from parallel server sync.
const ServerSyncResult = struct {
    /// The symbol this slot tracks; aliases the caller's slice.
    symbol: []const u8,
    /// True when syncCandlesFromServer reported success for this symbol.
    success: bool,
};
|
||||||
|
|
||||||
|
/// Load prices for portfolio and watchlist symbols with automatic parallelization.
///
/// When ZFIN_SERVER is configured:
/// 1. Check local cache (fast, parallel-safe)
/// 2. Parallel sync from server for cache misses
/// 3. Sequential provider fallback for server failures
///
/// When ZFIN_SERVER is not configured:
/// Falls back to sequential loading with per-symbol progress.
///
/// Progress is reported via `aggregate_progress` for parallel phases
/// and `symbol_progress` for sequential provider fallback.
/// Caller owns the result; call result.deinit() unless the prices map
/// is handed off. Map keys alias the caller's symbol slices.
pub fn loadAllPrices(
    self: *DataService,
    portfolio_syms: ?[]const []const u8,
    watch_syms: []const []const u8,
    config: LoadAllConfig,
    aggregate_progress: ?AggregateProgressCallback,
    symbol_progress: ?ProgressCallback,
) LoadAllResult {
    var result = LoadAllResult{
        .prices = std.StringHashMap(f64).init(self.allocator),
        .cached_count = 0,
        .server_synced_count = 0,
        .provider_fetched_count = 0,
        .stale_count = 0,
        .failed_count = 0,
        .latest_date = null,
    };

    // Combine all symbols
    const portfolio_count = if (portfolio_syms) |ps| ps.len else 0;
    const watch_count = watch_syms.len;
    const total_count = portfolio_count + watch_count;

    // Nothing to do — empty result with all counts zero.
    if (total_count == 0) return result;

    // Build combined symbol list. On allocation failure the (empty)
    // result is returned as-is — best-effort throughout this pipeline.
    var all_symbols = std.ArrayList([]const u8).initCapacity(self.allocator, total_count) catch return result;
    defer all_symbols.deinit(self.allocator);

    if (portfolio_syms) |ps| {
        for (ps) |sym| all_symbols.append(self.allocator, sym) catch {};
    }
    for (watch_syms) |sym| all_symbols.append(self.allocator, sym) catch {};

    // Invalidate cache if force refresh
    if (config.force_refresh) {
        for (all_symbols.items) |sym| {
            self.invalidate(sym, .candles_daily);
        }
    }

    // Phase 1: Check local cache (fast path)
    var needs_fetch: std.ArrayList([]const u8) = .empty;
    defer needs_fetch.deinit(self.allocator);

    if (aggregate_progress) |p| p.emit(0, total_count, .cache_check);

    for (all_symbols.items) |sym| {
        if (!config.force_refresh and self.isCandleCacheFresh(sym)) {
            if (self.getCachedLastClose(sym)) |close| {
                result.prices.put(sym, close) catch {};
                self.updateLatestDate(&result, sym);
            }
            // NOTE(review): counted as cached even when getCachedLastClose
            // returned null (fresh cache but no readable close) — confirm
            // this is the intended accounting.
            result.cached_count += 1;
        } else {
            needs_fetch.append(self.allocator, sym) catch {};
        }
    }

    if (aggregate_progress) |p| p.emit(result.cached_count, total_count, .cache_check);

    // Everything was cache-fresh: report completion and return early.
    if (needs_fetch.items.len == 0) {
        if (aggregate_progress) |p| p.emit(total_count, total_count, .complete);
        return result;
    }

    // Phase 2: Server sync (parallel if server configured)
    var server_failures: std.ArrayList([]const u8) = .empty;
    defer server_failures.deinit(self.allocator);

    if (self.config.server_url != null) {
        self.parallelServerSync(
            needs_fetch.items,
            &result,
            &server_failures,
            config,
            aggregate_progress,
            total_count,
        );
    } else {
        // No server — all need provider fetch
        for (needs_fetch.items) |sym| {
            server_failures.append(self.allocator, sym) catch {};
        }
    }

    // Phase 3: Sequential provider fallback for server failures
    if (server_failures.items.len > 0) {
        if (aggregate_progress) |p| p.emit(
            result.cached_count + result.server_synced_count,
            total_count,
            .provider_fetch,
        );

        self.sequentialProviderFetch(
            server_failures.items,
            &result,
            symbol_progress,
            total_count - server_failures.items.len, // offset for progress display
        );
    }

    if (aggregate_progress) |p| p.emit(total_count, total_count, .complete);
    return result;
}
|
||||||
|
|
||||||
|
/// Parallel server sync using thread pool.
///
/// Syncs each symbol from the configured server on worker threads that
/// pull indices from a shared atomic dispenser. Successes are read back
/// from the local cache into `result`; every other symbol is appended to
/// `failures` for the sequential provider fallback. Best-effort: any
/// allocation or spawn failure degrades to reporting symbols as failures
/// rather than returning an error.
fn parallelServerSync(
    self: *DataService,
    symbols: []const []const u8,
    result: *LoadAllResult,
    failures: *std.ArrayList([]const u8),
    config: LoadAllConfig,
    aggregate_progress: ?AggregateProgressCallback,
    total_count: usize,
) void {
    const max_threads = if (config.max_concurrent > 0) config.max_concurrent else 8;
    const thread_count = @min(symbols.len, max_threads);

    if (aggregate_progress) |p| p.emit(result.cached_count, total_count, .server_sync);

    // Shared state for worker threads
    var completed = AtomicCounter{};
    var next_index = AtomicCounter{};
    const sync_results = self.allocator.alloc(ServerSyncResult, symbols.len) catch {
        // Allocation failed — fall back to marking all as failures
        for (symbols) |sym| failures.append(self.allocator, sym) catch {};
        return;
    };
    defer self.allocator.free(sync_results);

    // Initialize results
    for (sync_results, 0..) |*sr, i| {
        sr.* = .{ .symbol = symbols[i], .success = false };
    }

    // Thread handle storage (const: the slice binding itself never changes).
    const threads = self.allocator.alloc(std.Thread, thread_count) catch {
        for (symbols) |sym| failures.append(self.allocator, sym) catch {};
        return;
    };
    defer self.allocator.free(threads);

    const WorkerContext = struct {
        svc: *DataService,
        symbols: []const []const u8,
        results: []ServerSyncResult,
        next_index: *AtomicCounter,
        completed: *AtomicCounter,
    };

    var ctx = WorkerContext{
        .svc = self,
        .symbols = symbols,
        .results = sync_results,
        .next_index = &next_index,
        .completed = &completed,
    };

    const worker = struct {
        fn run(wctx: *WorkerContext) void {
            // Pull work items until the shared index runs past the end.
            while (true) {
                const idx = wctx.next_index.increment();
                if (idx >= wctx.symbols.len) break;

                const sym = wctx.symbols[idx];
                const success = wctx.svc.syncCandlesFromServer(sym);
                wctx.results[idx].success = success;
                _ = wctx.completed.increment();
            }
        }
    };

    // Start threads. BUGFIX: write each successfully spawned handle at
    // threads[spawned] so the first `spawned` slots are always valid —
    // the original wrote into the loop slot and `catch continue`d, which
    // could leave undefined Thread values inside threads[0..spawned] and
    // make the join below undefined behavior.
    var spawned: usize = 0;
    while (spawned < thread_count) {
        threads[spawned] = std.Thread.spawn(.{}, worker.run, .{&ctx}) catch break;
        spawned += 1;
    }

    // BUGFIX: if no worker could be spawned, `completed` would never
    // advance and the progress wait loop below would spin forever.
    // Degrade to treating every symbol as a server failure instead.
    if (spawned == 0) {
        for (symbols) |sym| failures.append(self.allocator, sym) catch {};
        return;
    }

    // Progress reporting while waiting (polling; monotonic loads are
    // fine for display purposes).
    if (aggregate_progress) |p| {
        while (completed.load() < symbols.len) {
            std.Thread.sleep(50 * std.time.ns_per_ms);
            p.emit(result.cached_count + completed.load(), total_count, .server_sync);
        }
    }

    // Wait for all threads
    for (threads[0..spawned]) |t| {
        t.join();
    }

    // Process results
    for (sync_results) |sr| {
        if (sr.success) {
            // Server sync succeeded — read from cache
            if (self.getCachedLastClose(sr.symbol)) |close| {
                result.prices.put(sr.symbol, close) catch {};
                self.updateLatestDate(result, sr.symbol);
                result.server_synced_count += 1;
            } else {
                // Sync said success but can't read cache — treat as failure
                failures.append(self.allocator, sr.symbol) catch {};
            }
        } else {
            failures.append(self.allocator, sr.symbol) catch {};
        }
    }
}
|
||||||
|
|
||||||
|
/// Sequential provider fetch for symbols that failed server sync.
/// Updates result counts in place; a symbol that falls back to stale
/// cache is counted in BOTH failed_count and stale_count (summary lines
/// report these as "failed — using stale").
fn sequentialProviderFetch(
    self: *DataService,
    symbols: []const []const u8,
    result: *LoadAllResult,
    progress: ?ProgressCallback,
    index_offset: usize,
) void {
    // Display positions continue after the cached/server-synced symbols.
    const total = index_offset + symbols.len;

    for (symbols, 0..) |sym, i| {
        const display_idx = index_offset + i;

        // Notify: about to fetch
        if (progress) |p| p.emit(display_idx, total, sym, .fetching);

        // Try provider fetch; the provider's error (if any) is
        // intentionally discarded — failure is handled below.
        if (self.getCandles(sym)) |candle_result| {
            defer self.allocator.free(candle_result.data);
            if (candle_result.data.len > 0) {
                // Last candle's close is the symbol's current price.
                const last = candle_result.data[candle_result.data.len - 1];
                result.prices.put(sym, last.close) catch {};
                if (result.latest_date == null or last.date.days > result.latest_date.?.days) {
                    result.latest_date = last.date;
                }
            }
            result.provider_fetched_count += 1;
            if (progress) |p| p.emit(display_idx, total, sym, .fetched);
            continue;
        } else |_| {}

        // Provider failed — try stale cache
        result.failed_count += 1;
        if (self.getCachedLastClose(sym)) |close| {
            result.prices.put(sym, close) catch {};
            result.stale_count += 1;
            if (progress) |p| p.emit(display_idx, total, sym, .failed_used_stale);
        } else {
            if (progress) |p| p.emit(display_idx, total, sym, .failed);
        }
    }
}
|
||||||
|
|
||||||
|
/// Update latest_date in result from cached candle metadata.
/// No-op when the store has no candle metadata for the symbol, or when
/// the cached last date is not newer than what result already holds.
fn updateLatestDate(self: *DataService, result: *LoadAllResult, symbol: []const u8) void {
    var candle_store = self.store();
    const meta_entry = candle_store.readCandleMeta(symbol) orelse return;
    const candidate = meta_entry.meta.last_date;
    const current = result.latest_date orelse {
        // Nothing recorded yet — the cached date wins by default.
        result.latest_date = candidate;
        return;
    };
    if (candidate.days > current.days) result.latest_date = candidate;
}
|
||||||
|
|
||||||
// ── CUSIP Resolution ──────────────────────────────────────────
|
// ── CUSIP Resolution ──────────────────────────────────────────
|
||||||
|
|
||||||
/// Look up multiple CUSIPs in a single batch request via OpenFIGI.
|
/// Look up multiple CUSIPs in a single batch request via OpenFIGI.
|
||||||
|
|
|
||||||
35
src/tui.zig
35
src/tui.zig
|
|
@ -1781,32 +1781,15 @@ pub fn run(allocator: std.mem.Allocator, config: zfin.Config, args: []const []co
|
||||||
const total_count = stock_count + watch_syms.items.len;
|
const total_count = stock_count + watch_syms.items.len;
|
||||||
|
|
||||||
if (total_count > 0) {
|
if (total_count > 0) {
|
||||||
var prices = std.StringHashMap(f64).init(allocator);
|
// Use consolidated parallel loader
|
||||||
|
const load_result = cli.loadPortfolioPrices(
|
||||||
var progress = cli.LoadProgress{
|
svc,
|
||||||
.svc = svc,
|
syms,
|
||||||
.color = true,
|
watch_syms.items,
|
||||||
.index_offset = 0,
|
false, // force_refresh
|
||||||
.grand_total = total_count,
|
true, // color
|
||||||
};
|
);
|
||||||
|
app_inst.prefetched_prices = load_result.prices;
|
||||||
if (syms) |ss| {
|
|
||||||
const result = svc.loadPrices(ss, &prices, false, progress.callback());
|
|
||||||
progress.index_offset = stock_count;
|
|
||||||
|
|
||||||
if (result.fetched_count > 0 or result.fail_count > 0) {
|
|
||||||
var msg_buf: [256]u8 = undefined;
|
|
||||||
const msg = std.fmt.bufPrint(&msg_buf, "Loaded {d} symbols ({d} cached, {d} fetched, {d} failed)\n", .{ ss.len, result.cached_count, result.fetched_count, result.fail_count }) catch "Done loading\n";
|
|
||||||
cli.stderrPrint(msg) catch {};
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Load watchlist prices
|
|
||||||
if (watch_syms.items.len > 0) {
|
|
||||||
_ = svc.loadPrices(watch_syms.items, &prices, false, progress.callback());
|
|
||||||
}
|
|
||||||
|
|
||||||
app_inst.prefetched_prices = prices;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue