convert service to async pattern

This commit is contained in:
Emil Lerch 2026-06-11 16:22:26 -07:00
parent 2245176cb0
commit b3645a7bd1
Signed by: lobo
GPG key ID: A7B62D657EF764F8
3 changed files with 36 additions and 194 deletions

View file

@ -30,7 +30,7 @@ repos:
# std.Io.Group async/await methods. `zig fmt` rewrites the
# `@"async"`/`@"await"` workaround back to the bare form,
# so we can't dodge it locally either.
exclude: ^src/(tui|PortfolioData)\.zig$
exclude: ^src/(tui|PortfolioData|service)\.zig$
- repo: https://github.com/batmac/pre-commit-zig
rev: v0.3.0
hooks:

138
TODO.md
View file

@ -5,144 +5,6 @@ ordered roughly by priority within each section. Priority labels
(`HIGH` / `MEDIUM` / `LOW`) mark items that deserve explicit
ranking; unlabeled items are "someday, if the mood strikes."
## Review tab: cursor + symbol selection + drill-down — priority MEDIUM
The review tab is currently a static table — you can sort it but
not select a row. Common workflow: see something interesting on
the review tab (e.g. NKE has the worst trailing returns), want to
jump straight into the per-symbol detail tabs (performance, quote,
options) for it. Today that takes either:
- typing `/<symbol>` to set the active symbol manually, or
- switching to portfolio tab, finding the row, pressing `s` /
space / left-click on it, then switching back.
Both are small but noticeable papercuts when scanning the review
table.
### What to add
Mirror the portfolio tab's pattern (`src/tui/portfolio_tab.zig`):
- `State.cursor: usize` — selected row index.
- `j`/`k` and arrow keys move the cursor (already routed by the
framework's `onCursorMove` hook; opt in by declaring it on the
tab).
- Mouse wheel scrolls (already handled by App when the tab opts
out via `handleMouse` returning false on wheel events).
- Mouse click on a data row sets the cursor to that row.
- Cursor row gets `selectStyle` highlight in the row's
`StyleSpan` set so it visibly stands out.
- Active-symbol indicator: rows whose `symbol` matches `app.symbol`
get an asterisk or similar marker (matches portfolio tab's
star convention).
- Press `s` (or space, or Enter) to set `app.symbol` to the
cursor row's symbol — same `select_symbol` action portfolio tab
binds. The framework validator already prevents tab-local
bindings from colliding with global keys, so reusing `s`/space
is fine because portfolio tab does it too.
### Drill-down navigation
A natural extension once selection works: a hotkey that both
selects the symbol AND switches to a per-symbol tab (performance
is the obvious target since that's the deep-dive surface). Maybe
`Enter` with a row selected → set symbol + jump to performance
tab. Compare: portfolio tab's Enter is "expand/collapse"; review
rows don't expand, so Enter is free.
### Tests
- Cursor moves on j/k/arrows, clamps at edges.
- Click on row N sets cursor to N.
- `select_symbol` action sets `app.symbol` and triggers a
`loadTabData()` if a downstream tab is active.
- Active-symbol asterisk renders for the row matching `app.symbol`.
## TUI: share candle/dividend maps across tabs — priority MEDIUM
`App.ensurePortfolioDataLoaded` builds a complete per-symbol
`candle_map` via `buildPortfolioData`, uses it once to compute
historical-snapshot values, then **frees it** at
`src/tui.zig:1404-1406`. Every per-position TUI tab that
subsequently needs candles (`review`, `performance`, parts of
`portfolio` and `quote`) re-reads them from the SRF cache via
`getCachedCandles` — ~27 redundant reads per tab activation on
a 27-symbol portfolio, each running its own SRF iterator pass.
Most visible in debug builds (~2s tab activation); release
mode is sub-second but still measurable on first switch.
### Fix sketch
Promote `candle_map` and add a `dividend_map` to fields on
`App.portfolio.PortfolioData`. Tabs read from
`app.portfolio.candle_map` / `app.portfolio.dividend_map`
instead of re-fetching. Lifetime tied to the existing
`summary` ownership: cleared atomically on portfolio reload,
freed once in `PortfolioData.deinit`.
The review tab's `State.dividend_map` field is removed —
that data lives on App now.
### Loading strategy options
**Eager.** Populate both maps inside
`ensurePortfolioDataLoaded` so they're ready when the first
per-symbol tab activates. Pros: tab switches are always
instant. Cons: pays the dividend-cache read cost (~27 SRF
reads) at TUI startup even if the user never opens a tab
that needs them.
**Lazy.** App exposes `ensureDividendMap()` (parallel to
`ensureAccountMap`); first tab to need dividends pays the
load. Pros: no startup cost for users who don't open
review/performance/etc. Cons: first review-tab activation
still slow.
**Async (recommended middle ground).** On TUI startup, spawn
a background task using Zig 0.16.0's `Io` async to populate
both maps. Tabs read through a synchronization wrapper —
something like:
```zig
pub const PortfolioCache = struct {
candle_map: ?std.StringHashMap([]const Candle) = null,
dividend_map: ?std.StringHashMap([]const Dividend) = null,
ready: std.Thread.Semaphore, // or io.Async equivalent
pub fn waitReady(self: *PortfolioCache) void { ... }
};
```
Tabs that need the maps call `waitReady()` (cheap when
already loaded; brief block on first call if the background
task hasn't finished). Uses Zig 0.16's `io` async layer so
we don't manage thread lifecycle directly. Pros: zero
perceived startup cost AND zero perceived tab-switch cost
for typical workflows. Cons: more design work; need to
handle the failure case (background task errored — fall
back to lazy load).
### Other call sites to audit
- `src/tui/portfolio_tab.zig:1756` — re-reads cached candles
to compute the latest-quote-date footer. Re-route through
the shared map.
- `src/tui/quote_tab.zig` and `src/tui/performance_tab.zig`
these are intentionally per-symbol-narrow (single ticker
detail views), so they probably stay on `getTrailingReturns`
but could short-circuit when the candles are already in
the shared map.
### Tests
- Lifetime test: portfolio reload clears both maps before
the next load assigns new ones; no use-after-free.
- Async path: failure in the background task surfaces as a
visible status message but doesn't break tab activation
(lazy fallback still works).
## Projections: future enhancements
- **Configurable return cap per position — priority MEDIUM.**

View file

@ -2032,8 +2032,6 @@ pub const DataService = struct {
/// Drives `--refresh-data=never`.
skip_network: bool = false,
color: bool = true,
/// Maximum concurrent server sync requests. 0 = auto (8).
max_concurrent: usize = 0,
/// Map this config to the per-call `FetchOptions` shape.
/// Convenience for paths that need to pass through to
@ -2210,7 +2208,6 @@ pub const DataService = struct {
needs_fetch.items,
&result,
&server_failures,
config,
aggregate_progress,
total_count,
);
@ -2242,27 +2239,32 @@ pub const DataService = struct {
return result;
}
/// Parallel server sync using thread pool.
/// Parallel server sync via `std.Io.Group`.
///
/// Concurrency shape: one task per symbol, spawned into a
/// single `Group`. The `std.Io` implementation owns
/// scheduling and concurrency limits (e.g. `Io.Threaded`
/// sizes its pool from CPU count); we don't second-guess it
/// with our own worker cap or work-stealing queue.
///
/// Each task hits `io.checkCancel()` before its sync, so a
/// cancelation request propagating through `Group.await`
/// stops pending work at task granularity.
fn parallelServerSync(
self: *DataService,
symbols: []const []const u8,
result: *LoadAllResult,
failures: *std.ArrayList([]const u8),
config: LoadAllConfig,
aggregate_progress: ?AggregateProgressCallback,
total_count: usize,
) void {
const max_threads = if (config.max_concurrent > 0) config.max_concurrent else 8;
const thread_count = @min(symbols.len, max_threads);
if (aggregate_progress) |p| p.emit(result.cached_count, total_count, .server_sync);
// Shared state for worker threads
// Shared state for tasks
var completed = AtomicCounter{};
var next_index = AtomicCounter{};
const sync_results = self.allocator.alloc(ServerSyncResult, symbols.len) catch {
// Allocation failed fall back to marking all as failures
for (symbols) |sym| failures.append(self.allocator, sym) catch |err| log.warn("parallelFetch slots-alloc-fallback failures append({s}): {t}", .{ sym, err });
for (symbols) |sym| failures.append(self.allocator, sym) catch |err| log.warn("parallelServerSync slots-alloc-fallback failures append({s}): {t}", .{ sym, err });
return;
};
defer self.allocator.free(sync_results);
@ -2272,62 +2274,40 @@ pub const DataService = struct {
sr.* = .{ .symbol = symbols[i], .success = false };
}
// Spawn worker threads
var threads = self.allocator.alloc(std.Thread, thread_count) catch {
for (symbols) |sym| failures.append(self.allocator, sym) catch |err| log.warn("parallelFetch threads-alloc-fallback failures append({s}): {t}", .{ sym, err });
return;
};
defer self.allocator.free(threads);
const WorkerContext = struct {
svc: *DataService,
symbols: []const []const u8,
results: []ServerSyncResult,
next_index: *AtomicCounter,
completed: *AtomicCounter,
};
var ctx = WorkerContext{
.svc = self,
.symbols = symbols,
.results = sync_results,
.next_index = &next_index,
.completed = &completed,
};
const worker = struct {
fn run(wctx: *WorkerContext) void {
while (true) {
const idx = wctx.next_index.increment();
if (idx >= wctx.symbols.len) break;
const sym = wctx.symbols[idx];
const success = wctx.svc.syncCandlesFromServer(sym);
wctx.results[idx].success = success;
_ = wctx.completed.increment();
}
fn run(io: std.Io, svc: *DataService, slot: *ServerSyncResult, done: *AtomicCounter) std.Io.Cancelable!void {
defer _ = done.increment();
try io.checkCancel();
slot.success = svc.syncCandlesFromServer(slot.symbol);
}
};
// Start threads
var spawned: usize = 0;
for (threads) |*t| {
t.* = std.Thread.spawn(.{}, worker.run, .{&ctx}) catch continue;
spawned += 1;
// Spawn one task per symbol. Group.async requires an
// eventual Group.await/cancel to release resources; the
// single await below covers all paths.
var group: std.Io.Group = .init;
for (sync_results) |*sr| {
group.async(self.io, worker.run, .{ self.io, self, sr, &completed });
}
// Progress reporting while waiting
// Progress reporting while the group runs
if (aggregate_progress) |p| {
while (completed.load() < symbols.len) {
std.Io.sleep(self.io, std.Io.Duration.fromMilliseconds(50), .awake) catch |err| log.debug("parallelFetch progress-poll sleep interrupted: {t}", .{err});
std.Io.sleep(self.io, std.Io.Duration.fromMilliseconds(50), .awake) catch |err| {
log.debug("parallelServerSync progress-poll sleep interrupted: {t}", .{err});
break;
};
p.emit(result.cached_count + completed.load(), total_count, .server_sync);
}
}
// Wait for all threads
for (threads[0..spawned]) |t| {
t.join();
}
// Wait for all tasks. On cancelation the unstarted tasks
// exit at their checkCancel point; partial results (slots
// that completed) are still processed below they came
// from successful cache writes.
group.await(self.io) catch |err| {
log.debug("parallelServerSync group await: {t}", .{err});
};
// Process results
for (sync_results) |sr| {