store.zig refactored and reviewed

This commit is contained in:
Emil Lerch 2026-03-09 16:58:12 -07:00
parent 189d09720b
commit e0129003e6
Signed by: lobo
GPG key ID: A7B62D657EF764F8
10 changed files with 600 additions and 681 deletions

View file

@ -13,8 +13,8 @@
.hash = "z2d-0.10.0-j5P_Hu-6FgBsZNgwphIqh17jDnj8_yPtD8yzjO6PpHRQ",
},
.srf = .{
.url = "git+https://git.lerch.org/lobo/srf.git#8e12b7396afc1bcbc4e2a3f19d8725a82b71b27e",
.hash = "srf-0.0.0-qZj573V9AQBJTR8ehcnA6KW_wb6cdkJZtFZGq87b8dAJ",
.url = "git+https://git.lerch.org/lobo/srf.git#353f8bca359d35872c1869dca906f34f9579d073",
.hash = "srf-0.0.0-qZj577GyAQBpIS3e1hiOb6Gi-4KUmFxaNsk3jzZMszoO",
},
},
.paths = .{

View file

@ -75,11 +75,11 @@ pub fn parseAccountsFile(allocator: std.mem.Allocator, data: []const u8) !Accoun
}
var reader = std.Io.Reader.fixed(data);
const parsed = srf.parse(&reader, allocator, .{ .alloc_strings = false }) catch return error.InvalidData;
defer parsed.deinit();
var it = srf.iterator(&reader, allocator, .{ .alloc_strings = false }) catch return error.InvalidData;
defer it.deinit();
for (parsed.records) |record| {
const entry = record.to(AccountTaxEntry) catch continue;
while (try it.next()) |fields| {
const entry = fields.to(AccountTaxEntry) catch continue;
try entries.append(allocator, .{
.account = try allocator.dupe(u8, entry.account),
.tax_type = entry.tax_type,

932
src/cache/store.zig vendored

File diff suppressed because it is too large Load diff

View file

@ -57,11 +57,11 @@ pub fn parseClassificationFile(allocator: std.mem.Allocator, data: []const u8) !
}
var reader = std.Io.Reader.fixed(data);
const parsed = srf.parse(&reader, allocator, .{ .alloc_strings = false }) catch return error.InvalidData;
defer parsed.deinit();
var it = srf.iterator(&reader, allocator, .{ .alloc_strings = false }) catch return error.InvalidData;
defer it.deinit();
for (parsed.records) |record| {
const entry = record.to(ClassificationEntry) catch continue;
while (try it.next()) |fields| {
const entry = fields.to(ClassificationEntry) catch continue;
try entries.append(allocator, .{
.symbol = try allocator.dupe(u8, entry.symbol),
.sector = if (entry.sector) |s| try allocator.dupe(u8, s) else null,

View file

@ -35,7 +35,7 @@ pub const LotType = enum {
/// Open lots have no close_date/close_price.
/// Closed lots have both.
pub const Lot = struct {
symbol: []const u8,
symbol: []const u8 = "",
shares: f64,
open_date: Date,
open_price: f64,

View file

@ -174,26 +174,26 @@ pub fn buildUrl(
base: []const u8,
params: []const [2][]const u8,
) ![]const u8 {
var buf: std.ArrayList(u8) = .empty;
errdefer buf.deinit(allocator);
var aw: std.Io.Writer.Allocating = .init(allocator);
errdefer aw.deinit();
try buf.appendSlice(allocator, base);
try aw.writer.writeAll(base);
for (params, 0..) |param, i| {
try buf.append(allocator, if (i == 0) '?' else '&');
try buf.appendSlice(allocator, param[0]);
try buf.append(allocator, '=');
try aw.writer.writeByte(if (i == 0) '?' else '&');
try aw.writer.writeAll(param[0]);
try aw.writer.writeByte('=');
for (param[1]) |c| {
switch (c) {
' ' => try buf.appendSlice(allocator, "%20"),
'&' => try buf.appendSlice(allocator, "%26"),
'=' => try buf.appendSlice(allocator, "%3D"),
'+' => try buf.appendSlice(allocator, "%2B"),
else => try buf.append(allocator, c),
' ' => try aw.writer.writeAll("%20"),
'&' => try aw.writer.writeAll("%26"),
'=' => try aw.writer.writeAll("%3D"),
'+' => try aw.writer.writeAll("%2B"),
else => try aw.writer.writeByte(c),
}
}
}
return buf.toOwnedSlice(allocator);
return aw.toOwnedSlice();
}
test "buildUrl" {

View file

@ -55,15 +55,15 @@ pub const Cboe = struct {
};
fn buildCboeUrl(allocator: std.mem.Allocator, symbol: []const u8) ![]const u8 {
var buf: std.ArrayList(u8) = .empty;
errdefer buf.deinit(allocator);
var aw: std.Io.Writer.Allocating = .init(allocator);
errdefer aw.deinit();
try buf.appendSlice(allocator, base_url);
try buf.append(allocator, '/');
try buf.appendSlice(allocator, symbol);
try buf.appendSlice(allocator, ".json");
try aw.writer.writeAll(base_url);
try aw.writer.writeByte('/');
try aw.writer.writeAll(symbol);
try aw.writer.writeAll(".json");
return buf.toOwnedSlice(allocator);
return aw.toOwnedSlice();
}
/// Parse a CBOE options response into grouped OptionsChain slices.

View file

@ -47,6 +47,28 @@ pub const Source = enum {
fetched,
};
// PostProcess callbacks
// These are passed to Store.read to handle type-specific
// concerns: string duping (serialization plumbing) and domain transforms.
/// Dupe the currency string so it outlives the SRF iterator's backing buffer.
fn dividendPostProcess(div: *Dividend, allocator: std.mem.Allocator) anyerror!void {
if (div.currency) |c| {
div.currency = try allocator.dupe(u8, c);
}
}
/// Recompute surprise/surprise_percent from actual and estimate fields.
/// SRF only stores actual and estimate; surprise is derived.
fn earningsPostProcess(ev: *EarningsEvent, _: std.mem.Allocator) anyerror!void {
if (ev.actual != null and ev.estimate != null) {
ev.surprise = ev.actual.? - ev.estimate.?;
if (ev.estimate.? != 0) {
ev.surprise_percent = (ev.surprise.? / @abs(ev.estimate.?)) * 100.0;
}
}
}
pub const DataService = struct {
allocator: std.mem.Allocator,
config: Config,
@ -137,12 +159,13 @@ pub const DataService = struct {
const today = todayDate();
// Check candle metadata for freshness (tiny file, no candle deserialization)
const meta = s.readCandleMeta(symbol);
if (meta) |m| {
const meta_result = s.readCandleMeta(symbol);
if (meta_result) |mr| {
const m = mr.meta;
if (s.isCandleMetaFresh(symbol)) {
// Fresh deserialize candles and return
const candles = self.loadCandleFile(&s, symbol);
if (candles) |c| return .{ .data = c, .source = .cached, .timestamp = m.fetched_at };
if (s.read(Candle, symbol, null, .any)) |r|
return .{ .data = r.data, .source = .cached, .timestamp = mr.created };
}
// Stale try incremental update using last_date from meta
@ -150,38 +173,37 @@ pub const DataService = struct {
// If last cached date is today or later, just refresh the TTL (meta only)
if (!fetch_from.lessThan(today)) {
self.updateCandleMeta(&s, symbol, m.last_close, m.last_date);
const candles = self.loadCandleFile(&s, symbol);
if (candles) |c| return .{ .data = c, .source = .cached, .timestamp = std.time.timestamp() };
s.updateCandleMeta(symbol, m.last_close, m.last_date);
if (s.read(Candle, symbol, null, .any)) |r|
return .{ .data = r.data, .source = .cached, .timestamp = std.time.timestamp() };
} else {
// Incremental fetch from day after last cached candle
var td = self.getTwelveData() catch {
// No API key return stale data
const candles = self.loadCandleFile(&s, symbol);
if (candles) |c| return .{ .data = c, .source = .cached, .timestamp = m.fetched_at };
if (s.read(Candle, symbol, null, .any)) |r|
return .{ .data = r.data, .source = .cached, .timestamp = mr.created };
return DataError.NoApiKey;
};
const new_candles = td.fetchCandles(self.allocator, symbol, fetch_from, today) catch {
// Fetch failed return stale data rather than erroring
const candles = self.loadCandleFile(&s, symbol);
if (candles) |c| return .{ .data = c, .source = .cached, .timestamp = m.fetched_at };
if (s.read(Candle, symbol, null, .any)) |r|
return .{ .data = r.data, .source = .cached, .timestamp = mr.created };
return DataError.FetchFailed;
};
if (new_candles.len == 0) {
// No new candles (weekend/holiday) refresh TTL only (meta rewrite)
self.allocator.free(new_candles);
self.updateCandleMeta(&s, symbol, m.last_close, m.last_date);
const candles = self.loadCandleFile(&s, symbol);
if (candles) |c| return .{ .data = c, .source = .cached, .timestamp = std.time.timestamp() };
s.updateCandleMeta(symbol, m.last_close, m.last_date);
if (s.read(Candle, symbol, null, .any)) |r|
return .{ .data = r.data, .source = .cached, .timestamp = std.time.timestamp() };
} else {
// Append new candles to existing file + update meta
self.appendCandles(&s, symbol, new_candles);
s.appendCandles(symbol, new_candles);
// Load the full (now-updated) file for the caller
const candles = self.loadCandleFile(&s, symbol);
if (candles) |c| {
if (s.read(Candle, symbol, null, .any)) |r| {
self.allocator.free(new_candles);
return .{ .data = c, .source = .fetched, .timestamp = std.time.timestamp() };
return .{ .data = r.data, .source = .fetched, .timestamp = std.time.timestamp() };
}
// Append failed or file unreadable just return new candles
return .{ .data = new_candles, .source = .fetched, .timestamp = std.time.timestamp() };
@ -198,91 +220,19 @@ pub const DataService = struct {
};
if (fetched.len > 0) {
self.cacheCandles(&s, symbol, fetched);
s.cacheCandles(symbol, fetched);
}
return .{ .data = fetched, .source = .fetched, .timestamp = std.time.timestamp() };
}
/// Load candle data from the cache file. Returns null if unavailable.
fn loadCandleFile(self: *DataService, s: *cache.Store, symbol: []const u8) ?[]Candle {
const raw = s.readRaw(symbol, .candles_daily) catch return null;
const data = raw orelse return null;
defer self.allocator.free(data);
return cache.Store.deserializeCandles(self.allocator, data) catch null;
}
fn cacheCandles(self: *DataService, s: *cache.Store, symbol: []const u8, candles: []const Candle) void {
// Write candle data (no expiry -- historical facts don't expire)
if (cache.Store.serializeCandles(self.allocator, candles, .{})) |srf_data| {
defer self.allocator.free(srf_data);
s.writeRaw(symbol, .candles_daily, srf_data) catch {};
} else |_| {}
// Write candle metadata (with expiry for freshness checks)
if (candles.len > 0) {
const last = candles[candles.len - 1];
self.updateCandleMeta(s, symbol, last.close, last.date);
}
}
/// Append new candle records to the existing candle file and update metadata.
/// Falls back to a full rewrite if append fails (e.g. file doesn't exist).
fn appendCandles(self: *DataService, s: *cache.Store, symbol: []const u8, new_candles: []const Candle) void {
if (new_candles.len == 0) return;
// Serialize just the new records with no SRF header
if (cache.Store.serializeCandles(self.allocator, new_candles, .{ .emit_directives = false })) |srf_data| {
defer self.allocator.free(srf_data);
s.appendRaw(symbol, .candles_daily, srf_data) catch {
// Append failed (file missing?) fall back to full load + rewrite
if (self.loadCandleFile(s, symbol)) |existing| {
defer self.allocator.free(existing);
// Merge and do full write
const merged = self.allocator.alloc(Candle, existing.len + new_candles.len) catch return;
defer self.allocator.free(merged);
@memcpy(merged[0..existing.len], existing);
@memcpy(merged[existing.len..], new_candles);
if (cache.Store.serializeCandles(self.allocator, merged, .{})) |full_data| {
defer self.allocator.free(full_data);
s.writeRaw(symbol, .candles_daily, full_data) catch {};
} else |_| {}
}
};
} else |_| {}
// Update metadata to reflect the new last candle
const last = new_candles[new_candles.len - 1];
self.updateCandleMeta(s, symbol, last.close, last.date);
}
/// Write (or refresh) candle metadata without touching the candle data file.
fn updateCandleMeta(self: *DataService, s: *cache.Store, symbol: []const u8, last_close: f64, last_date: Date) void {
const expires = std.time.timestamp() + cache.Ttl.candles_latest;
const meta = cache.Store.CandleMeta{
.last_close = last_close,
.last_date = last_date,
.fetched_at = std.time.timestamp(),
};
if (cache.Store.serializeCandleMeta(self.allocator, meta, .{ .expires = expires })) |meta_data| {
defer self.allocator.free(meta_data);
s.writeRaw(symbol, .candles_meta, meta_data) catch {};
} else |_| {}
}
/// Fetch dividend history for a symbol.
/// Checks cache first; fetches from Polygon if stale/missing.
pub fn getDividends(self: *DataService, symbol: []const u8) DataError!struct { data: []Dividend, source: Source, timestamp: i64 } {
var s = self.store();
const cached_raw = s.readRaw(symbol, .dividends) catch return DataError.CacheError;
if (cached_raw) |data| {
defer self.allocator.free(data);
if (cache.Store.isFreshData(data, self.allocator)) {
const divs = cache.Store.deserializeDividends(self.allocator, data) catch null;
if (divs) |d| return .{ .data = d, .source = .cached, .timestamp = cache.Store.readFetchedAt(self.allocator, data) orelse std.time.timestamp() };
}
}
if (s.read(Dividend, symbol, dividendPostProcess, .fresh_only)) |cached|
return .{ .data = cached.data, .source = .cached, .timestamp = cached.timestamp };
var pg = try self.getPolygon();
const fetched = pg.fetchDividends(self.allocator, symbol, null, null) catch {
@ -290,11 +240,7 @@ pub const DataService = struct {
};
if (fetched.len > 0) {
const expires = std.time.timestamp() + cache.Ttl.dividends;
if (cache.Store.serializeDividends(self.allocator, fetched, .{ .expires = expires })) |srf_data| {
defer self.allocator.free(srf_data);
s.writeRaw(symbol, .dividends, srf_data) catch {};
} else |_| {}
s.write(Dividend, symbol, fetched, cache.Ttl.dividends);
}
return .{ .data = fetched, .source = .fetched, .timestamp = std.time.timestamp() };
@ -305,24 +251,15 @@ pub const DataService = struct {
pub fn getSplits(self: *DataService, symbol: []const u8) DataError!struct { data: []Split, source: Source, timestamp: i64 } {
var s = self.store();
const cached_raw = s.readRaw(symbol, .splits) catch return DataError.CacheError;
if (cached_raw) |data| {
defer self.allocator.free(data);
if (cache.Store.isFreshData(data, self.allocator)) {
const splits = cache.Store.deserializeSplits(self.allocator, data) catch null;
if (splits) |sp| return .{ .data = sp, .source = .cached, .timestamp = cache.Store.readFetchedAt(self.allocator, data) orelse std.time.timestamp() };
}
}
if (s.read(Split, symbol, null, .fresh_only)) |cached|
return .{ .data = cached.data, .source = .cached, .timestamp = cached.timestamp };
var pg = try self.getPolygon();
const fetched = pg.fetchSplits(self.allocator, symbol) catch {
return DataError.FetchFailed;
};
if (cache.Store.serializeSplits(self.allocator, fetched, .{ .expires = std.time.timestamp() + cache.Ttl.splits })) |srf_data| {
defer self.allocator.free(srf_data);
s.writeRaw(symbol, .splits, srf_data) catch {};
} else |_| {}
s.write(Split, symbol, fetched, cache.Ttl.splits);
return .{ .data = fetched, .source = .fetched, .timestamp = std.time.timestamp() };
}
@ -332,14 +269,8 @@ pub const DataService = struct {
pub fn getOptions(self: *DataService, symbol: []const u8) DataError!struct { data: []OptionsChain, source: Source, timestamp: i64 } {
var s = self.store();
const cached_raw = s.readRaw(symbol, .options) catch return DataError.CacheError;
if (cached_raw) |data| {
defer self.allocator.free(data);
if (cache.Store.isFreshData(data, self.allocator)) {
const chains = cache.Store.deserializeOptions(self.allocator, data) catch null;
if (chains) |c| return .{ .data = c, .source = .cached, .timestamp = cache.Store.readFetchedAt(self.allocator, data) orelse std.time.timestamp() };
}
}
if (s.read(OptionsChain, symbol, null, .fresh_only)) |cached|
return .{ .data = cached.data, .source = .cached, .timestamp = cached.timestamp };
var cboe = self.getCboe();
const fetched = cboe.fetchOptionsChain(self.allocator, symbol) catch {
@ -347,11 +278,7 @@ pub const DataService = struct {
};
if (fetched.len > 0) {
const expires = std.time.timestamp() + cache.Ttl.options;
if (cache.Store.serializeOptions(self.allocator, fetched, .{ .expires = expires })) |srf_data| {
defer self.allocator.free(srf_data);
s.writeRaw(symbol, .options, srf_data) catch {};
} else |_| {}
s.write(OptionsChain, symbol, fetched, cache.Ttl.options);
}
return .{ .data = fetched, .source = .fetched, .timestamp = std.time.timestamp() };
@ -365,25 +292,18 @@ pub const DataService = struct {
var s = self.store();
const today = todayDate();
const cached_raw = s.readRaw(symbol, .earnings) catch return DataError.CacheError;
if (cached_raw) |data| {
defer self.allocator.free(data);
if (cache.Store.isFreshData(data, self.allocator)) {
const events = cache.Store.deserializeEarnings(self.allocator, data) catch null;
if (events) |e| {
// Check if any past/today earnings event is still missing actual results.
// If so, the announcement likely just happened force a refresh.
const needs_refresh = for (e) |ev| {
if (ev.actual == null and !today.lessThan(ev.date)) break true;
} else false;
if (s.read(EarningsEvent, symbol, earningsPostProcess, .fresh_only)) |cached| {
// Check if any past/today earnings event is still missing actual results.
// If so, the announcement likely just happened force a refresh.
const needs_refresh = for (cached.data) |ev| {
if (ev.actual == null and !today.lessThan(ev.date)) break true;
} else false;
if (!needs_refresh) {
return .{ .data = e, .source = .cached, .timestamp = cache.Store.readFetchedAt(self.allocator, data) orelse std.time.timestamp() };
}
// Stale: free cached events and re-fetch below
self.allocator.free(e);
}
if (!needs_refresh) {
return .{ .data = cached.data, .source = .cached, .timestamp = cached.timestamp };
}
// Stale: free cached events and re-fetch below
self.allocator.free(cached.data);
}
var fh = try self.getFinnhub();
@ -395,11 +315,7 @@ pub const DataService = struct {
};
if (fetched.len > 0) {
const expires = std.time.timestamp() + cache.Ttl.earnings;
if (cache.Store.serializeEarnings(self.allocator, fetched, .{ .expires = expires })) |srf_data| {
defer self.allocator.free(srf_data);
s.writeRaw(symbol, .earnings, srf_data) catch {};
} else |_| {}
s.write(EarningsEvent, symbol, fetched, cache.Ttl.earnings);
}
return .{ .data = fetched, .source = .fetched, .timestamp = std.time.timestamp() };
@ -410,24 +326,15 @@ pub const DataService = struct {
pub fn getEtfProfile(self: *DataService, symbol: []const u8) DataError!struct { data: EtfProfile, source: Source, timestamp: i64 } {
var s = self.store();
const cached_raw = s.readRaw(symbol, .etf_profile) catch return DataError.CacheError;
if (cached_raw) |data| {
defer self.allocator.free(data);
if (cache.Store.isFreshData(data, self.allocator)) {
const profile = cache.Store.deserializeEtfProfile(self.allocator, data) catch null;
if (profile) |p| return .{ .data = p, .source = .cached, .timestamp = cache.Store.readFetchedAt(self.allocator, data) orelse std.time.timestamp() };
}
}
if (s.read(EtfProfile, symbol, null, .fresh_only)) |cached|
return .{ .data = cached.data, .source = .cached, .timestamp = cached.timestamp };
var av = try self.getAlphaVantage();
const fetched = av.fetchEtfProfile(self.allocator, symbol) catch {
return DataError.FetchFailed;
};
if (cache.Store.serializeEtfProfile(self.allocator, fetched, .{ .expires = std.time.timestamp() + cache.Ttl.etf_profile })) |srf_data| {
defer self.allocator.free(srf_data);
s.writeRaw(symbol, .etf_profile, srf_data) catch {};
} else |_| {}
s.write(EtfProfile, symbol, fetched, cache.Ttl.etf_profile);
return .{ .data = fetched, .source = .fetched, .timestamp = std.time.timestamp() };
}
@ -524,45 +431,29 @@ pub const DataService = struct {
pub fn getCachedCandles(self: *DataService, symbol: []const u8) ?[]Candle {
var s = self.store();
if (s.isNegative(symbol, .candles_daily)) return null;
const data = s.readRaw(symbol, .candles_daily) catch return null;
if (data) |d| {
defer self.allocator.free(d);
return cache.Store.deserializeCandles(self.allocator, d) catch null;
}
return null;
const result = s.read(Candle, symbol, null, .any) orelse return null;
return result.data;
}
/// Read dividends from cache only (no network fetch).
pub fn getCachedDividends(self: *DataService, symbol: []const u8) ?[]Dividend {
var s = self.store();
const data = s.readRaw(symbol, .dividends) catch return null;
if (data) |d| {
defer self.allocator.free(d);
return cache.Store.deserializeDividends(self.allocator, d) catch null;
}
return null;
const result = s.read(Dividend, symbol, dividendPostProcess, .any) orelse return null;
return result.data;
}
/// Read earnings from cache only (no network fetch).
pub fn getCachedEarnings(self: *DataService, symbol: []const u8) ?[]EarningsEvent {
var s = self.store();
const data = s.readRaw(symbol, .earnings) catch return null;
if (data) |d| {
defer self.allocator.free(d);
return cache.Store.deserializeEarnings(self.allocator, d) catch null;
}
return null;
const result = s.read(EarningsEvent, symbol, earningsPostProcess, .any) orelse return null;
return result.data;
}
/// Read options from cache only (no network fetch).
pub fn getCachedOptions(self: *DataService, symbol: []const u8) ?[]OptionsChain {
var s = self.store();
const data = s.readRaw(symbol, .options) catch return null;
if (data) |d| {
defer self.allocator.free(d);
return cache.Store.deserializeOptions(self.allocator, d) catch null;
}
return null;
const result = s.read(OptionsChain, symbol, null, .any) orelse return null;
return result.data;
}
// Portfolio price loading

View file

@ -316,30 +316,26 @@ pub fn loadFromData(allocator: std.mem.Allocator, data: []const u8) ?KeyMap {
const aa = arena.allocator();
var reader = std.Io.Reader.fixed(data);
const parsed = srf.parse(&reader, aa, .{}) catch return null;
// Don't defer parsed.deinit() -- arena owns everything
var it = srf.iterator(&reader, aa, .{}) catch return null;
// Don't defer it.deinit() -- arena owns everything
var bindings = std.ArrayList(Binding).empty;
for (parsed.records) |record| {
while (it.next() catch return null) |fields| {
var action: ?Action = null;
var key: ?KeyCombo = null;
for (record.fields) |field| {
while (fields.next() catch return null) |field| {
if (std.mem.eql(u8, field.key, "action")) {
if (field.value) |v| {
switch (v) {
.string => |s| action = parseAction(s),
else => {},
}
}
if (field.value) |v| switch (v) {
.string => |s| action = parseAction(s),
else => {},
};
} else if (std.mem.eql(u8, field.key, "key")) {
if (field.value) |v| {
switch (v) {
.string => |s| key = parseKeyCombo(s),
else => {},
}
}
if (field.value) |v| switch (v) {
.string => |s| key = parseKeyCombo(s),
else => {},
};
}
}

View file

@ -251,13 +251,13 @@ pub fn loadFromData(data: []const u8) ?Theme {
const alloc = fba.allocator();
var reader = std.Io.Reader.fixed(data);
const parsed = srf.parse(&reader, alloc, .{ .alloc_strings = false }) catch return null;
_ = &parsed; // don't deinit, fba owns everything
var it = srf.iterator(&reader, alloc, .{ .alloc_strings = false }) catch return null;
// Don't deinit -- fba owns everything
var theme = default_theme;
for (parsed.records) |record| {
for (record.fields) |field| {
while (it.next() catch return null) |fields| {
while (fields.next() catch return null) |field| {
if (field.value) |v| {
const str = switch (v) {
.string => |s| s,