zfin/src/cache/store.zig

3652 lines
164 KiB
Zig
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

const std = @import("std");
const log = std.log.scoped(.cache);
const srf = @import("srf");
const atomic = @import("../atomic.zig");
const version = @import("../version.zig");
const Date = @import("../Date.zig");
const Candle = @import("../models/candle.zig").Candle;
const Dividend = @import("../models/dividend.zig").Dividend;
const DividendType = @import("../models/dividend.zig").DividendType;
const Split = @import("../models/split.zig").Split;
const EarningsEvent = @import("../models/earnings.zig").EarningsEvent;
const Wikidata = @import("../providers/Wikidata.zig");
const Edgar = @import("../providers/Edgar.zig");
// ── Wall-clock policy ────────────────────────────────────────
//
// Every `std.Io.Timestamp.now(...)` call in this file is intentional:
// the cache layer's job is to record *when data landed on disk* and
// compute expiry relative to that. Threading a `now_s: i64` in from the
// caller wouldn't buy anything - we'd just push the clock read up one
// frame. The torn-SRF diagnostic filenames (line ~473) additionally
// require millisecond precision to avoid collisions, which a
// caller-provided second-resolution `now_s` couldn't give us.
const Lot = @import("../models/portfolio.zig").Lot;
const LotType = @import("../models/portfolio.zig").LotType;
const Portfolio = @import("../models/portfolio.zig").Portfolio;
const OptionsChain = @import("../models/option.zig").OptionsChain;
const OptionContract = @import("../models/option.zig").OptionContract;
/// TTL durations in seconds for cache expiry.
pub const Ttl = struct {
    const s_per_min = std.time.s_per_min;
    const s_per_hour = std.time.s_per_hour;
    const s_per_day = std.time.s_per_day;

    /// Sentinel (-1): candles older than one day are treated as
    /// never expiring.
    pub const candles_historical: i64 = -1;

    /// The most recent day's candle: 23h45m, i.e. one day minus a
    /// 15-minute buffer that absorbs cron jitter.
    pub const candles_latest: i64 = 23 * s_per_hour + 45 * s_per_min;

    /// Dividends: two weeks.
    pub const dividends: i64 = 2 * std.time.s_per_week;

    /// Splits: two weeks.
    pub const splits: i64 = 2 * std.time.s_per_week;

    /// Options chains: one hour.
    pub const options: i64 = s_per_hour;

    /// Earnings: 30 days (announcement-driven refresh, if any, is
    /// handled outside this table).
    pub const earnings: i64 = 30 * s_per_day;

    /// Per-symbol classification record (sector / industry /
    /// country / inception_date / CIK) sourced from Wikidata:
    /// 90 days. The jitter percentage that spreads refreshes is
    /// assigned in `DataType.ttl`, not here.
    pub const classification: i64 = 90 * s_per_day;

    /// Per-symbol ETF metrics (NPORT-P profile + sector breakdown +
    /// top holdings): 90 days, matching the quarterly NPORT-P
    /// filing cadence.
    pub const etf_metrics: i64 = 90 * s_per_day;

    /// Per-CIK entity facts from EDGAR XBRL (initially shares
    /// outstanding): 30 days.
    pub const entity_facts: i64 = 30 * s_per_day;

    /// EDGAR mutual-fund ticker map (`company_tickers_mf.json`):
    /// 30 days. The ticker->CIK mapping changes rarely, so a month
    /// keeps load low while still picking up new listings.
    pub const tickers_funds: i64 = 30 * s_per_day;

    /// EDGAR company ticker map (`company_tickers.json`): 30 days,
    /// same reasoning as `tickers_funds`.
    pub const tickers_companies: i64 = 30 * s_per_day;
};
/// Cache TTL specification with optional per-key expiration jitter.
///
/// Existing call sites that just want a fixed expiration use
/// `.{ .seconds = Ttl.X }` and get the same exact-TTL behavior they
/// always had. Call sites that want thundering-herd defense set
/// `.jitter_pct = N` to spread expirations within ±N% of the base,
/// keyed deterministically by the cache entry's key. The same key
/// produces the same expiration across repeated writes - useful for
/// keeping expected-vs-actual debugging tractable.
///
/// Policy (which `jitter_pct` suits which data type) is not encoded
/// in this struct: whoever constructs the spec chooses it, and
/// `DataType.ttl` supplies the per-data-type defaults. This struct
/// only carries the inputs from which the final expiration is
/// computed (base + spread).
pub const TtlSpec = struct {
    /// Base TTL in seconds. Any negative value (canonically `-1`)
    /// is the "never expires" sentinel and bypasses jitter entirely.
    seconds: i64,
    /// Optional thundering-herd defense. When non-zero, the actual
    /// expiration is offset by a per-key-deterministic amount within
    /// ±(seconds * jitter_pct / 100). Default 0 = no spread, exact
    /// `seconds` TTL applied.
    ///
    /// Typical values: 8 for 30/90-day TTLs, 0 for short TTLs where
    /// the absolute spread would be larger than meaningful refresh
    /// drift. Values above 100 are representable but can place the
    /// expiration before `now`; callers should stay well below that.
    jitter_pct: u8 = 0,
};
/// Compute the expiration value to stamp on a cache write.
///
/// Parameters:
///   - `now_s`: current wall-clock time in seconds.
///   - `spec`:  base TTL plus optional jitter percentage.
///   - `key`:   cache key; only used to derive the jitter offset.
///
/// Returns, by `spec` shape:
///
///   1. Negative-sentinel TTL (`spec.seconds < 0`): the sentinel
///      itself, unchanged (e.g. `Ttl.candles_historical = -1`
///      yields `-1`). It is NOT added to `now_s` - doing so would
///      produce a timestamp just before "now", i.e. an entry that
///      is already expired, the opposite of "never expires".
///      Jitter does not apply.
///
///   2. No jitter (`spec.jitter_pct == 0`): exactly
///      `now_s + spec.seconds`.
///
///   3. Jittered (`spec.jitter_pct > 0`): `now_s + spec.seconds`
///      plus an offset in the inclusive range
///      ±(seconds * jitter_pct / 100). The offset is derived from
///      `std.hash.Wyhash` over `key` with a fixed seed - no clock
///      or RNG state - so the same key always yields the same
///      offset, across rewrites and across processes. If the
///      percentage of the base rounds down to zero, the result is
///      the exact, un-jittered expiration.
///
/// Why hash-based rather than `std.Random`: the goal is to spread
/// expirations across the population of keys, not to be
/// unpredictable per write. A PRNG would drift the expiration each
/// time a key is rewritten and would need seed management to keep
/// tests reproducible; hashing the key keeps an unexpected
/// expiration in a cache file recomputable by hand.
pub fn computeExpires(now_s: i64, spec: TtlSpec, key: []const u8) i64 {
    // Case 1: never-expires sentinel passes through unchanged.
    if (spec.seconds < 0) return spec.seconds;

    const exact = now_s + spec.seconds;

    // Case 2: no jitter requested - exact base TTL.
    if (spec.jitter_pct == 0) return exact;

    // Case 3: jitter requested. Largest offset on either side of
    // the base; when it rounds to zero there is nothing to spread
    // (and this also keeps the modulus below at least 3).
    const max_offset = @divFloor(spec.seconds * @as(i64, spec.jitter_pct), 100);
    if (max_offset <= 0) return exact;

    // Map the key's hash onto [-max_offset, +max_offset]: reduce
    // modulo the number of integers in that range, then recenter.
    const range_size: u64 = @intCast(2 * max_offset + 1);
    const hash = std.hash.Wyhash.hash(0, key);
    const positive_offset: i64 = @intCast(hash % range_size);
    return exact + (positive_offset - max_offset);
}
pub const DataType = enum {
    candles_daily,
    candles_meta,
    dividends,
    splits,
    options,
    earnings,
    meta,
    /// Wikidata-sourced classification record, one per symbol
    /// (`<cache_dir>/<symbol>/classification.srf`).
    classification,
    /// NPORT-P-derived ETF metrics, one file per symbol
    /// (`<cache_dir>/<symbol>/etf_metrics.srf`); a tagged union of
    /// profile + sector + holding rows.
    etf_metrics,
    /// XBRL-derived entity facts (tagged union; initially just
    /// shares-outstanding), stored at
    /// `<cache_dir>/<cik>/entity_facts.srf`. Keyed by CIK rather
    /// than symbol so a dual-class issuer (BRK.A / BRK.B) shares
    /// one facts file.
    entity_facts,
    /// EDGAR `company_tickers_mf.json` index, stored under the
    /// synthetic `_edgar` key at
    /// `<cache_dir>/_edgar/tickers_funds.srf` as
    /// `MutualFundTickerEntry` records.
    tickers_funds,
    /// EDGAR `company_tickers.json` index, stored at
    /// `<cache_dir>/_edgar/tickers_companies.srf`; same shape as
    /// `tickers_funds`.
    tickers_companies,

    /// On-disk file name (including the `.srf` extension) for this
    /// data type. The mapping is spelled out per tag so renaming a
    /// tag never silently renames a cache file.
    pub fn fileName(self: DataType) []const u8 {
        const name: []const u8 = switch (self) {
            .candles_daily => "candles_daily.srf",
            .candles_meta => "candles_meta.srf",
            .meta => "meta.srf",
            .dividends => "dividends.srf",
            .splits => "splits.srf",
            .options => "options.srf",
            .earnings => "earnings.srf",
            .classification => "classification.srf",
            .etf_metrics => "etf_metrics.srf",
            .entity_facts => "entity_facts.srf",
            .tickers_funds => "tickers_funds.srf",
            .tickers_companies => "tickers_companies.srf",
        };
        return name;
    }

    /// TTL specification (base seconds from `Ttl` plus per-key
    /// jitter percentage) for this data type.
    ///
    /// Jitter assignments:
    ///   - 11% for dividends/splits (14d base).
    ///   - 8% for the long-TTL types: classification, etf_metrics,
    ///     entity_facts and both ticker maps.
    ///   - 0% for options and earnings.
    ///
    /// `candles_daily`, `candles_meta` and `meta` have no generic
    /// TTL: asking for one is a caller bug and hits `unreachable`
    /// (a safety-checked panic in safe builds, undefined behavior
    /// in unchecked release builds).
    pub fn ttl(self: DataType) TtlSpec {
        const base_seconds: i64 = switch (self) {
            .dividends => Ttl.dividends,
            .splits => Ttl.splits,
            .options => Ttl.options,
            .earnings => Ttl.earnings,
            .classification => Ttl.classification,
            .etf_metrics => Ttl.etf_metrics,
            .entity_facts => Ttl.entity_facts,
            .tickers_funds => Ttl.tickers_funds,
            .tickers_companies => Ttl.tickers_companies,
            // Misuse: these types are not written through the
            // generic TTL-driven path.
            .candles_daily, .candles_meta, .meta => unreachable,
        };
        const jitter: u8 = switch (self) {
            .dividends, .splits => 11,
            .classification,
            .etf_metrics,
            .entity_facts,
            .tickers_funds,
            .tickers_companies,
            => 8,
            .options, .earnings => 0,
            // Already rejected by the switch above.
            .candles_daily, .candles_meta, .meta => unreachable,
        };
        return .{ .seconds = base_seconds, .jitter_pct = jitter };
    }
};
/// Persistent SRF-backed cache with per-symbol, per-data-type files.
///
/// Layout:
/// {cache_dir}/{SYMBOL}/candles_daily.srf
/// {cache_dir}/{SYMBOL}/dividends.srf
/// {cache_dir}/{SYMBOL}/meta.srf
/// ...
pub const Store = struct {
cache_dir: []const u8,
allocator: std.mem.Allocator,
io: std.Io,
/// Type-erased signature for an optional per-record post-processing
/// callback applied during deserialization: it receives an opaque
/// pointer to the record plus an allocator, and may fail with any
/// error. Intended for duping strings that must outlive the SRF
/// iterator, or for domain-specific transforms.
///
/// NOTE(review): `read` in this file takes a typed
/// `fn (*T, std.mem.Allocator) anyerror!void` callback rather than
/// this alias - confirm where the `*anyopaque` form is still used.
pub const PostProcessFn = fn (*anyopaque, std.mem.Allocator) anyerror!void;
/// Build a `Store` rooted at `cache_dir`.
///
/// No I/O happens here and nothing is allocated: the three
/// arguments are stored as-is. `cache_dir` is kept by reference
/// (not copied), so the caller must keep it alive for as long as
/// the returned `Store` is used.
pub fn init(io: std.Io, allocator: std.mem.Allocator, cache_dir: []const u8) Store {
    const store: Store = .{
        .cache_dir = cache_dir,
        .allocator = allocator,
        .io = io,
    };
    return store;
}
/// Totals produced by `diskStats`; every counter starts at zero.
pub const DiskStats = struct {
    /// Number of subdirectories found directly under the cache
    /// directory.
    symbols: usize = 0,
    /// Number of regular files counted (top level plus one level
    /// down).
    files: usize = 0,
    /// Sum of the sizes of the counted files, in bytes.
    bytes: u64 = 0,
};
/// Walk the cache directory read-only and tally subdirectories
/// (`symbols`), regular files (`files`) and their combined size
/// (`bytes`).
///
/// - Top-level files (e.g. `cusip_tickers.srf`) count toward
///   `files`/`bytes` but not `symbols`.
/// - Every top-level subdirectory counts as one symbol, even if it
///   cannot be opened afterwards; only regular files directly
///   inside it are counted (no deeper recursion).
/// - Returns all-zero (not an error) when the cache directory
///   cannot be opened, e.g. because it does not exist yet.
/// - Best-effort: an entry that cannot be stat'ed is skipped, and
///   an iteration error ends the affected walk early.
///
/// Entries are resolved relative to the already-open directory
/// handles, so the walk performs no heap allocation and cannot
/// under-count because a path buffer failed to allocate. No
/// mutation, no fetches.
pub fn diskStats(self: *Store) DiskStats {
    const io = self.io;
    var stats: DiskStats = .{};
    var dir = std.Io.Dir.cwd().openDir(io, self.cache_dir, .{ .iterate = true }) catch return stats;
    defer dir.close(io);
    var iter = dir.iterate();
    while (iter.next(io) catch null) |entry| {
        switch (entry.kind) {
            .file => {
                const st = dir.statFile(io, entry.name, .{}) catch continue;
                stats.files += 1;
                stats.bytes += st.size;
            },
            .directory => {
                // Counted before the open attempt on purpose: the
                // directory exists even if it turns out unreadable.
                stats.symbols += 1;
                var sub = dir.openDir(io, entry.name, .{ .iterate = true }) catch continue;
                defer sub.close(io);
                var sub_iter = sub.iterate();
                while (sub_iter.next(io) catch null) |f| {
                    if (f.kind != .file) continue;
                    const st = sub.statFile(io, f.name, .{}) catch continue;
                    stats.files += 1;
                    stats.bytes += st.size;
                }
            },
            else => {},
        }
    }
    return stats;
}
// ── Generic typed API ────────────────────────────────────────
/// Resolve, at compile time, which cache `DataType` stores records
/// of model type `T`. Any type not listed here is rejected with a
/// compile error.
pub fn dataTypeFor(comptime T: type) DataType {
    if (T == Candle) return .candles_daily;
    if (T == Dividend) return .dividends;
    if (T == Split) return .splits;
    if (T == EarningsEvent) return .earnings;
    if (T == OptionsChain) return .options;
    if (T == Wikidata.ClassificationRecord) return .classification;
    if (T == Edgar.EtfMetricRecord) return .etf_metrics;
    if (T == Edgar.EntityFactRecord) return .entity_facts;
    if (T == Edgar.MutualFundTickerEntry) return .tickers_funds;
    if (T == Edgar.CompanyTickerEntry) return .tickers_companies;
    @compileError("unsupported type for Store");
}
/// Payload type used when caching records of type `T`: always a
/// mutable slice of `T`, for every supported record type.
pub fn DataFor(comptime T: type) type {
    const Records = []T;
    return Records;
}
/// Result type returned by `read` for record type `T`.
pub fn CacheResult(comptime T: type) type {
    return struct {
        /// The deserialized records. Owned by the allocator that was
        /// passed to `read`.
        data: DataFor(T),
        /// Timestamp in seconds associated with the entry. For
        /// options chains `read` uses the file's `created` value and
        /// falls back to the current wall-clock time when absent.
        timestamp: i64,
        /// The on-disk `#!expires=` directive, or null when the
        /// file carried none. Surfaced so the merge path can
        /// preserve the primary provider's freshness clock when a
        /// secondary source supplements the file (see
        /// `writeSupplement`). Most callers ignore it.
        expires: ?i64 = null,
    };
}
/// How strict `read` is about staleness.
pub const Freshness = enum {
    /// Only return data that is still fresh; stale entries read as
    /// a miss.
    fresh_only,
    /// Return whatever is on disk, fresh or not.
    any,
};
/// Load the cached records of type `T` for `symbol`.
///
/// Returns null on any miss: no file, a raw-read error, a parse
/// error, or (with `.fresh_only`) a stale entry. With `.any`,
/// freshness is not checked.
///
/// `allocator` owns the returned `CacheResult.data`; it may be
/// `self.allocator` or a caller-supplied arena. Scratch memory (the
/// raw file bytes and the SRF iterator state) always comes from
/// `self.allocator` and is released before returning.
///
/// Type-specific behavior:
///   - `Candle`: if the file does not look like a complete SRF
///     document, `selfHealTornCandles` is invoked with the torn
///     bytes and the read is a miss.
///   - `OptionsChain`: a file whose content equals
///     `negative_cache_content` yields an empty result stamped with
///     the current time under `.fresh_only`, and a miss under
///     `.any`. Otherwise `.fresh_only` requires an `expires`
///     directive that is still fresh.
///   - Everything else is delegated to `readSlice`.
pub fn read(
    self: *Store,
    allocator: std.mem.Allocator,
    comptime T: type,
    symbol: []const u8,
    comptime postProcess: ?*const fn (*T, std.mem.Allocator) anyerror!void,
    comptime freshness: Freshness,
) ?CacheResult(T) {
    const bytes = (self.readRaw(symbol, dataTypeFor(T)) catch return null) orelse return null;
    defer self.allocator.free(bytes);

    // Read-path self-heal, deliberately limited to candles: hand
    // the torn body to the healer and report a miss so the caller
    // re-fetches.
    if (T == Candle) {
        if (!looksCompleteSrf(bytes)) {
            self.selfHealTornCandles(symbol, bytes);
            return null;
        }
    }

    // Every type except options chains uses the generic slice
    // reader.
    if (T != OptionsChain) {
        return readSlice(T, self.io, allocator, bytes, postProcess, freshness);
    }

    // Negative cache marker: counts as fresh-and-empty, never as
    // usable stale data.
    if (std.mem.eql(u8, bytes, negative_cache_content)) {
        if (freshness != .fresh_only) return null;
        return .{ .data = &.{}, .timestamp = std.Io.Timestamp.now(self.io, .real).toSeconds() };
    }

    var reader = std.Io.Reader.fixed(bytes);
    var it = srf.iterator(&reader, self.allocator, .{ .parse_allocator = .none }) catch return null;
    defer it.deinit();

    if (freshness == .fresh_only) {
        // A file without an expiry directive can never be proven
        // fresh.
        if (it.expires == null or !it.isFresh(self.io)) return null;
    }

    const created_s = it.created orelse std.Io.Timestamp.now(self.io, .real).toSeconds();
    const chains = deserializeOptions(allocator, &it) catch return null;
    return .{ .data = chains, .timestamp = created_s, .expires = it.expires };
}
/// Selects how `writeMerged` stamps the `#!expires=` directive.
///
/// `#!expires=` answers "when do we next expect to consult the
/// primary provider for this data type?" Only a fetch from that
/// primary may move it. A secondary source that opportunistically
/// supplements the file (e.g. Tiingo dividend rows arriving as a
/// side effect of a candle fetch) must merge its records without
/// resetting the clock, otherwise it masks a due primary refresh.
const ExpiryPolicy = enum {
    /// Primary fetch: the expiration is recomputed from the current
    /// time and the supplied TTL spec (including any jitter). The
    /// authoritative freshness signal.
    bump,
    /// Secondary supplement: reuse the `expires` value read from
    /// the existing file. When the existing data carried no
    /// `expires` (including when there is no file yet), the
    /// expiration is computed exactly as for `bump`, so a brand-new
    /// symbol still gets an initial TTL (this is also what keeps
    /// Tiingo-only setups, with no primary key, serving
    /// freshly-written data instead of immediately re-missing).
    preserve,
};
/// Serialize `items` and store them in the cache for `symbol`,
/// expiring per `ttl`.
///
/// This is `writeWithSource` with no source attribution. As a
/// consequence, `Dividend` and `Split` writes go through the merge
/// path (sorted union with what is already on disk) rather than a
/// clean overwrite; all other types overwrite the file. Failures
/// are logged by the callee, not returned.
pub fn write(
    self: *Store,
    comptime T: type,
    symbol: []const u8,
    items: DataFor(T),
    ttl: TtlSpec,
) void {
    const unattributed: ?[]const u8 = null;
    self.writeWithSource(T, symbol, items, ttl, unattributed);
}
/// Serialize `items` and store them for `symbol`, optionally
/// attributing newly seen entries to a named source (e.g.
/// `"tiingo"`).
///
/// - `Dividend` / `Split`: handed to `writeMerged` with the `.bump`
///   expiry policy; `source_hint` is forwarded for its log lines.
/// - `OptionsChain`: serialized with `serializeOptions`.
/// - Everything else: serialized with `serializeWithMeta`.
///
/// For the two non-merged paths the expiration is computed from the
/// current wall-clock time, `ttl`, and `symbol` as the jitter key,
/// and `source_hint` is unused. Serialization and write failures are
/// logged as warnings and otherwise ignored (best-effort cache).
pub fn writeWithSource(
    self: *Store,
    comptime T: type,
    symbol: []const u8,
    items: DataFor(T),
    ttl: TtlSpec,
    source_hint: ?[]const u8,
) void {
    if (T == Dividend or T == Split) {
        return self.writeMerged(T, symbol, items, ttl, source_hint, .bump);
    }

    const now_s = std.Io.Timestamp.now(self.io, .real).toSeconds();
    const expires = computeExpires(now_s, ttl, symbol);
    const data_type = dataTypeFor(T);

    if (T == OptionsChain) {
        const encoded = serializeOptions(self.io, self.allocator, items, .{ .expires = expires }) catch |err| {
            log.warn("{s}: failed to serialize options: {s}", .{ symbol, @errorName(err) });
            return;
        };
        defer self.allocator.free(encoded);
        self.writeRaw(symbol, data_type, encoded) catch |err| {
            log.warn("{s}: failed to write options to cache: {s}", .{ symbol, @errorName(err) });
        };
    } else {
        const encoded = serializeWithMeta(T, self.io, self.allocator, items, .{ .expires = expires }) catch |err| {
            log.warn("{s}: failed to serialize {s}: {s}", .{ symbol, @tagName(data_type), @errorName(err) });
            return;
        };
        defer self.allocator.free(encoded);
        self.writeRaw(symbol, data_type, encoded) catch |err| {
            log.warn("{s}: failed to write {s} to cache: {s}", .{ symbol, @tagName(data_type), @errorName(err) });
        };
    }
}
/// Merge `Dividend` / `Split` records from a non-authoritative
/// source into the cache without resetting the `#!expires=`
/// freshness clock.
///
/// Meant for data that arrives as a side effect of fetching
/// something else (canonical case: Tiingo dividend/split rows
/// piggybacking on a candle fetch). Records are merged with the
/// same sorted-union semantics as `writeWithSource`; the only
/// difference is the `.preserve` expiry policy, which keeps the
/// existing on-disk `expires` so the next primary fetch decides
/// freshness. When there is no existing `expires` to keep, an
/// initial TTL is established from the data type's own TTL spec.
///
/// `T` must be `Dividend` or `Split` (checked at compile time).
/// `source_hint` names the source in the `info(cache)` log lines
/// (e.g. `"tiingo"`).
pub fn writeSupplement(
    self: *Store,
    comptime T: type,
    symbol: []const u8,
    items: DataFor(T),
    source_hint: ?[]const u8,
) void {
    comptime std.debug.assert(T == Dividend or T == Split);
    // Only consulted on the no-existing-expires fallback, so it is
    // derived from the data type instead of being a parameter.
    const fallback_ttl: TtlSpec = dataTypeFor(T).ttl();
    self.writeMerged(T, symbol, items, fallback_ttl, source_hint, .preserve);
}
/// Sorted-union write for `Dividend` and `Split`. Reads the
/// existing cache file (regardless of freshness), merges `incoming`
/// into it, sorts the union descending by date, and writes the
/// result. The file is always rewritten, even when the merge
/// changed nothing, so the `#!expires=` directive is handled per
/// `expiry` on every call.
///
/// For each incoming entry exactly one of three things happens:
///
/// **Exact key match** - the entry's key (`ex_date` for dividends,
/// `date` for splits) already exists. `upgradeRecord` fills any
/// null optional fields on the existing record (`pay_date`,
/// `record_date`, `type`, `currency`) from the incoming record's
/// non-null values; non-null fields are never overwritten and
/// `type = .unknown` counts as null. The on-disk record thus
/// becomes the union of all sources' knowledge, with conflicts
/// resolved by "first non-null wins". Each upgraded field emits
/// its own `info(cache) upgraded` log line. For `Split` this is a
/// no-op.
///
/// **Near match** (dividends only) - `findNearMatch` recognises the
/// same payment recorded under a slightly different ex_date. The
/// incoming entry is dropped silently and the existing one wins.
///
/// **New key** - the entry is appended and an `info(cache)
/// supplied` log line is emitted, so the user is alerted when a
/// source surfaces a corporate action that was not already known
/// (canonical case: Tiingo discovering SPYM's 2017-10-16 4:1 split
/// that Polygon's reference endpoint doesn't return).
///
/// Parameters:
///   - `ttl`: TTL spec used whenever a new expiration has to be
///     computed (always for `.bump`; only as a fallback for
///     `.preserve`).
///   - `source_hint`: names the source in the log lines (e.g.
///     `"polygon"` / `"tiingo"`); the helpers default to `"fetch"`
///     when null.
///   - `expiry`: whether this write owns the freshness clock
///     (`.bump`, the primary-fetch path via `writeWithSource`) or
///     preserves whatever was on disk (`.preserve`, the supplement
///     path via `writeSupplement`). See `ExpiryPolicy`.
///
/// Best-effort: an allocation failure while building the union
/// returns without writing (and without logging); serialization
/// and write failures are logged as warnings.
fn writeMerged(
    self: *Store,
    comptime T: type,
    symbol: []const u8,
    incoming: []const T,
    ttl: TtlSpec,
    source_hint: ?[]const u8,
    expiry: ExpiryPolicy,
) void {
    comptime std.debug.assert(T == Dividend or T == Split);
    // Read existing entries (any freshness; we want the union of
    // what's on disk, not just fresh data).
    //
    // The cleanup `defer` below calls `deinit` on every existing
    // record (when `T` declares one) and then frees the slice, so
    // each record must own its heap-allocated string fields
    // (e.g. Dividend.currency) by the time `read` returns.
    //
    // NOTE(review): no post-process callback is passed here
    // (`null`), so any string duplication has to happen inside
    // `read`/`readSlice` itself - confirm. `read` frees the raw
    // file bytes before returning; if records still borrow from
    // that buffer, `currency` would dangle and the `deinit` below
    // would free memory it does not own.
    const existing_result = self.read(self.allocator, T, symbol, null, .any);
    const existing: []const T = if (existing_result) |r| r.data else &.{};
    // Snapshot the primary's freshness clock before we rewrite, so
    // `.preserve` writes can put it back unchanged.
    const existing_expires: ?i64 = if (existing_result) |r| r.expires else null;
    defer if (existing_result != null) {
        if (comptime @hasDecl(T, "deinit")) {
            // Iterates `existing`, not `merged`: values borrowed
            // from `incoming` during the merge are never freed here.
            for (existing) |item| item.deinit(self.allocator);
        }
        self.allocator.free(existing);
    };
    // Build the union. Start with a mutable (shallow) copy of the
    // existing entries, then either upgrade in place or append new
    // entries from incoming.
    var merged: std.ArrayList(T) = .empty;
    defer merged.deinit(self.allocator);
    merged.appendSlice(self.allocator, existing) catch return;
    // Tallies of what the merge changed. They are not read again
    // in this function; the file is rewritten regardless.
    var added: usize = 0;
    var upgraded: usize = 0;
    for (incoming) |item| {
        const key = mergeKey(T, item);
        if (findKeyIndex(T, merged.items, key)) |idx| {
            // Exact key match - try to upgrade existing record's
            // optional fields from the incoming entry's non-null
            // values.
            upgraded += upgradeRecord(T, &merged.items[idx], item, symbol, source_hint);
        } else if (findNearMatch(T, merged.items, item)) |_| {
            // Same dividend, different ex_date convention - skip.
            //
            // Some providers report mutual fund dividends using the
            // calendar last-day-of-month even when that falls on a
            // weekend (e.g. FDRXX 2025-08-31 was a Sunday); others
            // use the actual trading date (Tiingo: 2025-08-29).
            // Same payment, two different ex_date strings. The
            // matcher (within ±3 days, amounts equal within the
            // looser of 0.0001 absolute and 1% relative - see
            // `findNearMatch`) catches these without affecting
            // legitimate same-amount dividends paid months apart.
            //
            // Existing entry wins (preserves whichever source
            // wrote first; in practice the Polygon-rich record).
            // No log line - this is a non-event from the user's
            // perspective.
        } else {
            // Genuinely new entry - append.
            merged.append(self.allocator, item) catch return;
            added += 1;
            logSupplied(T, symbol, item, source_hint);
        }
    }
    // Note: even when nothing was added or upgraded, fall through
    // and rewrite the file. On the `.bump` (primary) path the
    // on-disk `#!expires=` directive needs refreshing every time
    // we successfully fetched and merged, otherwise an aged-out
    // file stays aged-out: every subsequent refresh pays the full
    // provider rate-limiter cost only to discover no changes and
    // skip the write, locking the cache into a permanent slow-path.
    // The write itself is a small file, so skipping it isn't worth
    // the bookkeeping. On the `.preserve` (supplement) path we
    // still rewrite to land merged records, but we put the existing
    // expires back untouched.
    //
    // Sort descending by date (newest first), matching the
    // existing on-disk convention.
    std.mem.sort(T, merged.items, {}, lessByDateDesc(T));
    // Serialize via the same `serializeWithMeta` path used for
    // non-merged types, but write the union we just built.
    //
    // wall-clock required: TTL math for the `.bump` path, and the
    // no-existing-expires fallback for `.preserve`.
    const now_s = std.Io.Timestamp.now(self.io, .real).toSeconds();
    const expires: i64 = switch (expiry) {
        .bump => computeExpires(now_s, ttl, symbol),
        // Keep the primary's clock. No prior expires (brand-new
        // file, or a file without the directive) -> establish one
        // like a first fetch would.
        .preserve => existing_expires orelse computeExpires(now_s, ttl, symbol),
    };
    const data_type = dataTypeFor(T);
    const srf_data = serializeWithMeta(T, self.io, self.allocator, merged.items, .{ .expires = expires }) catch |err| {
        log.warn("{s}: failed to serialize {s}: {s}", .{ symbol, @tagName(data_type), @errorName(err) });
        return;
    };
    defer self.allocator.free(srf_data);
    self.writeRaw(symbol, data_type, srf_data) catch |err| {
        log.warn("{s}: failed to write {s} to cache: {s}", .{ symbol, @tagName(data_type), @errorName(err) });
    };
}
/// Key that identifies a record during a merge: the day number of
/// a dividend's `ex_date`, or of a split's `date`. Any other `T` is
/// a compile error.
fn mergeKey(comptime T: type, item: T) i32 {
    return switch (T) {
        Dividend => item.ex_date.days,
        Split => item.date.days,
        else => @compileError("mergeKey only defined for Dividend and Split"),
    };
}
/// Linear search for the first element of `items` whose merge key
/// equals `key`. Returns its index, or null when none matches.
fn findKeyIndex(comptime T: type, items: []const T, key: i32) ?usize {
    var i: usize = 0;
    while (i < items.len) : (i += 1) {
        if (key == mergeKey(T, items[i])) return i;
    }
    return null;
}
/// Find an existing dividend that is almost certainly the same
/// payment as `incoming`, merely recorded under a different
/// ex_date convention. Always null for `Split` (no amount to
/// compare).
///
/// A candidate matches when both hold:
///   - its ex_date is 1 to 3 calendar days away from the incoming
///     one (a 0-day gap is an exact key match and is left to
///     `findKeyIndex`);
///   - the amounts differ by no more than the LOOSER of 0.0001
///     absolute (1/100 cent) and 1% of the larger magnitude.
///
/// The relative arm absorbs provider rounding (e.g. 0.040101457 vs
/// 0.04 for the same payment); the absolute arm covers near-zero
/// amounts, where 1% would be stricter than 1/100 cent.
///
/// Canonical case: a mutual fund whose ex_date one provider reports
/// as the calendar month-end (even on a weekend) and another as the
/// actual trading date.
///
/// Returns the index of the first matching element, or null.
fn findNearMatch(comptime T: type, items: []const T, incoming: T) ?usize {
    if (T != Dividend) return null;

    const max_day_gap = 3;
    const abs_tolerance: f64 = 0.0001;
    const rel_tolerance: f64 = 0.01; // 1%

    for (items, 0..) |candidate, idx| {
        const gap = @abs(candidate.ex_date.days - incoming.ex_date.days);
        // Same day is an exact match (not ours to handle); beyond
        // the window it is a different event.
        if (gap == 0 or gap > max_day_gap) continue;

        const larger = @max(@abs(candidate.amount), @abs(incoming.amount));
        const tolerance = @max(abs_tolerance, larger * rel_tolerance);
        if (@abs(candidate.amount - incoming.amount) <= tolerance) return idx;
    }
    return null;
}
/// Comparator for `std.mem.sort` that orders records by merge key,
/// largest (latest date) first.
fn lessByDateDesc(comptime T: type) fn (void, T, T) bool {
    const Order = struct {
        fn before(_: void, lhs: T, rhs: T) bool {
            return mergeKey(T, rhs) < mergeKey(T, lhs);
        }
    };
    return Order.before;
}
/// Fill gaps in `existing` from `incoming`, a record with the same
/// merge key, and return how many fields were filled.
///
/// For `Split` this is always 0 (nothing optional to fill). For
/// `Dividend` the fields `pay_date`, `record_date`, `type` and
/// `currency` are each upgraded only when the existing value is
/// null (or `.unknown` for `type`) and the incoming value is not;
/// values already present are never overwritten. Every upgrade
/// emits one `info` log line naming `source_hint` (or `"fetch"`
/// when null).
///
/// Ownership: an upgraded `currency` is borrowed from `incoming`,
/// not copied, so `existing` must not outlive the incoming record
/// and must not be passed to a `deinit` that frees `currency`.
fn upgradeRecord(
    comptime T: type,
    existing: *T,
    incoming: T,
    symbol: []const u8,
    source_hint: ?[]const u8,
) usize {
    if (T == Split) return 0;
    if (T != Dividend) @compileError("upgradeRecord only defined for Dividend and Split");

    const source = source_hint orelse "fetch";
    const ex_date = existing.ex_date;
    var upgrades: usize = 0;

    if (existing.pay_date == null) {
        if (incoming.pay_date) |pay_date| {
            existing.pay_date = pay_date;
            log.info("{s}: {s} upgraded ex_date {f}: pay_date null -> {f}", .{
                symbol, source, ex_date, pay_date,
            });
            upgrades += 1;
        }
    }

    if (existing.record_date == null) {
        if (incoming.record_date) |record_date| {
            existing.record_date = record_date;
            log.info("{s}: {s} upgraded ex_date {f}: record_date null -> {f}", .{
                symbol, source, ex_date, record_date,
            });
            upgrades += 1;
        }
    }

    // `.unknown` plays the role of null for the dividend type.
    if (existing.type == .unknown and incoming.type != .unknown) {
        existing.type = incoming.type;
        log.info("{s}: {s} upgraded ex_date {f}: type unknown -> {s}", .{
            symbol, source, ex_date, @tagName(incoming.type),
        });
        upgrades += 1;
    }

    if (existing.currency == null) {
        if (incoming.currency) |currency| {
            // Borrowed, not duplicated: the bytes stay owned by
            // `incoming`, and the previous value was null so there
            // is nothing to free.
            existing.currency = currency;
            log.info("{s}: {s} upgraded ex_date {f}: currency null -> {s}", .{
                symbol, source, ex_date, currency,
            });
            upgrades += 1;
        }
    }

    return upgrades;
}
/// Log (info level) that `source_hint` (default "fetch") supplied a
/// brand-new record for `symbol`. Dividends are reported by ex-date
/// and amount; splits by date and the numerator:denominator ratio
/// truncated to integers. Any other `T` logs nothing.
fn logSupplied(comptime T: type, symbol: []const u8, item: T, source_hint: ?[]const u8) void {
    const source = source_hint orelse "fetch";
    switch (T) {
        Dividend => log.info("{s}: {s} supplied dividend ex_date {f} amount ${d:.4}", .{
            symbol, source, item.ex_date, item.amount,
        }),
        Split => {
            const numerator: u64 = @intFromFloat(item.numerator);
            const denominator: u64 = @intFromFloat(item.denominator);
            log.info("{s}: {s} supplied split {f} {d}:{d}", .{
                symbol, source, item.date, numerator, denominator,
            });
        },
        else => {},
    }
}
// ── Candle-specific API ──────────────────────────────────────
/// Replace the cached daily candle file for `symbol` with `candles`
/// (no expiry directive - historical facts don't expire), then refresh
/// the candle metadata from the last element of `candles`.
///
/// Serialization and write failures are logged at warn level and do
/// not prevent the metadata update. An empty `candles` slice still
/// rewrites the candle file but leaves the metadata untouched.
pub fn cacheCandles(self: *Store, symbol: []const u8, candles: []const Candle, provider: CandleProvider, fail_count: u8) void {
    write: {
        const payload = serializeCandles(self.allocator, candles, .{}) catch |err| {
            log.warn("{s}: failed to serialize candles: {s}", .{ symbol, @errorName(err) });
            break :write;
        };
        defer self.allocator.free(payload);

        self.writeRaw(symbol, .candles_daily, payload) catch |err| {
            log.warn("{s}: failed to write candles to cache: {s}", .{ symbol, @errorName(err) });
        };
    }

    if (candles.len == 0) return;
    const newest = candles[candles.len - 1];
    self.updateCandleMeta(symbol, newest.close, newest.date, provider, fail_count);
}
/// Append `new_candles` to the cached daily candle file for `symbol`
/// and refresh the candle metadata from the last new candle.
///
/// The new records are serialized without directives and appended via
/// `appendRaw`. If that fails (e.g. the file doesn't exist), the
/// fallback reads the existing candles, concatenates the new ones and
/// rewrites the whole file. When the fallback read yields nothing, no
/// candle data is written. Failures are logged; none are returned.
///
/// An empty `new_candles` is a no-op. An allocation failure while
/// building the merged list returns early without touching metadata.
pub fn appendCandles(self: *Store, symbol: []const u8, new_candles: []const Candle, provider: CandleProvider, fail_count: u8) void {
    if (new_candles.len == 0) return;

    persist: {
        const tail_bytes = serializeCandles(self.allocator, new_candles, .{ .emit_directives = false }) catch |err| {
            log.warn("{s}: failed to serialize new candles for append: {s}", .{ symbol, @errorName(err) });
            break :persist;
        };
        defer self.allocator.free(tail_bytes);

        self.appendRaw(symbol, .candles_daily, tail_bytes) catch |append_err| {
            // Append failed (file missing?) - fall back to full load + rewrite
            log.debug("{s}: append failed ({s}), falling back to full rewrite", .{ symbol, @errorName(append_err) });

            const cached = self.read(self.allocator, Candle, symbol, null, .any) orelse break :persist;
            defer self.allocator.free(cached.data);

            const old_len = cached.data.len;
            const combined = self.allocator.alloc(Candle, old_len + new_candles.len) catch return;
            defer self.allocator.free(combined);
            @memcpy(combined[0..old_len], cached.data);
            @memcpy(combined[old_len..], new_candles);

            const full_bytes = serializeCandles(self.allocator, combined, .{}) catch |err| {
                log.warn("{s}: failed to serialize merged candles: {s}", .{ symbol, @errorName(err) });
                break :persist;
            };
            defer self.allocator.free(full_bytes);

            self.writeRaw(symbol, .candles_daily, full_bytes) catch |err| {
                log.warn("{s}: failed to write merged candles to cache: {s}", .{ symbol, @errorName(err) });
            };
        };
    }

    const newest = new_candles[new_candles.len - 1];
    self.updateCandleMeta(symbol, newest.close, newest.date, provider, fail_count);
}
/// Write (or refresh) the candle metadata file for `symbol`, recording
/// the last close, last date, provider and failure count. The file's
/// expiry is set to the current wall-clock time plus
/// `Ttl.candles_latest`. Serialization and write failures are logged
/// at warn level and otherwise ignored.
pub fn updateCandleMeta(self: *Store, symbol: []const u8, last_close: f64, last_date: Date, provider: CandleProvider, fail_count: u8) void {
    const now_s = std.Io.Timestamp.now(self.io, .real).toSeconds();
    const expires_at = now_s + Ttl.candles_latest;
    const record: CandleMeta = .{
        .last_close = last_close,
        .last_date = last_date,
        .provider = provider,
        .fail_count = fail_count,
    };

    const encoded = serializeCandleMeta(self.io, self.allocator, record, .{ .expires = expires_at }) catch |err| {
        log.warn("{s}: failed to serialize candle metadata: {s}", .{ symbol, @errorName(err) });
        return;
    };
    defer self.allocator.free(encoded);

    self.writeRaw(symbol, .candles_meta, encoded) catch |err| {
        log.warn("{s}: failed to write candle metadata: {s}", .{ symbol, @errorName(err) });
    };
}
// ── Cache management ─────────────────────────────────────────
/// Make sure the cache directory for `symbol` exists, creating it and
/// any missing parents. An already-existing directory is success;
/// every other error is returned to the caller.
pub fn ensureSymbolDir(self: *Store, symbol: []const u8) !void {
    const dir_path = try self.entryPath(symbol, "");
    defer self.allocator.free(dir_path);

    std.Io.Dir.cwd().createDirPath(self.io, dir_path) catch |err| {
        if (err != error.PathAlreadyExists) return err;
    };
}
/// Clear all cached data for a symbol by deleting its cache directory.
///
/// Only a failure to build the directory path is returned. The delete
/// itself is best-effort: a failure is logged at debug level on the
/// `.cache` scope (the same scoped logger the rest of this file uses)
/// and the call still succeeds.
pub fn clearSymbol(self: *Store, symbol: []const u8) !void {
    const path = try self.entryPath(symbol, "");
    defer self.allocator.free(path);
    // Best-effort clear: deleting a non-existent symbol dir is
    // a no-op success from the caller's POV, so log + continue.
    std.Io.Dir.cwd().deleteTree(self.io, path) catch |err| {
        log.debug("clearSymbol deleteTree({s}): {t}", .{ path, err });
    };
}
/// Content of a negative cache entry (fetch failed, don't retry until --refresh).
/// The bytes are the `#!srfv1` version line followed by a
/// `# fetch_failed` comment line. `writeNegative` stores them verbatim
/// and `isNegative` compares a file's leading bytes against them.
pub const negative_cache_content = "#!srfv1\n# fetch_failed\n";
/// Write a negative cache entry for a symbol + data type.
/// This records that a fetch was attempted and failed, preventing repeated
/// network requests for symbols that will never resolve.
/// Cleared by --refresh (which calls clearData/invalidate).
///
/// Never fails from the caller's point of view: a write error is
/// logged at debug level on the `.cache` scope and dropped.
pub fn writeNegative(self: *Store, symbol: []const u8, data_type: DataType) void {
    // Best-effort: a write failure here just means we'll re-attempt
    // the upstream fetch next call, which is correct behavior.
    self.writeRaw(symbol, data_type, negative_cache_content) catch |err| {
        log.debug("writeNegative({s}/{t}): {t}", .{ symbol, data_type, err });
    };
}
/// Cheap structural check that `data` looks like a completely written
/// SRF file. Returns true only when the buffer:
///
/// - is non-empty,
/// - begins with the `#!srfv1` version directive (the negative-cache
///   marker starts the same way, so one check covers both), and
/// - ends with a newline - every record and directive is
///   newline-terminated, so a fully written file always has a
///   trailing `\n`.
///
/// Torn HTTP body writes land short of the final newline. The FRDM
/// corruption on 2026-05-02 is the canonical example: the tail of
/// `candles_daily.srf` ended with `date::2026-04` mid-record, no
/// comma, no OHLCV fields, no newline. Atomic file writes faithfully
/// persist such a truncated body - `writeFileAtomic` is about rename
/// integrity, not payload validity. This helper is the payload check
/// that cache-write callers should gate on when the bytes come from
/// an untrusted source (server sync, external file import).
/// Locally-serialized payloads are complete by construction and
/// don't need it.
pub fn looksCompleteSrf(data: []const u8) bool {
    const has_version_header = std.mem.startsWith(u8, data, "#!srfv1");
    const has_trailing_newline = std.mem.endsWith(u8, data, "\n");
    return has_version_header and has_trailing_newline;
}
/// Why a body was classified as torn. Surfaced in the archived
/// `.meta` sidecar (the `failure_reason` field of `TearRecord`) so
/// post-mortems can distinguish between detection paths.
pub const FailureReason = enum {
/// `looksCompleteSrf` rejected the body bytes after an HTTP
/// response. The data reached us from the network but didn't
/// satisfy the structural sanity check.
looks_complete_srf_failed,
/// A cached file on disk failed to parse when read. Indicates
/// either an earlier torn write that slipped past the guard, or
/// local filesystem corruption. This is the reason recorded by
/// `selfHealTornCandles`.
srf_parse_failed,
/// The integrity digest advertised by the server (e.g. an
/// `ETag: "sha256:..."`) did not match the body bytes we
/// received. Reserved for the HTTP integrity work in Milestone 2.
etag_mismatch,
};
/// Context captured alongside a torn body. Every field except
/// `failure_reason` is optional (defaulting to null) because the exact
/// detection site determines which signals are available. Consumed by
/// `archiveTornBody` to produce the SRF `.meta` sidecar.
pub const TornMeta = struct {
/// Which detection path classified the body as torn. Required.
failure_reason: FailureReason,
/// HTTP status code when the body originated from an HTTP
/// response. Null for parse failures discovered on local read.
http_status: ?u16 = null,
/// Raw `Content-Length` header value from the HTTP response,
/// when the detection path has access to it.
http_content_length: ?u64 = null,
/// Source URL when the body came from an HTTP response.
server_url: ?[]const u8 = null,
/// Raw value of the server's `ETag` header, if present. Captured
/// verbatim (including surrounding quotes and any `sha256:` or
/// other scheme prefix) so post-mortem can see exactly what the
/// server advertised. Paired with `body_sha256` in the sidecar,
/// an `etag_mismatch` failure is trivially explained.
server_etag: ?[]const u8 = null,
};
/// Schema for the SRF `.meta` sidecar emitted by `archiveTornBody`.
/// Each field becomes a `key:type:value` entry in a single record
/// under a `#!srfv1` header. Optional fields with `null` defaults
/// are silently skipped by `srf.fmt` when unset - which is the
/// behavior we want for the http_*, server_*, and `?[]const u8`
/// fields that only some detection paths populate.
const TearRecord = struct {
/// Fixed marker so `_torn/*.meta` is distinguishable at a glance
/// from any other SRF record a future reader might encounter in
/// the cache tree. `archiveTornBody` sets it to "tear_metadata".
type: []const u8,
/// Cache key and data type of the entry whose body was torn.
symbol: []const u8,
data_type: DataType,
/// Wall-clock time of the archive, in seconds since the Unix epoch.
unix_ts: i64,
/// The same instant rendered as `YYYY-MM-DDTHH:MM:SSZ`.
iso_ts: []const u8,
/// Length in bytes of the archived body.
body_length: u64,
/// SHA-256 digest of the archived body, hex-encoded.
body_sha256: []const u8,
/// Detection path that flagged the body (copied from `TornMeta`).
failure_reason: FailureReason,
/// Build identifier; `archiveTornBody` fills it from
/// `version.version_string`.
zfin_commit: []const u8,
/// Optional HTTP context copied from `TornMeta`.
http_status: ?u16 = null,
http_content_length: ?u64 = null,
server_url: ?[]const u8 = null,
server_etag: ?[]const u8 = null,
/// Hex encoding of the final bytes of the body (at most 200 bytes;
/// the whole body when it is shorter).
last_200_bytes_hex: []const u8,
};
/// Archive a torn cache body for post-mortem diagnosis.
///
/// Writes two sibling files under `{cache_dir}/_torn/`:
/// `{symbol}_{data_type}_{unix_ms}.bin` - the raw bytes as received
/// `{symbol}_{data_type}_{unix_ms}.meta` - SRF-formatted context
///
/// Filenames carry a millisecond-resolution timestamp so a retry
/// loop that tears twice within the same wall-clock second
/// produces distinct archive pairs - two back-to-back captures are
/// the most valuable forensic signal we can produce (byte offsets,
/// tail shapes, and time deltas are all impossible to infer from a
/// single failure).
///
/// The `.meta` file mirrors the rest of the cache's on-disk format
/// so tooling can iterate it with the standard SRF reader. See
/// `TearRecord` for the schema. Required fields are always
/// emitted; `http_status`, `http_content_length`, `server_url`,
/// and `server_etag` appear only when the caller populated them
/// in `TornMeta`.
///
/// The clock is read exactly once, so the sidecar's `unix_ts` /
/// `iso_ts` and the millisecond stamp in the filenames always
/// describe the same instant.
///
/// Errors: returns any failure from path allocation, directory
/// creation, formatting, or either atomic file write. The `.bin` file
/// is written first, so an error from a later step can leave a `.bin`
/// without its sidecar.
///
/// Best-effort: a failure here should never mask the original
/// detection path's return value. Callers log.debug the outcome and
/// move on.
pub fn archiveTornBody(
    io: std.Io,
    allocator: std.mem.Allocator,
    cache_dir: []const u8,
    symbol: []const u8,
    data_type: DataType,
    bytes: []const u8,
    meta: TornMeta,
) !void {
    // Ensure the _torn/ directory exists.
    const torn_dir = try std.fs.path.join(allocator, &.{ cache_dir, "_torn" });
    defer allocator.free(torn_dir);
    std.Io.Dir.cwd().createDirPath(io, torn_dir) catch |err| switch (err) {
        error.PathAlreadyExists => {},
        else => return err,
    };

    // One clock read feeds both resolutions. Second resolution goes
    // into the SRF `unix_ts` field (plus ISO rendering); millisecond
    // resolution goes into the filename so retries within the same
    // wall-clock second produce distinct archive entries rather than
    // overwriting each other.
    const now = std.Io.Timestamp.now(io, .real);
    const ts = now.toSeconds();
    const ts_ms = @divTrunc(now.nanoseconds, std.time.ns_per_ms);

    const bin_name = try std.fmt.allocPrint(
        allocator,
        "{s}_{s}_{d}.bin",
        .{ symbol, @tagName(data_type), ts_ms },
    );
    defer allocator.free(bin_name);
    const bin_path = try std.fs.path.join(allocator, &.{ torn_dir, bin_name });
    defer allocator.free(bin_path);

    const meta_name = try std.fmt.allocPrint(
        allocator,
        "{s}_{s}_{d}.meta",
        .{ symbol, @tagName(data_type), ts_ms },
    );
    defer allocator.free(meta_name);
    const meta_path = try std.fs.path.join(allocator, &.{ torn_dir, meta_name });
    defer allocator.free(meta_path);

    // Write the raw body first - if this fails we don't bother with
    // the sidecar, since the sidecar is only useful paired with bytes.
    try atomic.writeFileAtomic(io, allocator, bin_path, bytes);

    // Compute sha256 of the body for the sidecar.
    var hash: [std.crypto.hash.sha2.Sha256.digest_length]u8 = undefined;
    std.crypto.hash.sha2.Sha256.hash(bytes, &hash, .{});
    var hash_hex: [std.crypto.hash.sha2.Sha256.digest_length * 2]u8 = undefined;
    _ = try std.fmt.bufPrint(&hash_hex, "{x}", .{&hash});

    // ISO-8601 UTC timestamp - computed by hand to avoid pulling in
    // a dependency. Format: YYYY-MM-DDTHH:MM:SSZ.
    const epoch_seconds = std.time.epoch.EpochSeconds{ .secs = @intCast(ts) };
    const day_seconds = epoch_seconds.getDaySeconds();
    const epoch_day = epoch_seconds.getEpochDay();
    const year_day = epoch_day.calculateYearDay();
    const month_day = year_day.calculateMonthDay();
    var iso_buf: [32]u8 = undefined;
    const iso_ts = try std.fmt.bufPrint(&iso_buf, "{d:0>4}-{d:0>2}-{d:0>2}T{d:0>2}:{d:0>2}:{d:0>2}Z", .{
        year_day.year,
        @intFromEnum(month_day.month),
        month_day.day_index + 1,
        day_seconds.getHoursIntoDay(),
        day_seconds.getMinutesIntoHour(),
        day_seconds.getSecondsIntoMinute(),
    });

    // Last 200 bytes as lowercase hex so the sidecar is
    // grep-friendly regardless of what binary garbage sits in the
    // torn tail.
    const tail_len: usize = @min(bytes.len, 200);
    const tail = bytes[bytes.len - tail_len ..];
    const tail_hex = try allocator.alloc(u8, tail_len * 2);
    defer allocator.free(tail_hex);
    _ = try std.fmt.bufPrint(tail_hex, "{x}", .{tail});

    const record = TearRecord{
        .type = "tear_metadata",
        .symbol = symbol,
        .data_type = data_type,
        .unix_ts = ts,
        .iso_ts = iso_ts,
        .body_length = bytes.len,
        .body_sha256 = &hash_hex,
        .failure_reason = meta.failure_reason,
        .zfin_commit = version.version_string,
        .http_status = meta.http_status,
        .http_content_length = meta.http_content_length,
        .server_url = meta.server_url,
        .server_etag = meta.server_etag,
        .last_200_bytes_hex = tail_hex,
    };

    // Serialize the single record into memory, then publish it with
    // the same atomic write used for the body.
    var aw: std.Io.Writer.Allocating = .init(allocator);
    defer aw.deinit();
    const records = [_]TearRecord{record};
    try aw.writer.print("{f}", .{srf.fmt(TearRecord, &records, .{})});
    try atomic.writeFileAtomic(io, allocator, meta_path, aw.writer.buffered());
}
/// Read-path recovery for a torn `candles_daily.srf`. The torn bytes
/// are archived with `failure_reason = .srf_parse_failed`, then both
/// `candles_daily.srf` and its companion `candles_meta.srf` are
/// removed so the next fetch cycle pulls a clean pair.
///
/// Best-effort throughout: an archive failure is logged at debug and
/// does NOT block the invalidation. Keeping the read path recoverable
/// is the goal; diagnostics are a bonus.
fn selfHealTornCandles(self: *Store, symbol: []const u8, data: []const u8) void {
    const torn_meta: TornMeta = .{ .failure_reason = .srf_parse_failed };
    const byte_count = data.len;

    archiveTornBody(self.io, self.allocator, self.cache_dir, symbol, .candles_daily, data, torn_meta) catch |err| {
        log.debug(
            "{s}: failed to archive torn candles_daily body ({d} bytes): {s}",
            .{ symbol, byte_count, @errorName(err) },
        );
    };

    log.debug(
        "{s}: self-heal wiping torn candles_daily+candles_meta pair ({d} bytes)",
        .{ symbol, byte_count },
    );

    // Daily file first, then its metadata companion.
    const stale_pair = [_]DataType{ .candles_daily, .candles_meta };
    for (stale_pair) |stale| self.clearData(symbol, stale);
}
/// Report whether the cached file for `symbol` / `data_type` is a
/// negative entry, i.e. its leading bytes equal
/// `negative_cache_content` (the fetch_failed marker). Negative
/// entries are always considered "fresh" -- they never expire.
///
/// Returns false when the path cannot be built, the file cannot be
/// opened or read, or the file is shorter than the marker.
pub fn isNegative(self: *Store, symbol: []const u8, data_type: DataType) bool {
    const marker = negative_cache_content;

    const path = self.entryPath(symbol, data_type.fileName()) catch return false;
    defer self.allocator.free(path);

    const file = std.Io.Dir.cwd().openFile(self.io, path, .{}) catch return false;
    defer file.close(self.io);

    // Read at most as many bytes as the marker is long.
    var head: [marker.len]u8 = undefined;
    var file_reader = file.reader(self.io, &.{});
    const got = file_reader.interface.readSliceShort(&head) catch return false;

    if (got != marker.len) return false;
    return std.mem.eql(u8, head[0..got], marker);
}
/// Clear a specific data type for a symbol by deleting its cache file.
///
/// Best-effort and infallible for the caller: if the path cannot be
/// built the call returns silently, and a delete failure (including a
/// file that does not exist) is logged at debug level on the `.cache`
/// scope, the same scoped logger the rest of this file uses.
pub fn clearData(self: *Store, symbol: []const u8, data_type: DataType) void {
    const path = self.entryPath(symbol, data_type.fileName()) catch return;
    defer self.allocator.free(path);
    std.Io.Dir.cwd().deleteFile(self.io, path) catch |err| {
        log.debug("clearData deleteFile({s}): {t}", .{ path, err });
    };
}
/// Return the `last_close` recorded in the candle metadata file for
/// `symbol`. Returns null when no metadata exists, or when the file
/// cannot be read or deserialized.
pub fn readLastClose(self: *Store, symbol: []const u8) ?f64 {
    const bytes = (self.readRaw(symbol, .candles_meta) catch return null) orelse return null;
    defer self.allocator.free(bytes);

    const parsed = deserializeCandleMeta(self.allocator, bytes) catch return null;
    return parsed.last_close;
}
/// Read the full candle metadata record for `symbol` together with the
/// file's `#!created=` timestamp. When the file carries no created
/// timestamp, the current wall-clock time (seconds) is used instead.
///
/// Returns null if no metadata exists, or if the file cannot be read,
/// iterated, or converted to `CandleMeta`.
pub fn readCandleMeta(self: *Store, symbol: []const u8) ?struct { meta: CandleMeta, created: i64 } {
    const bytes = (self.readRaw(symbol, .candles_meta) catch return null) orelse return null;
    defer self.allocator.free(bytes);

    var source = std.Io.Reader.fixed(bytes);
    var records = srf.iterator(&source, self.allocator, .{ .parse_allocator = .none }) catch return null;
    defer records.deinit();

    const created_s = records.created orelse std.Io.Timestamp.now(self.io, .real).toSeconds();

    // Only the first record is consulted.
    const first = records.next() catch return null;
    const fields = first orelse return null;
    const parsed = fields.to(CandleMeta, .{}) catch return null;

    return .{ .meta = parsed, .created = created_s };
}
/// Decide whether the candle metadata for `symbol` is still fresh,
/// based on the `#!expires=` directive embedded in the file.
///
/// - Missing or unreadable file: not fresh.
/// - File containing the `# fetch_failed` marker: always fresh.
/// - File without an expires directive: not fresh.
/// - Otherwise: whatever the SRF iterator's `isFresh` reports.
pub fn isCandleMetaFresh(self: *Store, symbol: []const u8) bool {
    const bytes = (self.readRaw(symbol, .candles_meta) catch return false) orelse return false;
    defer self.allocator.free(bytes);

    const is_negative = std.mem.indexOf(u8, bytes, "# fetch_failed") != null;
    if (is_negative) return true;

    var source = std.Io.Reader.fixed(bytes);
    const records = srf.iterator(&source, self.allocator, .{ .parse_allocator = .none }) catch return false;
    defer records.deinit();

    return records.expires != null and records.isFresh(self.io);
}
/// Clear all cached data by deleting the whole cache directory tree.
///
/// Best-effort: a delete failure is logged at debug level on the
/// `.cache` scope (the same scoped logger the rest of this file uses)
/// and the call still succeeds. The error-union return type is kept
/// for existing callers.
pub fn clearAll(self: *Store) !void {
    // Best-effort: clearing an already-absent cache dir is success.
    std.Io.Dir.cwd().deleteTree(self.io, self.cache_dir) catch |err| {
        log.debug("clearAll deleteTree({s}): {t}", .{ self.cache_dir, err });
    };
}
// ── Public types ─────────────────────────────────────────────
/// Metadata stored in the separate candles_meta.srf file.
/// Allows fast price lookups and freshness checks without parsing the full candle file.
/// The `#!created=` directive tracks when this metadata was written (replaces fetched_at).
pub const CandleMeta = struct {
/// Close price of the last candle in the set that was cached
/// (`cacheCandles` / `appendCandles` pass the final element's close).
last_close: f64,
/// Date of that same last candle.
last_date: Date,
/// Which provider sourced the candle data. **No default
/// value on purpose** - SRF auto-elides fields whose value
/// equals their default, which would hide the provider line
/// when it equaled the implicit default. We want every cache
/// file to record its provider explicitly so cache inspection
/// can always answer "where did this come from?". Construction
/// sites must pass the provider explicitly.
///
/// Cache compatibility: pre-2026-05 caches that elided the
/// provider field will fail to deserialize after this change
/// (SRF returns FieldNotFoundOnFieldWithoutDefaultValue).
/// `readCandleMeta` swallows the error and returns null,
/// making the symbol look like a cache miss - `getCandles`
/// then triggers a fresh fetch via `populateAllFromTiingo`,
/// which writes a new meta file with the provider explicit.
/// The wipe happens naturally on first use post-upgrade.
provider: CandleProvider,
/// Consecutive transient failure count for the primary provider (Tiingo).
/// Incremented on ServerError; reset to 0 on success. When >= 3, the
/// symbol is degraded to a fallback provider until Tiingo recovers.
fail_count: u8 = 0,
};
/// Identifies which upstream provider sourced a symbol's cached
/// candles. Stored in `CandleMeta.provider`.
pub const CandleProvider = enum {
    /// Legacy: candles were sourced from TwelveData. No new
    /// writes produce this value (TwelveData was demoted in an
    /// earlier change because its `adj_close` was unreliable).
    /// Cache reads still recognize the value for backwards
    /// compatibility.
    twelvedata,
    /// Legacy: candles were sourced from Yahoo Finance. No new
    /// writes produce this value (Yahoo was removed from the
    /// candle pipeline in the 2026-05 audit; Yahoo is still used
    /// for `getQuote` real-time prices but not for historical
    /// candles). Cache reads still recognize the value for
    /// backwards compatibility.
    yahoo,
    /// Active: candles sourced from Tiingo. The only value
    /// produced by current writes.
    tiingo,

    /// Map a provider name to its enum value. The match is exact and
    /// case-sensitive; any string that is not a tag name (including
    /// the empty string) maps to `.twelvedata`.
    pub fn fromString(s: []const u8) CandleProvider {
        return std.meta.stringToEnum(CandleProvider, s) orelse .twelvedata;
    }
};
// ── Private I/O ──────────────────────────────────────────────
/// Read the whole cache file for `symbol` / `data_type` into memory
/// allocated from `self.allocator`; the caller frees the returned
/// slice. Returns null when the file does not exist. Reads are capped
/// at 50 MiB; any other read error is returned.
fn readRaw(self: *Store, symbol: []const u8, data_type: DataType) !?[]const u8 {
    const max_bytes = 50 * 1024 * 1024;

    const path = try self.entryPath(symbol, data_type.fileName());
    defer self.allocator.free(path);

    const bytes = std.Io.Dir.cwd().readFileAlloc(self.io, path, self.allocator, .limited(max_bytes)) catch |err| {
        if (err == error.FileNotFound) return null;
        return err;
    };
    return bytes;
}
/// Write raw bytes to the cache file for `symbol` / `data_type`,
/// creating the symbol directory first if needed. Used by server sync
/// to write pre-serialized SRF data directly to the cache.
///
/// The write goes through `atomic.writeFileAtomic`: write to
/// `<path>.tmp`, fsync, rename. A concurrent reader sees either the
/// old complete file or the new complete file - never a
/// truncated-mid-write state. This matters because:
///
/// - `parallelServerSync` (service.zig) writes many cache files in
///   parallel; other code paths in the same process (TUI redraws,
///   progress callbacks that happen to peek, future concurrent
///   readers) must never observe a half-written file.
/// - The prior implementation used `createFile(truncate) + writeAll`,
///   which has a window between truncation and write-completion
///   where a reader gets 0..data.len bytes. The symptom was SRF
///   `custom parse of value 2026-04 failed : InvalidDateFormat`
///   - a date field truncated exactly 7 chars into its 10-char
///   value.
///
/// Errors from directory creation, path allocation, or the atomic
/// write are returned to the caller.
pub fn writeRaw(self: *Store, symbol: []const u8, data_type: DataType, data: []const u8) !void {
    try self.ensureSymbolDir(symbol);

    const file_name = data_type.fileName();
    const target = try self.entryPath(symbol, file_name);
    defer self.allocator.free(target);

    return atomic.writeFileAtomic(self.io, self.allocator, target, data);
}
/// Append raw bytes to an existing cache file.
///
/// Done as read-existing + concatenate + atomic rewrite rather than a
/// direct open-for-append, for the same reason as `writeRaw`: a true
/// append (`seekFromEnd + writeAll`) lets a reader that hits the file
/// mid-append see a valid head + partial tail, which for SRF data
/// means a truncated trailing record. With the atomic rewrite, the
/// rename primitive guarantees readers see either the pre-append
/// state or the post-append state, never an in-between.
///
/// Returns `error.FileNotFound` if the target doesn't exist yet -
/// callers are expected to fall back to a full rewrite path in that
/// case (see `appendCandles`). The existing file is read with a
/// 50 MiB cap; other read, allocation, and write errors are returned
/// as-is.
fn appendRaw(self: *Store, symbol: []const u8, data_type: DataType, data: []const u8) !void {
    const max_bytes = 50 * 1024 * 1024;

    const path = try self.entryPath(symbol, data_type.fileName());
    defer self.allocator.free(path);

    // Every read error, FileNotFound included, propagates unchanged.
    const current = try std.Io.Dir.cwd().readFileAlloc(self.io, path, self.allocator, .limited(max_bytes));
    defer self.allocator.free(current);

    const joined = try std.mem.concat(self.allocator, u8, &.{ current, data });
    defer self.allocator.free(joined);

    try atomic.writeFileAtomic(self.io, self.allocator, path, joined);
}
/// Build the on-disk path `<cache_dir>/<key>/<file_name>` for a cache
/// entry. The result is allocated from `self.allocator`; the caller
/// frees it.
///
/// `key` is the entry's primary key - typically a ticker symbol, but
/// it can also be a CIK (for entity-keyed data) or another stable
/// identifier. The cache layer does not care which kind it is; the
/// directory name is whatever string was supplied.
///
/// Pass an empty `file_name` to get the entry's directory path
/// `<cache_dir>/<key>` (used by housekeeping calls that operate on
/// the directory rather than a file inside it).
fn entryPath(self: *Store, key: []const u8, file_name: []const u8) ![]const u8 {
    const components = [3][]const u8{ self.cache_dir, key, file_name };
    // Leave the trailing component out entirely when no file name was
    // given, so only the directory components are joined.
    const used: usize = if (file_name.len == 0) 2 else 3;
    return std.fs.path.join(self.allocator, components[0..used]);
}
// ── Private serialization: generic ───────────────────────────
/// Comptime predicate: true when struct `T` is known to carry no
/// string data, false otherwise. Drives the `parse_allocator` choice
/// in `readSlice` - types that don't need to retain string values
/// past `fields.to(T, .{})` can use `.none` and save the allocator
/// hit per parsed value.
///
/// A field is accepted only when its type is:
/// - an int, float, bool, or enum;
/// - an optional whose child is one of those; or
/// - a struct whose `@typeName` ends in "Date" (the project's `Date`,
///   a pure i32 wrapper - matched by name because the type can show
///   up as plain "Date" or as a longer qualified path depending on
///   how it was reached).
///
/// Everything else makes the result false. That covers every
/// slice-of-u8 shape (`[]const u8`, `[]u8`, and their optionals) and,
/// conservatively, any other composite: such a type might allocate
/// strings inside its own SRF parse hook, so it is treated as
/// string-bearing. A future audit can opt specific composites in.
/// Non-struct `T` also returns false.
fn hasNoStringFields(comptime T: type) bool {
    const struct_info = switch (@typeInfo(T)) {
        .@"struct" => |s| s,
        else => return false,
    };

    inline for (struct_info.fields) |field| {
        const FieldType = field.type;
        const accepted: bool = switch (@typeInfo(FieldType)) {
            .int, .float, .bool, .@"enum" => true,
            .optional => |opt| switch (@typeInfo(opt.child)) {
                .int, .float, .bool, .@"enum" => true,
                // Optional slices, optional structs, etc.
                else => false,
            },
            // Only the project's `Date` is let through, detected by name.
            .@"struct" => std.mem.endsWith(u8, @typeName(FieldType), "Date"),
            // Slices (pointer types), unions, arrays, and so on.
            else => false,
        };
        if (!accepted) return false;
    }
    return true;
}
// ── hasNoStringFields tests ─────────────────────────────
//
// These pin the comptime predicate that drives the
// parse_allocator choice in `readSlice`. If a field added to one
// of the types below changes its classification, the matching
// test fails, so a change in which types parse with `.none`
// (borrowed slices) versus an allocator (duplicated strings) is
// noticed rather than happening silently.
test "hasNoStringFields: Candle is pure-numeric (Date+5×f64+u64)" {
    try std.testing.expectEqual(true, hasNoStringFields(Candle));
}
test "hasNoStringFields: Split is pure-numeric (Date+2×f64)" {
    try std.testing.expectEqual(true, hasNoStringFields(Split));
}
test "hasNoStringFields: Dividend has currency string -> false" {
    // Dividend.currency is `?[]const u8` - the caller keeps it
    // past the iterator, so the value MUST be duplicated.
    try std.testing.expectEqual(false, hasNoStringFields(Dividend));
}
test "hasNoStringFields: EarningsEvent has string fields -> false" {
    try std.testing.expectEqual(false, hasNoStringFields(EarningsEvent));
}
test "hasNoStringFields: synthetic shapes" {
    // Ints, floats, bools, enums, Date, and an optional int: accepted.
    const AllScalar = struct {
        a: i32,
        b: f64,
        c: bool,
        d: enum { x, y },
        e: Date,
        f: ?u32,
    };
    try std.testing.expectEqual(true, hasNoStringFields(AllScalar));

    // A bare []const u8 is rejected.
    const WithString = struct {
        a: i32,
        b: []const u8,
    };
    try std.testing.expectEqual(false, hasNoStringFields(WithString));

    // So is an optional []const u8.
    const WithOptionalString = struct {
        a: i32,
        b: ?[]const u8,
    };
    try std.testing.expectEqual(false, hasNoStringFields(WithOptionalString));

    // And a mutable []u8. No such field ships today; this guards
    // against future drift.
    const WithMutableString = struct {
        a: i32,
        b: []u8,
    };
    try std.testing.expectEqual(false, hasNoStringFields(WithMutableString));
}
test "hasNoStringFields: composite struct field that's not Date is treated as string-bearing" {
    // Conservative default: a struct field that isn't recognized
    // as Date is not inspected recursively - it is assumed to
    // possibly allocate inside its custom parse hook.
    const Inner = struct {
        s: []const u8,
    };
    const Wrapper = struct {
        x: i32,
        y: Inner,
    };
    try std.testing.expectEqual(false, hasNoStringFields(Wrapper));
}
test "hasNoStringFields: non-struct types return false" {
    // The predicate is only meaningful for record types parsed
    // by SRF (always structs in zfin). Anything else returns
    // false defensively.
    try std.testing.expectEqual(false, hasNoStringFields(u32));
    try std.testing.expectEqual(false, hasNoStringFields([]const u8));
}
/// Hand-rolled specialized coercer for Candle records.
/// Bypasses SRF's generalized `fields.to(T, ...)` for the
/// hot Candle parse path: zfin's cold candle load deserializes
/// hundreds of thousands of records of fixed 7-field shape,
/// where `fields.to`'s per-field framework cost (coerce()
/// boundary, found-bitmap bookkeeping, inline-for dispatch
/// chain) dominates. Direct first-byte switch + struct
/// assignment is ~25x faster in ReleaseFast for the same
/// correct result on well-formed cache files.
///
/// Trade-off vs `fields.to`: this skips default-value
/// fallback, missing-field detection, and `coerce()`'s
/// strict type discipline. Adequate for our cache-write
/// invariant (every candle file we write contains exactly
/// the 7 fields below); inadequate for parsing arbitrary
/// user-supplied SRF data.
///
/// Leniency rules, all deliberate:
/// - Keys whose first byte matches none of the seven fields are
///   silently skipped, as are fields with no value or with a
///   value of the wrong kind (e.g. a string where a number is
///   expected).
/// - A field that never appears keeps its placeholder: date
///   1970-01-01 and 0 for every numeric field. This differs from
///   `fields.to`, which reports a missing field that has no
///   default as an error.
/// - `volume` is converted with a saturating cast: NaN becomes 0,
///   negative values clamp to 0 and oversized values clamp to the
///   u64 maximum. A damaged file therefore cannot trigger the
///   illegal behavior of an out-of-range `@intFromFloat`.
///
/// Errors: whatever the field iterator's `next` returns, plus
/// `Date.parse` failures on the `date` value.
///
/// See SRF's `pub fn to` doc comment for the broader
/// "specialized vs generalized" trade-off discussion.
fn coerceCandleSpecialized(fields: srf.RecordIterator.FieldIterator) !Candle {
    var c: Candle = .{
        .date = Date.fromYmd(1970, 1, 1),
        .open = 0,
        .high = 0,
        .low = 0,
        .close = 0,
        .adj_close = 0,
        .volume = 0,
    };
    while (try fields.next()) |f| {
        const key = f.key;
        const val = f.value orelse continue;
        // Switch on the first byte. All 7 Candle field names
        // are first-byte-unique:
        //   d -> date     o -> open    h -> high
        //   l -> low      c -> close   a -> adj_close
        //   v -> volume
        if (key.len == 0) continue;
        switch (key[0]) {
            'd' => if (val == .string) {
                c.date = try Date.parse(val.string);
            },
            'o' => if (val == .number) {
                c.open = val.number;
            },
            'h' => if (val == .number) {
                c.high = val.number;
            },
            'l' => if (val == .number) {
                c.low = val.number;
            },
            'c' => if (val == .number) {
                c.close = val.number;
            },
            'a' => if (val == .number) {
                c.adj_close = val.number;
            },
            'v' => if (val == .number) {
                // Saturating conversion: identical to @intFromFloat
                // for in-range values, defined for everything else.
                c.volume = std.math.lossyCast(u64, val.number);
            },
            else => {},
        }
    }
    return c;
}
/// Parse an in-memory SRF document into a `CacheResult(T)`.
///
/// A single iterator pass covers the optional freshness gate, the
/// `#!created=` timestamp, and per-record deserialization. Returns
/// `null` when the data cannot be parsed, when `.fresh_only` is
/// requested and the (non-negative-cache) entry is stale or has no
/// expiry, when `postProcess` fails, or when an allocation fails.
/// Records that fail to coerce into `T` are skipped individually.
fn readSlice(
    comptime T: type,
    io: std.Io,
    allocator: std.mem.Allocator,
    data: []const u8,
    comptime postProcess: ?*const fn (*T, std.mem.Allocator) anyerror!void,
    comptime freshness: Freshness,
) ?CacheResult(T) {
    var reader = std.Io.Reader.fixed(data);

    // The parse allocator is picked from type information:
    //
    // - Types without string fields (e.g. `Candle`) keep nothing
    //   past the iterator, so `.none` is used: values are borrowed
    //   slices into `data` and no per-record allocation happens.
    // - String-bearing types (Dividend, EarningsEvent,
    //   OptionsChain) need their strings to outlive the iterator,
    //   so values are duped through the caller's allocator.
    //
    // Doing this as a comptime branch keeps `readSlice` generic;
    // opting another type into the cheap path only requires
    // touching `hasNoStringFields`.
    const parse_alloc: srf.ParseAllocator = switch (comptime hasNoStringFields(T)) {
        true => .none,
        false => .{ .custom = .initTo(allocator) },
    };

    var records = srf.iterator(&reader, allocator, .{
        .parse_allocator = parse_alloc,
    }) catch return null;
    defer records.deinit();

    // Freshness gate. Negative cache entries (byte-for-byte equal
    // to `negative_cache_content`) always count as fresh.
    if (freshness == .fresh_only and !std.mem.eql(u8, data, negative_cache_content)) {
        if (records.expires == null or !records.isFresh(io)) return null;
    }

    // Fall back to the wall clock when the file has no `#!created=`.
    const timestamp: i64 = records.created orelse std.Io.Timestamp.now(io, .real).toSeconds();

    var collected: std.ArrayList(T) = .empty;
    // Runs on every exit. After a successful hand-off the list has
    // been reset to empty, so this is a no-op on the success path.
    defer {
        if (comptime @hasDecl(T, "deinit")) {
            for (collected.items) |entry| entry.deinit(allocator);
        }
        collected.deinit(allocator);
    }

    // Per-record coercion. `fields.to(T, .{})` is the generalized
    // path and works for any struct shape. Candle uses the
    // hand-rolled coercer instead because candle files are very
    // large runs of one fixed 7-field record and the generalized
    // path dominated cold-load time. See SRF's `fields.to` doc
    // comment for the trade-off discussion.
    while (records.next() catch return null) |fields| {
        var item: T = if (comptime T == Candle)
            coerceCandleSpecialized(fields) catch continue
        else
            fields.to(T, .{}) catch continue;

        if (comptime postProcess) |hook| {
            hook(&item, allocator) catch {
                if (comptime @hasDecl(T, "deinit")) item.deinit(allocator);
                return null;
            };
        }

        collected.append(allocator, item) catch {
            if (comptime @hasDecl(T, "deinit")) item.deinit(allocator);
            return null;
        };
    }

    const owned = collected.toOwnedSlice(allocator) catch return null;
    collected = .empty; // ownership moved to `owned`; keep the defer from touching it
    return .{ .data = owned, .timestamp = timestamp, .expires = records.expires };
}
/// Serialize `items` to SRF text, with `#!created=` set to the
/// current wall-clock second (any `created` value in `options` is
/// overwritten). Caller owns the returned bytes.
fn serializeWithMeta(
    comptime T: type,
    io: std.Io,
    allocator: std.mem.Allocator,
    items: []const T,
    options: srf.FormatOptions,
) ![]const u8 {
    var stamped = options;
    stamped.created = std.Io.Timestamp.now(io, .real).toSeconds();

    var out: std.Io.Writer.Allocating = .init(allocator);
    errdefer out.deinit();

    const formatted = srf.fmt(T, items, stamped);
    try out.writer.print("{f}", .{formatted});
    return out.toOwnedSlice();
}
// ── Private serialization: candles ───────────────────────────
/// Serialize candles to SRF text using `options` exactly as given
/// (no `#!created=` stamping here). Caller owns the returned bytes.
fn serializeCandles(allocator: std.mem.Allocator, candles: []const Candle, options: srf.FormatOptions) ![]const u8 {
    var out: std.Io.Writer.Allocating = .init(allocator);
    errdefer out.deinit();

    const formatted = srf.fmt(Candle, candles, options);
    try out.writer.print("{f}", .{formatted});
    return out.toOwnedSlice();
}
/// Serialize a single `CandleMeta` to its on-disk SRF form.
///
/// The record goes through SRF's generic field emission as a
/// one-element slice, stamped with `#!created=` at the current
/// wall-clock second. `CandleMeta` declares no default for
/// `provider`, so the provider line is always written out and
/// cache inspection can always tell where the data came from.
/// Caller owns the returned bytes.
fn serializeCandleMeta(io: std.Io, allocator: std.mem.Allocator, meta: CandleMeta, options: srf.FormatOptions) ![]const u8 {
    const single = [_]CandleMeta{meta};
    return serializeWithMeta(CandleMeta, io, allocator, &single, options);
}
/// Parse the first record of `data` as a `CandleMeta`.
///
/// Returns `error.InvalidData` when the iterator cannot be
/// created, when there is no record, or when the record does not
/// coerce into `CandleMeta`. Errors raised while advancing the
/// iterator are propagated unchanged.
fn deserializeCandleMeta(allocator: std.mem.Allocator, data: []const u8) !CandleMeta {
    var reader = std.Io.Reader.fixed(data);
    var records = srf.iterator(&reader, allocator, .{ .parse_allocator = .none }) catch return error.InvalidData;
    defer records.deinit();

    if (try records.next()) |first| {
        return first.to(CandleMeta, .{}) catch error.InvalidData;
    }
    return error.InvalidData;
}
// ── Private serialization: options (bespoke) ─────────────────
/// Header record emitted once per options chain by
/// `serializeOptions`, ahead of that chain's call/put records.
const ChainHeader = struct {
    /// Chain expiration; `deserializeOptions` keys on this to attach
    /// later call/put records to the chain.
    expiration: Date,
    /// Underlying symbol (written from `OptionsChain.underlying_symbol`).
    symbol: []const u8,
    /// Underlying price, when known (written from
    /// `OptionsChain.underlying_price`).
    price: ?f64 = null,
};
/// One record of the bespoke options cache format: either a chain
/// header or a single call/put contract.
const OptionsRecord = union(enum) {
    // NOTE(review): presumably the SRF field name that carries the
    // union tag on disk — confirm against the SRF library docs.
    pub const srf_tag_field = "type";
    /// Starts a new chain; see `ChainHeader`.
    chain: ChainHeader,
    /// A call contract.
    call: OptionContract,
    /// A put contract.
    put: OptionContract,
};
/// Flatten option chains into a list of `OptionsRecord`s and
/// serialize them as SRF with a `#!created=` stamp.
///
/// Each chain contributes one `.chain` header record followed by
/// its calls and then its puts. Caller owns the returned bytes.
fn serializeOptions(io: std.Io, allocator: std.mem.Allocator, chains: []const OptionsChain, options: srf.FormatOptions) ![]const u8 {
    var flat: std.ArrayList(OptionsRecord) = .empty;
    defer flat.deinit(allocator);

    for (chains) |chain| {
        const header: ChainHeader = .{
            .expiration = chain.expiration,
            .symbol = chain.underlying_symbol,
            .price = chain.underlying_price,
        };
        try flat.append(allocator, .{ .chain = header });
        for (chain.calls) |contract| {
            try flat.append(allocator, .{ .call = contract });
        }
        for (chain.puts) |contract| {
            try flat.append(allocator, .{ .put = contract });
        }
    }

    return serializeWithMeta(OptionsRecord, io, allocator, flat.items, options);
}
/// Rebuild option chains from a stream of `OptionsRecord`s.
///
/// A `.chain` record starts a new chain. `.call` / `.put` records
/// are attached to the most recently seen chain header with the
/// same expiration. Contracts whose expiration matches no header
/// seen so far are dropped, and records that fail to coerce into
/// `OptionsRecord` are skipped.
///
/// On success the caller owns the returned slice and, for each
/// chain, its `underlying_symbol`, `calls` and `puts` (all from
/// `allocator`). On error everything allocated here is released.
fn deserializeOptions(allocator: std.mem.Allocator, it: *srf.RecordIterator) ![]OptionsChain {
    var chains: std.ArrayList(OptionsChain) = .empty;
    errdefer {
        for (chains.items) |*ch| {
            allocator.free(ch.underlying_symbol);
            allocator.free(ch.calls);
            allocator.free(ch.puts);
        }
        chains.deinit(allocator);
    }

    // expiration (days) -> index into `chains`
    var exp_map = std.AutoHashMap(i32, usize).init(allocator);
    defer exp_map.deinit();

    // chain index -> contracts collected so far
    var calls_map = std.AutoHashMap(usize, std.ArrayList(OptionContract)).init(allocator);
    defer {
        var iter = calls_map.valueIterator();
        while (iter.next()) |v| v.deinit(allocator);
        calls_map.deinit();
    }
    var puts_map = std.AutoHashMap(usize, std.ArrayList(OptionContract)).init(allocator);
    defer {
        var iter = puts_map.valueIterator();
        while (iter.next()) |v| v.deinit(allocator);
        puts_map.deinit();
    }

    while (try it.next()) |fields| {
        const opt_rec = fields.to(OptionsRecord, .{}) catch continue;
        switch (opt_rec) {
            .chain => |ch| {
                // Reserve the slot before duping the symbol so that
                // nothing fallible sits between the dupe and the
                // moment `chains` (and its errdefer) owns it. Duping
                // inside a fallible `append` would leak the symbol
                // whenever the append itself failed.
                try chains.ensureUnusedCapacity(allocator, 1);
                const idx = chains.items.len;
                chains.appendAssumeCapacity(.{
                    .underlying_symbol = try allocator.dupe(u8, ch.symbol),
                    .underlying_price = ch.price,
                    .expiration = ch.expiration,
                    .calls = &.{},
                    .puts = &.{},
                });
                try exp_map.put(ch.expiration.days, idx);
            },
            .call => |c| {
                if (exp_map.get(c.expiration.days)) |idx| {
                    const entry = try calls_map.getOrPut(idx);
                    if (!entry.found_existing) entry.value_ptr.* = .empty;
                    try entry.value_ptr.append(allocator, c);
                }
            },
            .put => |c| {
                if (exp_map.get(c.expiration.days)) |idx| {
                    const entry = try puts_map.getOrPut(idx);
                    if (!entry.found_existing) entry.value_ptr.* = .empty;
                    try entry.value_ptr.append(allocator, c);
                }
            },
        }
    }

    // Move each collected list into its chain. `toOwnedSlice`
    // leaves the list in the map empty, so the map defers above
    // have nothing left to free for it.
    for (chains.items, 0..) |*chain, idx| {
        if (calls_map.getPtr(idx)) |cl| {
            chain.calls = try cl.toOwnedSlice(allocator);
        }
        if (puts_map.getPtr(idx)) |pl| {
            chain.puts = try pl.toOwnedSlice(allocator);
        }
    }
    return chains.toOwnedSlice(allocator);
}
};
/// Render a portfolio's lots as SRF text with default format
/// options. Caller owns the returned bytes.
pub fn serializePortfolio(allocator: std.mem.Allocator, lots: []const Lot) ![]const u8 {
    var out: std.Io.Writer.Allocating = .init(allocator);
    errdefer out.deinit();

    const formatted = srf.fmt(Lot, lots, .{});
    try out.writer.print("{f}", .{formatted});
    return out.toOwnedSlice();
}
/// Deserialize a portfolio from SRF data. Caller owns the returned Portfolio.
///
/// Every string field of each lot (`symbol`, `note`, `label`,
/// `account`, `ticker`, `underlying`) is duped into `allocator`.
/// Records that fail to parse are logged and skipped. A record
/// with an empty symbol gets the placeholder "CASH" or "ILLIQUID"
/// for those security types and is skipped for any other type.
///
/// Returns `error.InvalidData` when the SRF iterator cannot be
/// created. Iterator and allocation errors are propagated, and on
/// any error all memory allocated here is released.
pub fn deserializePortfolio(allocator: std.mem.Allocator, data: []const u8) !Portfolio {
    var lots: std.ArrayList(Lot) = .empty;
    errdefer {
        for (lots.items) |lot| {
            allocator.free(lot.symbol);
            if (lot.note) |n| allocator.free(n);
            if (lot.label) |l| allocator.free(l);
            if (lot.account) |a| allocator.free(a);
            if (lot.ticker) |t| allocator.free(t);
            if (lot.underlying) |u| allocator.free(u);
        }
        lots.deinit(allocator);
    }

    var reader = std.Io.Reader.fixed(data);
    var it = srf.iterator(&reader, allocator, .{ .parse_allocator = .none }) catch return error.InvalidData;
    defer it.deinit();

    var skipped: usize = 0;
    while (try it.next()) |fields| {
        const line = it.state.line;
        var lot = fields.to(Lot, .{}) catch {
            std.log.warn("portfolio: could not parse record at line {d}", .{line});
            skipped += 1;
            continue;
        };

        // Decide the symbol text before allocating anything, so a
        // skipped record needs no cleanup. Cash / illiquid lots
        // without a symbol get a placeholder.
        const symbol_text: []const u8 = if (lot.symbol.len != 0) lot.symbol else switch (lot.security_type) {
            .cash => "CASH",
            .illiquid => "ILLIQUID",
            else => {
                std.log.warn("portfolio: record at line {d} has no symbol, skipping", .{line});
                skipped += 1;
                continue;
            },
        };

        // The parsed strings are borrowed (`.parse_allocator = .none`),
        // so dupe them into owned copies. Each errdefer is registered
        // only after its dupe succeeded. If a later dupe or the final
        // append fails, the copies made so far for this lot are
        // released; the outer errdefer only covers lots already
        // stored in `lots`.
        lot.symbol = try allocator.dupe(u8, symbol_text);
        errdefer allocator.free(lot.symbol);
        if (lot.note) |n| lot.note = try allocator.dupe(u8, n);
        errdefer if (lot.note) |n| allocator.free(n);
        if (lot.label) |l| lot.label = try allocator.dupe(u8, l);
        errdefer if (lot.label) |l| allocator.free(l);
        if (lot.account) |a| lot.account = try allocator.dupe(u8, a);
        errdefer if (lot.account) |a| allocator.free(a);
        if (lot.ticker) |t| lot.ticker = try allocator.dupe(u8, t);
        errdefer if (lot.ticker) |t| allocator.free(t);
        if (lot.underlying) |u| lot.underlying = try allocator.dupe(u8, u);
        errdefer if (lot.underlying) |u| allocator.free(u);

        try lots.append(allocator, lot);
    }

    if (skipped > 0) {
        std.log.warn("portfolio: {d} record(s) could not be parsed and were skipped", .{skipped});
    }

    return .{
        .lots = try lots.toOwnedSlice(allocator),
        .allocator = allocator,
    };
}
test "dividend serialize/deserialize round-trip" {
    const testing = std.testing;
    const gpa = testing.allocator;

    const source = [_]Dividend{
        .{ .ex_date = Date.fromYmd(2024, 3, 15), .amount = 0.8325, .pay_date = Date.fromYmd(2024, 3, 28), .type = .regular },
        .{ .ex_date = Date.fromYmd(2024, 6, 14), .amount = 0.9148, .type = .special },
    };
    const bytes = try Store.serializeWithMeta(Dividend, testing.io, gpa, &source, .{});
    defer gpa.free(bytes);

    // No post-process hook: the fixture carries no currency strings,
    // so freeing the outer slice is enough.
    const loaded = Store.readSlice(Dividend, testing.io, gpa, bytes, null, .any) orelse return error.TestUnexpectedResult;
    defer gpa.free(loaded.data);

    try testing.expectEqual(@as(usize, 2), loaded.data.len);

    const regular = loaded.data[0];
    try testing.expect(regular.ex_date.eql(Date.fromYmd(2024, 3, 15)));
    try testing.expectApproxEqAbs(@as(f64, 0.8325), regular.amount, 0.0001);
    try testing.expect(regular.pay_date != null);
    try testing.expect(regular.pay_date.?.eql(Date.fromYmd(2024, 3, 28)));
    try testing.expectEqual(DividendType.regular, regular.type);

    const special = loaded.data[1];
    try testing.expect(special.ex_date.eql(Date.fromYmd(2024, 6, 14)));
    try testing.expectApproxEqAbs(@as(f64, 0.9148), special.amount, 0.0001);
    try testing.expect(special.pay_date == null);
    try testing.expectEqual(DividendType.special, special.type);
}
test "earnings serialize/deserialize round-trip frees duped symbols" {
    // Regression guard for the earnings-tab leak. EarningsEvent is a
    // string-bearing type, so the cache-read path dupes every
    // `symbol` into the caller's allocator. Freeing just the outer
    // slice would leak them; `freeSlice` has to release each one, and
    // testing.allocator fails the test if anything is left behind.
    const testing = std.testing;
    const gpa = testing.allocator;

    const source = [_]EarningsEvent{
        .{ .symbol = "AAPL", .date = Date.fromYmd(2025, 1, 30), .estimate = 2.67, .actual = 2.85 },
        .{ .symbol = "AAPL", .date = Date.fromYmd(2024, 10, 31), .estimate = 1.55, .actual = 1.64 },
    };
    const bytes = try Store.serializeWithMeta(EarningsEvent, testing.io, gpa, &source, .{});
    defer gpa.free(bytes);

    const loaded = Store.readSlice(EarningsEvent, testing.io, gpa, bytes, null, .any) orelse return error.TestUnexpectedResult;
    // Release through freeSlice so the duped symbols are freed as
    // well; a bare `free` of the slice is what used to leak.
    defer EarningsEvent.freeSlice(gpa, loaded.data);

    try testing.expectEqual(@as(usize, 2), loaded.data.len);
    // Still readable after the iterator is gone, so they were duped.
    for (loaded.data) |event| {
        try testing.expectEqualStrings("AAPL", event.symbol);
    }
    try testing.expect(loaded.data[0].date.eql(Date.fromYmd(2025, 1, 30)));
}
test "split serialize/deserialize round-trip" {
    const testing = std.testing;
    const gpa = testing.allocator;

    const source = [_]Split{
        .{ .date = Date.fromYmd(2020, 8, 31), .numerator = 4, .denominator = 1 },
        .{ .date = Date.fromYmd(2014, 6, 9), .numerator = 7, .denominator = 1 },
    };
    const bytes = try Store.serializeWithMeta(Split, testing.io, gpa, &source, .{});
    defer gpa.free(bytes);

    const loaded = Store.readSlice(Split, testing.io, gpa, bytes, null, .any) orelse return error.TestUnexpectedResult;
    defer gpa.free(loaded.data);

    try testing.expectEqual(@as(usize, 2), loaded.data.len);

    const four_for_one = loaded.data[0];
    try testing.expect(four_for_one.date.eql(Date.fromYmd(2020, 8, 31)));
    try testing.expectApproxEqAbs(@as(f64, 4), four_for_one.numerator, 0.001);
    try testing.expectApproxEqAbs(@as(f64, 1), four_for_one.denominator, 0.001);

    const seven_for_one = loaded.data[1];
    try testing.expect(seven_for_one.date.eql(Date.fromYmd(2014, 6, 9)));
    try testing.expectApproxEqAbs(@as(f64, 7), seven_for_one.numerator, 0.001);
}
test "writeMerged Dividend: empty cache writes input sorted descending" {
    const testing = std.testing;
    const gpa = testing.allocator;
    const io = testing.io;

    var tmp = testing.tmpDir(.{});
    defer tmp.cleanup();
    const cache_root = try tmp.dir.realPathFileAlloc(io, ".", gpa);
    defer gpa.free(cache_root);
    var store = Store.init(io, gpa, cache_root);

    // Deliberately unordered input: the write path is expected to sort.
    var unordered = [_]Dividend{
        .{ .ex_date = Date.fromYmd(2024, 5, 15), .amount = 0.50 },
        .{ .ex_date = Date.fromYmd(2024, 8, 15), .amount = 0.55 },
        .{ .ex_date = Date.fromYmd(2024, 2, 15), .amount = 0.48 },
    };
    store.write(Dividend, "TEST", &unordered, .{ .seconds = Ttl.dividends });

    const cached = store.read(store.allocator, Dividend, "TEST", null, .any) orelse return error.NoCache;
    defer {
        for (cached.data) |d| d.deinit(gpa);
        gpa.free(cached.data);
    }

    // Newest ex_date first.
    const newest_first = [_]Date{
        Date.fromYmd(2024, 8, 15),
        Date.fromYmd(2024, 5, 15),
        Date.fromYmd(2024, 2, 15),
    };
    try testing.expectEqual(@as(usize, 3), cached.data.len);
    for (newest_first, cached.data) |want, got| {
        try testing.expect(got.ex_date.eql(want));
    }
}
test "writeMerged Dividend: existing entries preserved on key collision" {
    const testing = std.testing;
    const gpa = testing.allocator;
    const io = testing.io;

    var tmp = testing.tmpDir(.{});
    defer tmp.cleanup();
    const cache_root = try tmp.dir.realPathFileAlloc(io, ".", gpa);
    defer gpa.free(cache_root);
    var store = Store.init(io, gpa, cache_root);

    // First write: a rich, Polygon-style entry carrying metadata.
    var rich = [_]Dividend{.{
        .ex_date = Date.fromYmd(2024, 5, 15),
        .pay_date = Date.fromYmd(2024, 6, 1),
        .amount = 0.50,
        .type = .regular,
    }};
    store.write(Dividend, "TEST", &rich, .{ .seconds = Ttl.dividends });

    // Second write: same ex_date but sparse (Tiingo-style: no
    // pay_date, no type). The entry already on disk should win.
    var sparse = [_]Dividend{
        .{ .ex_date = Date.fromYmd(2024, 5, 15), .amount = 0.99, .type = .unknown },
    };
    store.write(Dividend, "TEST", &sparse, .{ .seconds = Ttl.dividends });

    const cached = store.read(store.allocator, Dividend, "TEST", null, .any) orelse return error.NoCache;
    defer {
        for (cached.data) |d| d.deinit(gpa);
        gpa.free(cached.data);
    }

    try testing.expectEqual(@as(usize, 1), cached.data.len);
    const kept = cached.data[0];
    // The original 0.50 stays; the later 0.99 must not replace it.
    try testing.expectApproxEqAbs(@as(f64, 0.50), kept.amount, 0.001);
    try testing.expect(kept.pay_date != null);
    try testing.expectEqual(DividendType.regular, kept.type);
}
test "writeMerged Dividend: union sorted desc, new entry added" {
    const testing = std.testing;
    const gpa = testing.allocator;
    const io = testing.io;

    var tmp = testing.tmpDir(.{});
    defer tmp.cleanup();
    const cache_root = try tmp.dir.realPathFileAlloc(io, ".", gpa);
    defer gpa.free(cache_root);
    var store = Store.init(io, gpa, cache_root);

    var seeded = [_]Dividend{
        .{ .ex_date = Date.fromYmd(2024, 5, 15), .amount = 0.50, .type = .regular },
        .{ .ex_date = Date.fromYmd(2024, 2, 15), .amount = 0.48, .type = .regular },
    };
    store.write(Dividend, "TEST", &seeded, .{ .seconds = Ttl.dividends });

    // An ex_date the cache has not seen yet.
    var addition = [_]Dividend{
        .{ .ex_date = Date.fromYmd(2024, 8, 15), .amount = 0.55 },
    };
    store.write(Dividend, "TEST", &addition, .{ .seconds = Ttl.dividends });

    const cached = store.read(store.allocator, Dividend, "TEST", null, .any) orelse return error.NoCache;
    defer {
        for (cached.data) |d| d.deinit(gpa);
        gpa.free(cached.data);
    }

    const newest_first = [_]Date{
        Date.fromYmd(2024, 8, 15),
        Date.fromYmd(2024, 5, 15),
        Date.fromYmd(2024, 2, 15),
    };
    try testing.expectEqual(@as(usize, 3), cached.data.len);
    for (newest_first, cached.data) |want, got| {
        try testing.expect(got.ex_date.eql(want));
    }
}
test "diskStats: empty cache is all zeros; populated counts symbols/files/bytes" {
    const testing = std.testing;
    const gpa = testing.allocator;
    const io = testing.io;

    var tmp = testing.tmpDir(.{});
    defer tmp.cleanup();
    const cache_root = try tmp.dir.realPathFileAlloc(io, ".", gpa);
    defer gpa.free(cache_root);
    var store = Store.init(io, gpa, cache_root);

    // A brand-new temp dir holds no cache entries.
    const before = store.diskStats();
    try testing.expectEqual(@as(usize, 0), before.symbols);
    try testing.expectEqual(@as(usize, 0), before.files);
    try testing.expectEqual(@as(u64, 0), before.bytes);

    // Populate two symbols: AAA gets two data types, BBB gets one.
    var one_dividend = [_]Dividend{.{ .ex_date = Date.fromYmd(2024, 5, 15), .amount = 0.50, .type = .regular }};
    var one_split = [_]Split{.{ .date = Date.fromYmd(2020, 8, 31), .numerator = 4, .denominator = 1 }};
    store.write(Dividend, "AAA", &one_dividend, .{ .seconds = Ttl.dividends });
    store.write(Split, "AAA", &one_split, .{ .seconds = Ttl.splits });
    store.write(Dividend, "BBB", &one_dividend, .{ .seconds = Ttl.dividends });

    const after = store.diskStats();
    try testing.expectEqual(@as(usize, 2), after.symbols); // AAA, BBB
    try testing.expectEqual(@as(usize, 3), after.files); // AAA/{dividends,splits} + BBB/dividends
    try testing.expect(after.bytes > 0);
}
test "writeMerged Dividend: no-change merge still rewrites to refresh expires" {
    // Covers the PRIMARY path (`writeWithSource` / .bump).
    //
    // A merge that changed nothing used to skip the rewrite as an
    // optimization. Once the on-disk `#!expires=` had aged past its
    // TTL, every later refresh paid the full provider rate-limiter
    // cost, found nothing new, and skipped the write again, so the
    // cache stayed on the slow path forever.
    //
    // The write now always happens: it is a sub-millisecond atomic
    // rename of a tiny file, not worth the bookkeeping to avoid.
    // (The supplement / .preserve path behaves differently and keeps
    // the existing expires; the writeSupplement tests cover that.)
    const testing = std.testing;
    const gpa = testing.allocator;
    const io = testing.io;

    var tmp = testing.tmpDir(.{});
    defer tmp.cleanup();
    const cache_root = try tmp.dir.realPathFileAlloc(io, ".", gpa);
    defer gpa.free(cache_root);
    var store = Store.init(io, gpa, cache_root);

    // Seed a file whose expires lies 30 days in the past: the
    // aged-out situation that motivated the change. Before it, the
    // no-change merge left such a file aged-out indefinitely; now
    // the rewrite gives it a fresh expires.
    const now_s = std.Io.Timestamp.now(io, .real).toSeconds();
    const stale_expires = now_s - 30 * std.time.s_per_day;
    const seeded = [_]Dividend{
        .{ .ex_date = Date.fromYmd(2024, 5, 15), .amount = 0.50, .type = .regular },
    };
    const seeded_bytes = try Store.serializeWithMeta(Dividend, io, gpa, &seeded, .{ .expires = stale_expires });
    defer gpa.free(seeded_bytes);
    try store.writeRaw("TEST", .dividends, seeded_bytes);

    // Identical incoming entry: no new data, but a rewrite is still expected.
    var unchanged = [_]Dividend{
        .{ .ex_date = Date.fromYmd(2024, 5, 15), .amount = 0.50, .type = .regular },
    };
    store.writeWithSource(Dividend, "TEST", &unchanged, .{ .seconds = Ttl.dividends }, "polygon");

    // Read the raw file back and inspect its expires directive; it
    // should now sit at roughly now+14d.
    const file_path = try std.fs.path.join(gpa, &.{ cache_root, "TEST", "dividends.srf" });
    defer gpa.free(file_path);
    const on_disk = try std.Io.Dir.cwd().readFileAlloc(io, file_path, gpa, .limited(1024 * 1024));
    defer gpa.free(on_disk);

    var reader = std.Io.Reader.fixed(on_disk);
    const header = try srf.iterator(&reader, gpa, .{ .parse_allocator = .none });
    defer header.deinit();

    const refreshed = header.expires orelse return error.ExpiresMissing;
    try testing.expect(refreshed > now_s);
    try testing.expect(refreshed - now_s > 13 * std.time.s_per_day);
}
test "writeSupplement Dividend: preserves an existing future expires" {
    // Covers the SUPPLEMENT path (.preserve). A Tiingo-driven
    // supplement has to merge new records without touching the
    // primary's freshness clock. The test seeds a known future
    // expires, supplements one brand-new dividend, and checks that
    // the on-disk expires is exactly the seeded value rather than a
    // fresh now+TTL stamp.
    const testing = std.testing;
    const gpa = testing.allocator;
    const io = testing.io;

    var tmp = testing.tmpDir(.{});
    defer tmp.cleanup();
    const cache_root = try tmp.dir.realPathFileAlloc(io, ".", gpa);
    defer gpa.free(cache_root);
    var store = Store.init(io, gpa, cache_root);

    const now_s = std.Io.Timestamp.now(io, .real).toSeconds();
    const future_expires = now_s + 100 * std.time.s_per_day;
    const seeded = [_]Dividend{
        .{ .ex_date = Date.fromYmd(2024, 5, 15), .amount = 0.50, .type = .regular },
    };
    const seeded_bytes = try Store.serializeWithMeta(Dividend, io, gpa, &seeded, .{ .expires = future_expires });
    defer gpa.free(seeded_bytes);
    try store.writeRaw("TEST", .dividends, seeded_bytes);

    // A genuinely new dividend (months earlier) from Tiingo, so an
    // actual merge and rewrite take place.
    var supplement = [_]Dividend{
        .{ .ex_date = Date.fromYmd(2024, 2, 15), .amount = 0.50 },
    };
    store.writeSupplement(Dividend, "TEST", &supplement, "tiingo");

    const file_path = try std.fs.path.join(gpa, &.{ cache_root, "TEST", "dividends.srf" });
    defer gpa.free(file_path);
    const on_disk = try std.Io.Dir.cwd().readFileAlloc(io, file_path, gpa, .limited(1024 * 1024));
    defer gpa.free(on_disk);

    var reader = std.Io.Reader.fixed(on_disk);
    const header = try srf.iterator(&reader, gpa, .{ .parse_allocator = .none });
    defer header.deinit();

    // Exact match: the candle-driven Tiingo write left Polygon's
    // clock alone.
    try testing.expectEqual(future_expires, header.expires orelse return error.ExpiresMissing);

    // The merge itself did land: the union now holds two records.
    const cached = store.read(store.allocator, Dividend, "TEST", null, .any) orelse return error.NoCache;
    defer {
        for (cached.data) |d| d.deinit(gpa);
        gpa.free(cached.data);
    }
    try testing.expectEqual(@as(usize, 2), cached.data.len);
}
test "writeSupplement Dividend: preserves an aged-out expires (does not refresh)" {
    // Counterpart to the .bump no-change test. A Tiingo supplement on
    // a file that has already expired keeps the past expires as-is,
    // so the next getDividends still sees the file as stale and goes
    // to the primary (Polygon). Backfilled old data from Tiingo is no
    // reason to treat the primary's data as fresh.
    const testing = std.testing;
    const gpa = testing.allocator;
    const io = testing.io;

    var tmp = testing.tmpDir(.{});
    defer tmp.cleanup();
    const cache_root = try tmp.dir.realPathFileAlloc(io, ".", gpa);
    defer gpa.free(cache_root);
    var store = Store.init(io, gpa, cache_root);

    const now_s = std.Io.Timestamp.now(io, .real).toSeconds();
    const stale_expires = now_s - 30 * std.time.s_per_day;
    const seeded = [_]Dividend{
        .{ .ex_date = Date.fromYmd(2024, 5, 15), .amount = 0.50, .type = .regular },
    };
    const seeded_bytes = try Store.serializeWithMeta(Dividend, io, gpa, &seeded, .{ .expires = stale_expires });
    defer gpa.free(seeded_bytes);
    try store.writeRaw("TEST", .dividends, seeded_bytes);

    var supplement = [_]Dividend{
        .{ .ex_date = Date.fromYmd(2024, 2, 15), .amount = 0.50 },
    };
    store.writeSupplement(Dividend, "TEST", &supplement, "tiingo");

    const file_path = try std.fs.path.join(gpa, &.{ cache_root, "TEST", "dividends.srf" });
    defer gpa.free(file_path);
    const on_disk = try std.Io.Dir.cwd().readFileAlloc(io, file_path, gpa, .limited(1024 * 1024));
    defer gpa.free(on_disk);

    var reader = std.Io.Reader.fixed(on_disk);
    const header = try srf.iterator(&reader, gpa, .{ .parse_allocator = .none });
    defer header.deinit();

    // Unchanged past value, i.e. not bumped to now+TTL.
    try testing.expectEqual(stale_expires, header.expires orelse return error.ExpiresMissing);
}
test "writeSupplement Dividend: no existing file establishes an initial TTL" {
    // No primary clock exists yet, so there's nothing to preserve.
    // A supplement to a brand-new symbol behaves like a first fetch:
    // it stamps a normal jittered TTL so the freshly-written data is
    // actually served (important for setups with no primary key).
    const allocator = std.testing.allocator;
    const io = std.testing.io;
    var tmp = std.testing.tmpDir(.{});
    defer tmp.cleanup();
    const dir_path = try tmp.dir.realPathFileAlloc(io, ".", allocator);
    defer allocator.free(dir_path);
    var s = Store.init(io, allocator, dir_path);

    const now_s = std.Io.Timestamp.now(io, .real).toSeconds();
    var incoming = [_]Dividend{
        .{ .ex_date = Date.fromYmd(2024, 5, 15), .amount = 0.50 },
    };
    s.writeSupplement(Dividend, "TEST", incoming[0..], "tiingo");

    const path = try std.fs.path.join(allocator, &.{ dir_path, "TEST", "dividends.srf" });
    defer allocator.free(path);
    const data = try std.Io.Dir.cwd().readFileAlloc(io, path, allocator, .limited(1024 * 1024));
    defer allocator.free(data);
    var reader = std.Io.Reader.fixed(data);
    const it = try srf.iterator(&reader, allocator, .{ .parse_allocator = .none });
    defer it.deinit();

    // ~14d TTL with ±11% jitter spans roughly 12.46d..15.54d. The
    // bounds must sit outside that band on both sides, otherwise a
    // low jitter draw fails the test spuriously; a 13d lower bound
    // would fall inside it.
    const new_expires = it.expires orelse return error.ExpiresMissing;
    try std.testing.expect(new_expires - now_s > 12 * std.time.s_per_day);
    try std.testing.expect(new_expires - now_s < 16 * std.time.s_per_day);
}
test "writeMerged Dividend: field-level upgrade fills nulls (Tiingo-then-Polygon)" {
    // Mirrors the write order seen during a server refresh: Tiingo
    // stores its sparse view first (populateAllFromTiingo, during
    // getCandles), then Polygon stores its rich view (fetchCached,
    // during getDividends). The record on disk has to end up with
    // Polygon's metadata instead of keeping Tiingo's null fields.
    const testing = std.testing;
    const gpa = testing.allocator;
    const io = testing.io;

    var tmp = testing.tmpDir(.{});
    defer tmp.cleanup();
    const cache_root = try tmp.dir.realPathFileAlloc(io, ".", gpa);
    defer gpa.free(cache_root);
    var store = Store.init(io, gpa, cache_root);

    // Step 1, Tiingo: sparse record with only ex_date and amount.
    var sparse = [_]Dividend{
        .{ .ex_date = Date.fromYmd(2024, 5, 15), .amount = 0.50 },
    };
    store.writeSupplement(Dividend, "TEST", &sparse, "tiingo");

    // Step 2, Polygon: rich record with pay_date, record_date, type.
    // Currency is left null in this fixture to avoid allocation;
    // the currency-upgrade case has its own test.
    var rich = [_]Dividend{.{
        .ex_date = Date.fromYmd(2024, 5, 15),
        .pay_date = Date.fromYmd(2024, 6, 1),
        .record_date = Date.fromYmd(2024, 5, 17),
        .amount = 0.50,
        .type = .regular,
    }};
    store.writeWithSource(Dividend, "TEST", &rich, .{ .seconds = Ttl.dividends }, "polygon");

    const cached = store.read(store.allocator, Dividend, "TEST", null, .any) orelse return error.NoCache;
    defer {
        for (cached.data) |d| d.deinit(gpa);
        gpa.free(cached.data);
    }

    try testing.expectEqual(@as(usize, 1), cached.data.len);
    const upgraded = cached.data[0];
    try testing.expect(upgraded.pay_date != null);
    try testing.expect(upgraded.pay_date.?.eql(Date.fromYmd(2024, 6, 1)));
    try testing.expect(upgraded.record_date != null);
    try testing.expect(upgraded.record_date.?.eql(Date.fromYmd(2024, 5, 17)));
    try testing.expectEqual(DividendType.regular, upgraded.type);
}
test "writeMerged Dividend: currency upgrade does not double-free" {
    // Tiingo stores a sparse record with no currency; Polygon's
    // later write brings a heap-allocated currency string. The merge
    // must not end up with two records that both think they own that
    // buffer via `existing.currency = incoming.currency`, or
    // std.testing.allocator's double-free detection fires when the
    // caller's deinit runs afterwards.
    const testing = std.testing;
    const gpa = testing.allocator;
    const io = testing.io;

    var tmp = testing.tmpDir(.{});
    defer tmp.cleanup();
    const cache_root = try tmp.dir.realPathFileAlloc(io, ".", gpa);
    defer gpa.free(cache_root);
    var store = Store.init(io, gpa, cache_root);

    // Tiingo goes first with a record that has no currency.
    var without_currency = [_]Dividend{
        .{ .ex_date = Date.fromYmd(2024, 5, 15), .amount = 0.50 },
    };
    store.writeWithSource(Dividend, "TEST", &without_currency, .{ .seconds = Ttl.dividends }, "tiingo");

    // Polygon follows with the same ex_date plus a currency. The
    // caller owns that heap allocation and frees it once the write
    // returns, as in production where Polygon's fetchDividends
    // returns heap-allocated currency strings for the caller to deinit.
    var with_currency = [_]Dividend{.{
        .ex_date = Date.fromYmd(2024, 5, 15),
        .amount = 0.50,
        .currency = try gpa.dupe(u8, "USD"),
    }};
    defer for (with_currency) |d| d.deinit(gpa);
    store.writeWithSource(Dividend, "TEST", &with_currency, .{ .seconds = Ttl.dividends }, "polygon");

    // The upgraded currency must be visible on read-back.
    const cached = store.read(store.allocator, Dividend, "TEST", null, .any) orelse return error.NoCache;
    defer {
        for (cached.data) |d| d.deinit(gpa);
        gpa.free(cached.data);
    }

    try testing.expectEqual(@as(usize, 1), cached.data.len);
    try testing.expect(cached.data[0].currency != null);
    try testing.expectEqualStrings("USD", cached.data[0].currency.?);
}
test "writeMerged Dividend: existing currency preserved on second write with different currency" {
    // Polygon stores USD first. A later write carrying a different
    // currency (CAD) must not replace it: the first non-null value
    // wins. Both sides have non-null currency strings here, which is
    // the hardest shape for the merge primitive's lifetime handling.
    const testing = std.testing;
    const gpa = testing.allocator;
    const io = testing.io;

    var tmp = testing.tmpDir(.{});
    defer tmp.cleanup();
    const cache_root = try tmp.dir.realPathFileAlloc(io, ".", gpa);
    defer gpa.free(cache_root);
    var store = Store.init(io, gpa, cache_root);

    var usd_write = [_]Dividend{.{
        .ex_date = Date.fromYmd(2024, 5, 15),
        .amount = 0.50,
        .currency = try gpa.dupe(u8, "USD"),
    }};
    defer for (usd_write) |d| d.deinit(gpa);
    store.writeWithSource(Dividend, "TEST", &usd_write, .{ .seconds = Ttl.dividends }, "polygon");

    var cad_write = [_]Dividend{.{
        .ex_date = Date.fromYmd(2024, 5, 15),
        .amount = 0.50,
        .currency = try gpa.dupe(u8, "CAD"),
    }};
    defer for (cad_write) |d| d.deinit(gpa);
    store.writeWithSource(Dividend, "TEST", &cad_write, .{ .seconds = Ttl.dividends }, "polygon");

    const cached = store.read(store.allocator, Dividend, "TEST", null, .any) orelse return error.NoCache;
    defer {
        for (cached.data) |d| d.deinit(gpa);
        gpa.free(cached.data);
    }

    try testing.expectEqual(@as(usize, 1), cached.data.len);
    try testing.expect(cached.data[0].currency != null);
    // The currency from the first write is the one that survives.
    try testing.expectEqualStrings("USD", cached.data[0].currency.?);
}
test "writeMerged Dividend: type unknown counts as null and gets upgraded" {
    // A stored record whose type is `.unknown` is treated like a
    // missing field: a later write for the same event that carries a
    // concrete type fills it in. Here the "tiingo" write stores
    // `.unknown` and the "polygon" write supplies `.special`, so the
    // read-back must show `.special`.
    const allocator = std.testing.allocator;
    const io = std.testing.io;

    var tmp = std.testing.tmpDir(.{});
    defer tmp.cleanup();
    const dir_path = try tmp.dir.realPathFileAlloc(io, ".", allocator);
    defer allocator.free(dir_path);

    var store = Store.init(io, allocator, dir_path);
    const ex_date = Date.fromYmd(2024, 5, 15);

    var untyped = [_]Dividend{.{ .ex_date = ex_date, .amount = 0.50, .type = .unknown }};
    store.writeWithSource(Dividend, "TEST", &untyped, .{ .seconds = Ttl.dividends }, "tiingo");

    var typed = [_]Dividend{.{ .ex_date = ex_date, .amount = 0.50, .type = .special }};
    store.writeWithSource(Dividend, "TEST", &typed, .{ .seconds = Ttl.dividends }, "polygon");

    const result = store.read(store.allocator, Dividend, "TEST", null, .any) orelse return error.NoCache;
    defer {
        for (result.data) |d| d.deinit(allocator);
        allocator.free(result.data);
    }

    try std.testing.expectEqual(@as(usize, 1), result.data.len);
    try std.testing.expectEqual(DividendType.special, result.data[0].type);
}
test "writeMerged Dividend: non-null fields are not overwritten" {
    // "First non-null wins": once a record has a pay_date and a
    // concrete type, a later write for the same event with different
    // values for those fields must leave them alone. Guards against
    // upgrade logic that replaces populated fields.
    const allocator = std.testing.allocator;
    const io = std.testing.io;

    var tmp = std.testing.tmpDir(.{});
    defer tmp.cleanup();
    const dir_path = try tmp.dir.realPathFileAlloc(io, ".", allocator);
    defer allocator.free(dir_path);

    var store = Store.init(io, allocator, dir_path);
    const ex_date = Date.fromYmd(2024, 5, 15);
    const original_pay_date = Date.fromYmd(2024, 6, 1);

    var original = [_]Dividend{.{
        .ex_date = ex_date,
        .pay_date = original_pay_date,
        .amount = 0.50,
        .type = .regular,
    }};
    store.writeWithSource(Dividend, "TEST", &original, .{ .seconds = Ttl.dividends }, "polygon");

    // Same event, but with a bogus pay_date and a different type.
    var conflicting = [_]Dividend{.{
        .ex_date = ex_date,
        .pay_date = Date.fromYmd(2099, 1, 1),
        .amount = 0.50,
        .type = .special,
    }};
    store.writeWithSource(Dividend, "TEST", &conflicting, .{ .seconds = Ttl.dividends }, "polygon");

    const result = store.read(store.allocator, Dividend, "TEST", null, .any) orelse return error.NoCache;
    defer {
        for (result.data) |d| d.deinit(allocator);
        allocator.free(result.data);
    }

    try std.testing.expectEqual(@as(usize, 1), result.data.len);
    // Values from the first write are still in place.
    const kept = &result.data[0];
    try std.testing.expect(kept.pay_date.?.eql(original_pay_date));
    try std.testing.expectEqual(DividendType.regular, kept.type);
}
test "writeMerged Dividend: upgrade is no-op when both have same fields" {
    // The same record (ex_date, pay_date, amount, type) is written
    // twice. Nothing is upgraded and nothing is added, yet the file
    // is still rewritten so its on-disk `#!expires=` directive gets
    // refreshed. (Skipping the rewrite, as was done before, locked
    // aged-out files into a permanent slow path.) The check made here
    // is that the repeat write does not duplicate the record.
    const allocator = std.testing.allocator;
    const io = std.testing.io;

    var tmp = std.testing.tmpDir(.{});
    defer tmp.cleanup();
    const dir_path = try tmp.dir.realPathFileAlloc(io, ".", allocator);
    defer allocator.free(dir_path);

    var store = Store.init(io, allocator, dir_path);

    const record: Dividend = .{
        .ex_date = Date.fromYmd(2024, 5, 15),
        .pay_date = Date.fromYmd(2024, 6, 1),
        .amount = 0.50,
        .type = .regular,
    };

    var initial = [_]Dividend{record};
    store.writeWithSource(Dividend, "TEST", &initial, .{ .seconds = Ttl.dividends }, "polygon");

    var repeat = [_]Dividend{record};
    store.writeWithSource(Dividend, "TEST", &repeat, .{ .seconds = Ttl.dividends }, "polygon");

    const result = store.read(store.allocator, Dividend, "TEST", null, .any) orelse return error.NoCache;
    defer {
        for (result.data) |d| d.deinit(allocator);
        allocator.free(result.data);
    }

    // Still exactly one record after the identical second write.
    try std.testing.expectEqual(@as(usize, 1), result.data.len);
}
test "writeMerged Dividend: near-match dedup catches last-biz-day vs calendar-end" {
    // FDRXX/FAGIX-style situation: for mutual funds Polygon reports
    // the calendar last-day-of-month as ex_date even when that is a
    // weekend, while Tiingo reports the last actual business day.
    // It is one payment with two ex_dates and an identical amount.
    // If the near-match dedup did not collapse them, both would sit
    // in the cache and total-return math would count the dividend
    // twice.
    const allocator = std.testing.allocator;
    const io = std.testing.io;

    var tmp = std.testing.tmpDir(.{});
    defer tmp.cleanup();
    const dir_path = try tmp.dir.realPathFileAlloc(io, ".", allocator);
    defer allocator.free(dir_path);

    var store = Store.init(io, allocator, dir_path);
    const calendar_month_end = Date.fromYmd(2025, 8, 31); // a Sunday
    const last_business_day = Date.fromYmd(2025, 8, 29); // the Friday before

    // Polygon: calendar month end, with pay_date and type filled in.
    var polygon_view = [_]Dividend{.{
        .ex_date = calendar_month_end,
        .pay_date = Date.fromYmd(2025, 9, 1),
        .amount = 0.003422654,
        .type = .regular,
    }};
    store.writeWithSource(Dividend, "FDRXX", &polygon_view, .{ .seconds = Ttl.dividends }, "polygon");

    // Tiingo: last business day, same amount, no other fields.
    var tiingo_view = [_]Dividend{.{
        .ex_date = last_business_day,
        .amount = 0.003422654,
    }};
    store.writeSupplement(Dividend, "FDRXX", &tiingo_view, "tiingo");

    const result = store.read(store.allocator, Dividend, "FDRXX", null, .any) orelse return error.NoCache;
    defer {
        for (result.data) |d| d.deinit(allocator);
        allocator.free(result.data);
    }

    // A single record remains, and it is the richer Polygon one.
    try std.testing.expectEqual(@as(usize, 1), result.data.len);
    const survivor = &result.data[0];
    try std.testing.expect(survivor.ex_date.eql(calendar_month_end));
    try std.testing.expect(survivor.pay_date != null);
    try std.testing.expectEqual(DividendType.regular, survivor.type);
}
test "writeMerged Dividend: near-match dedup respects 3-day window upper bound" {
    // Two records with the same amount whose ex_dates are 4 days
    // apart. That is outside the ±3 day window, so they must NOT be
    // collapsed. The rule is deliberately conservative so that
    // genuinely distinct dividends survive.
    const allocator = std.testing.allocator;
    const io = std.testing.io;

    var tmp = std.testing.tmpDir(.{});
    defer tmp.cleanup();
    const dir_path = try tmp.dir.realPathFileAlloc(io, ".", allocator);
    defer allocator.free(dir_path);

    var store = Store.init(io, allocator, dir_path);

    var later = [_]Dividend{.{
        .ex_date = Date.fromYmd(2025, 8, 31),
        .amount = 0.003422654,
        .type = .regular,
    }};
    store.writeWithSource(Dividend, "TEST", &later, .{ .seconds = Ttl.dividends }, "polygon");

    // Four days before the first record's ex_date.
    var earlier = [_]Dividend{.{
        .ex_date = Date.fromYmd(2025, 8, 27),
        .amount = 0.003422654,
    }};
    store.writeWithSource(Dividend, "TEST", &earlier, .{ .seconds = Ttl.dividends }, "tiingo");

    const result = store.read(store.allocator, Dividend, "TEST", null, .any) orelse return error.NoCache;
    defer {
        for (result.data) |d| d.deinit(allocator);
        allocator.free(result.data);
    }

    // The gap is too wide for a near-match: both records are kept.
    try std.testing.expectEqual(@as(usize, 2), result.data.len);
}
test "writeMerged Dividend: near-match dedup respects amount tolerance" {
    // ex_dates two days apart (inside the date window) but clearly
    // different amounts. These must stay separate. This protects
    // against falsely merging distinct payouts that land close
    // together, e.g. a special dividend next to a regular one.
    const allocator = std.testing.allocator;
    const io = std.testing.io;

    var tmp = std.testing.tmpDir(.{});
    defer tmp.cleanup();
    const dir_path = try tmp.dir.realPathFileAlloc(io, ".", allocator);
    defer allocator.free(dir_path);

    var store = Store.init(io, allocator, dir_path);

    var small_payout = [_]Dividend{.{
        .ex_date = Date.fromYmd(2025, 8, 31),
        .amount = 0.003422654,
        .type = .regular,
    }};
    store.writeWithSource(Dividend, "TEST", &small_payout, .{ .seconds = Ttl.dividends }, "polygon");

    // Two days earlier; amount differs by far more than 1% relative.
    var other_payout = [_]Dividend{.{
        .ex_date = Date.fromYmd(2025, 8, 29),
        .amount = 0.005,
    }};
    store.writeWithSource(Dividend, "TEST", &other_payout, .{ .seconds = Ttl.dividends }, "tiingo");

    const result = store.read(store.allocator, Dividend, "TEST", null, .any) orelse return error.NoCache;
    defer {
        for (result.data) |d| d.deinit(allocator);
        allocator.free(result.data);
    }

    try std.testing.expectEqual(@as(usize, 2), result.data.len);
}
test "writeMerged Dividend: near-match dedup tolerates Tiingo amount rounding" {
    // FAGIX-style situation: Polygon carries the full-precision
    // amount (0.040101457) while Tiingo truncates to two decimals
    // (0.04). The absolute gap (0.000101) slightly exceeds the
    // 0.0001 absolute tolerance but is far inside the 1% relative
    // tolerance, so the two records must be recognised as one event.
    const allocator = std.testing.allocator;
    const io = std.testing.io;

    var tmp = std.testing.tmpDir(.{});
    defer tmp.cleanup();
    const dir_path = try tmp.dir.realPathFileAlloc(io, ".", allocator);
    defer allocator.free(dir_path);

    var store = Store.init(io, allocator, dir_path);
    const polygon_ex_date = Date.fromYmd(2024, 8, 31);

    var polygon_view = [_]Dividend{.{
        .ex_date = polygon_ex_date,
        .pay_date = Date.fromYmd(2024, 9, 1),
        .amount = 0.040101457,
        .type = .regular,
    }};
    store.writeWithSource(Dividend, "FAGIX", &polygon_view, .{ .seconds = Ttl.dividends }, "polygon");

    // Tiingo: one day earlier, amount rounded to two decimals.
    var tiingo_view = [_]Dividend{.{ .ex_date = Date.fromYmd(2024, 8, 30), .amount = 0.04 }};
    store.writeSupplement(Dividend, "FAGIX", &tiingo_view, "tiingo");

    const result = store.read(store.allocator, Dividend, "FAGIX", null, .any) orelse return error.NoCache;
    defer {
        for (result.data) |d| d.deinit(allocator);
        allocator.free(result.data);
    }

    // One record survives, keeping Polygon's date and precise amount.
    try std.testing.expectEqual(@as(usize, 1), result.data.len);
    const survivor = &result.data[0];
    try std.testing.expect(survivor.ex_date.eql(polygon_ex_date));
    try std.testing.expectApproxEqAbs(@as(f64, 0.040101457), survivor.amount, 0.000001);
}
test "writeMerged Split: near-match dedup is a no-op (no amount field)" {
    // Split has no amount field, so findNearMatch returns null for
    // it and near-match dedup never fires. Two splits within 3 days
    // of each other with the same ratio are therefore both kept.
    // That is intended: splits are rare, and splits that are close
    // together (e.g. a forward followed by a reverse) are real,
    // separate events.
    const allocator = std.testing.allocator;
    const io = std.testing.io;

    var tmp = std.testing.tmpDir(.{});
    defer tmp.cleanup();
    const dir_path = try tmp.dir.realPathFileAlloc(io, ".", allocator);
    defer allocator.free(dir_path);

    var store = Store.init(io, allocator, dir_path);

    var day_one = [_]Split{.{ .date = Date.fromYmd(2024, 6, 10), .numerator = 10, .denominator = 1 }};
    store.writeWithSource(Split, "TEST", &day_one, .{ .seconds = Ttl.splits }, "polygon");

    // One day later, same ratio. This would fall inside the
    // near-match window if that logic applied to splits; it only
    // applies to Dividend.
    var day_two = [_]Split{.{ .date = Date.fromYmd(2024, 6, 11), .numerator = 10, .denominator = 1 }};
    store.writeWithSource(Split, "TEST", &day_two, .{ .seconds = Ttl.splits }, "tiingo");

    const result = store.read(store.allocator, Split, "TEST", null, .any) orelse return error.NoCache;
    defer allocator.free(result.data);

    try std.testing.expectEqual(@as(usize, 2), result.data.len);
}
test "writeMerged Split: SPYM-style supplementary entry added" {
    // The cache starts with an empty split list (the bug case:
    // Polygon does not carry SPYM's 2017 split). A supplement from
    // Tiingo then contributes the 2017 4:1 split, which must show up
    // on read.
    const allocator = std.testing.allocator;
    const io = std.testing.io;

    var tmp = std.testing.tmpDir(.{});
    defer tmp.cleanup();
    const dir_path = try tmp.dir.realPathFileAlloc(io, ".", allocator);
    defer allocator.free(dir_path);

    var store = Store.init(io, allocator, dir_path);
    const split_date = Date.fromYmd(2017, 10, 16);

    // Polygon's view: no splits at all.
    var no_splits = [_]Split{};
    store.write(Split, "SPYM", &no_splits, .{ .seconds = Ttl.splits });

    // Tiingo's supplement: the 4:1 split.
    var tiingo_view = [_]Split{.{ .date = split_date, .numerator = 4, .denominator = 1 }};
    store.writeSupplement(Split, "SPYM", &tiingo_view, "tiingo");

    const result = store.read(store.allocator, Split, "SPYM", null, .any) orelse return error.NoCache;
    defer allocator.free(result.data);

    try std.testing.expectEqual(@as(usize, 1), result.data.len);
    const added = &result.data[0];
    try std.testing.expect(added.date.eql(split_date));
    try std.testing.expectApproxEqAbs(@as(f64, 4), added.numerator, 0.001);
}
test "writeMerged Split: forward-looking Polygon entry preserved across Tiingo refresh" {
    // Split analogue of the ARCC-like case: the initial write holds
    // a forward-looking entry plus a historical one; a later
    // supplement that only knows the historical entry must not erase
    // the forward-looking one.
    const allocator = std.testing.allocator;
    const io = std.testing.io;

    var tmp = std.testing.tmpDir(.{});
    defer tmp.cleanup();
    const dir_path = try tmp.dir.realPathFileAlloc(io, ".", allocator);
    defer allocator.free(dir_path);

    var store = Store.init(io, allocator, dir_path);
    const upcoming = Date.fromYmd(2026, 12, 1);
    const historical = Date.fromYmd(2020, 8, 31);

    // Initial Polygon write, forward-looking entry first.
    var polygon_view = [_]Split{
        .{ .date = upcoming, .numerator = 2, .denominator = 1 },
        .{ .date = historical, .numerator = 4, .denominator = 1 },
    };
    store.write(Split, "TEST", &polygon_view, .{ .seconds = Ttl.splits });

    // Tiingo refresh: historical entry only (its data does not yet
    // include splits that have not occurred).
    var tiingo_view = [_]Split{
        .{ .date = historical, .numerator = 4, .denominator = 1 },
    };
    store.writeSupplement(Split, "TEST", &tiingo_view, "tiingo");

    const result = store.read(store.allocator, Split, "TEST", null, .any) orelse return error.NoCache;
    defer allocator.free(result.data);

    // Both entries remain, in the order of the initial write.
    try std.testing.expectEqual(@as(usize, 2), result.data.len);
    try std.testing.expect(result.data[0].date.eql(upcoming));
    try std.testing.expect(result.data[1].date.eql(historical));
}
test "portfolio serialize/deserialize round-trip" {
    const allocator = std.testing.allocator;

    // Reference date for isOpen(): later than every open_date and
    // than the single close_date below, so "open" reduces to "has no
    // close_date and has not matured".
    const today = Date.fromYmd(2024, 6, 1);

    const lots = [_]Lot{
        .{ .symbol = "AMZN", .shares = 10, .open_date = Date.fromYmd(2022, 3, 15), .open_price = 150.25 },
        .{ .symbol = "AMZN", .shares = 5, .open_date = Date.fromYmd(2023, 6, 1), .open_price = 125.00, .close_date = Date.fromYmd(2024, 1, 15), .close_price = 185.50 },
        .{ .symbol = "VTI", .shares = 100, .open_date = Date.fromYmd(2022, 1, 10), .open_price = 220.00 },
    };

    // Serialize, then parse the serialized bytes back.
    const serialized = try serializePortfolio(allocator, &lots);
    defer allocator.free(serialized);
    var loaded = try deserializePortfolio(allocator, serialized);
    defer loaded.deinit();

    try std.testing.expectEqual(@as(usize, 3), loaded.lots.len);

    // Lot 0: AMZN, still open.
    const open_amzn = &loaded.lots[0];
    try std.testing.expectEqualStrings("AMZN", open_amzn.symbol);
    try std.testing.expectApproxEqAbs(@as(f64, 10), open_amzn.shares, 0.01);
    try std.testing.expect(open_amzn.isOpen(today));

    // Lot 1: AMZN, closed; close date and price must survive.
    const closed_amzn = &loaded.lots[1];
    try std.testing.expectEqualStrings("AMZN", closed_amzn.symbol);
    try std.testing.expectApproxEqAbs(@as(f64, 5), closed_amzn.shares, 0.01);
    try std.testing.expect(!closed_amzn.isOpen(today));
    try std.testing.expect(closed_amzn.close_date.?.eql(Date.fromYmd(2024, 1, 15)));
    try std.testing.expectApproxEqAbs(@as(f64, 185.50), closed_amzn.close_price.?, 0.01);

    // Lot 2: VTI.
    try std.testing.expectEqualStrings("VTI", loaded.lots[2].symbol);
}
test "portfolio: cash lots without symbol get CASH placeholder" {
    const allocator = std.testing.allocator;

    // Hand-written SRF: the first record is a cash lot with no
    // symbol field, the second is an ordinary stock lot.
    const raw_srf =
        \\#!srfv1
        \\security_type::cash,shares:num:598.66,open_date::2026-02-25,open_price:num:1.00,account::Savings
        \\symbol::AAPL,shares:num:10,open_date::2024-01-15,open_price:num:150.00
        \\
    ;

    var loaded = try deserializePortfolio(allocator, raw_srf);
    defer loaded.deinit();

    try std.testing.expectEqual(@as(usize, 2), loaded.lots.len);

    // The symbol-less cash lot is given the "CASH" placeholder.
    const cash_lot = &loaded.lots[0];
    try std.testing.expectEqualStrings("CASH", cash_lot.symbol);
    try std.testing.expectEqual(LotType.cash, cash_lot.security_type);
    try std.testing.expectApproxEqAbs(@as(f64, 598.66), cash_lot.shares, 0.01);
    try std.testing.expectEqualStrings("Savings", cash_lot.account.?);

    // The stock lot keeps the symbol it was given.
    try std.testing.expectEqualStrings("AAPL", loaded.lots[1].symbol);
}
test "portfolio: price_ratio round-trip" {
    const allocator = std.testing.allocator;

    // First record: a CUSIP-keyed lot with ticker, price_ratio and
    // note. Second record: a plain lot with none of those.
    const raw_srf =
        \\#!srfv1
        \\symbol::02315N600,shares:num:100,open_date::2024-01-15,open_price:num:140.00,ticker::VTTHX,price_ratio:num:5.185,note::VANGUARD TARGET 2035
        \\symbol::AAPL,shares:num:10,open_date::2024-03-01,open_price:num:150.00
        \\
    ;

    var loaded = try deserializePortfolio(allocator, raw_srf);
    defer loaded.deinit();

    try std.testing.expectEqual(@as(usize, 2), loaded.lots.len);

    // CUSIP lot: ticker drives priceSymbol(); ratio and note parse.
    const cusip_lot = &loaded.lots[0];
    try std.testing.expectEqualStrings("02315N600", cusip_lot.symbol);
    try std.testing.expectEqualStrings("VTTHX", cusip_lot.ticker.?);
    try std.testing.expectEqualStrings("VTTHX", cusip_lot.priceSymbol());
    try std.testing.expectApproxEqAbs(@as(f64, 5.185), cusip_lot.price_ratio, 0.001);
    try std.testing.expectEqualStrings("VANGUARD TARGET 2035", cusip_lot.note.?);

    // Plain lot: price_ratio falls back to 1.0 and ticker is absent.
    const plain_lot = &loaded.lots[1];
    try std.testing.expectEqualStrings("AAPL", plain_lot.symbol);
    try std.testing.expectApproxEqAbs(@as(f64, 1.0), plain_lot.price_ratio, 0.001);
    try std.testing.expect(plain_lot.ticker == null);

    // Second trip: serialize what was parsed and parse it again.
    const reserialized = try serializePortfolio(allocator, loaded.lots);
    defer allocator.free(reserialized);
    var reloaded = try deserializePortfolio(allocator, reserialized);
    defer reloaded.deinit();

    try std.testing.expectEqual(@as(usize, 2), reloaded.lots.len);
    try std.testing.expectApproxEqAbs(@as(f64, 5.185), reloaded.lots[0].price_ratio, 0.001);
    try std.testing.expectEqualStrings("VTTHX", reloaded.lots[0].ticker.?);
    try std.testing.expectApproxEqAbs(@as(f64, 1.0), reloaded.lots[1].price_ratio, 0.001);
}
test "portfolio: label:: round-trip" {
    const allocator = std.testing.allocator;

    // First record carries both a note and a label; the second has
    // neither.
    const raw_srf =
        \\#!srfv1
        \\symbol::02315N600,shares:num:100,open_date::2024-01-15,open_price:num:140.00,note::some annotation,label::TGT2035
        \\symbol::AAPL,shares:num:10,open_date::2024-03-01,open_price:num:150.00
        \\
    ;

    var loaded = try deserializePortfolio(allocator, raw_srf);
    defer loaded.deinit();

    try std.testing.expectEqual(@as(usize, 2), loaded.lots.len);

    // The label is parsed separately from the note, and
    // displaySymbol() returns it in preference to the raw CUSIP.
    const labelled = &loaded.lots[0];
    try std.testing.expectEqualStrings("TGT2035", labelled.label.?);
    try std.testing.expectEqualStrings("some annotation", labelled.note.?);
    try std.testing.expectEqualStrings("TGT2035", labelled.displaySymbol());

    // Without a label, displaySymbol() falls back to priceSymbol().
    const unlabelled = &loaded.lots[1];
    try std.testing.expect(unlabelled.label == null);
    try std.testing.expectEqualStrings("AAPL", unlabelled.displaySymbol());

    // Serialize -> deserialize: the label is emitted and parsed back.
    const reserialized = try serializePortfolio(allocator, loaded.lots);
    defer allocator.free(reserialized);
    try std.testing.expect(std.mem.indexOf(u8, reserialized, "label::TGT2035") != null);

    var reloaded = try deserializePortfolio(allocator, reserialized);
    defer reloaded.deinit();
    try std.testing.expectEqualStrings("TGT2035", reloaded.lots[0].label.?);
    try std.testing.expect(reloaded.lots[1].label == null);
}
// ── TTL and Negative Cache Tests ─────────────────────────────────
test "TTL constants are reasonable" {
    const day = std.time.s_per_day;
    const hour = std.time.s_per_hour;

    // Historical candles: -1, i.e. never expire.
    try std.testing.expectEqual(@as(i64, -1), Ttl.candles_historical);

    // Latest candles: strictly between 23h and 24h, leaving room for
    // cron jitter.
    try std.testing.expect(Ttl.candles_latest > 23 * hour);
    try std.testing.expect(Ttl.candles_latest < 24 * hour);

    // Biweekly: dividends and splits.
    inline for (.{ Ttl.dividends, Ttl.splits }) |ttl| {
        try std.testing.expectEqual(@as(i64, 14 * day), ttl);
    }

    // Hourly: options.
    try std.testing.expectEqual(@as(i64, hour), Ttl.options);

    // Monthly: earnings.
    try std.testing.expectEqual(@as(i64, 30 * day), Ttl.earnings);

    // Quarterly (90d): classification and etf_metrics.
    inline for (.{ Ttl.classification, Ttl.etf_metrics }) |ttl| {
        try std.testing.expectEqual(@as(i64, 90 * day), ttl);
    }

    // Monthly (30d): entity_facts and the EDGAR ticker-map indexes
    // (the latter are refreshed with jitter).
    inline for (.{ Ttl.entity_facts, Ttl.tickers_funds, Ttl.tickers_companies }) |ttl| {
        try std.testing.expectEqual(@as(i64, 30 * day), ttl);
    }
}
test "DataType.ttl returns correct seconds and jitter policy" {
    // One row per data type whose `.ttl()` is checked: the expected
    // base seconds and the expected jitter percentage.
    //   11% jitter: dividends, splits.
    //    8% jitter: classification, etf_metrics, entity_facts and
    //               both ticker maps.
    //    no jitter: the short-TTL types (options, earnings).
    //
    // candles_daily, candles_meta and meta are left out on purpose:
    // they have their own writers (`cacheCandles`, `writeNegative`),
    // and calling .ttl() on them is unreachable and would panic.
    const cases = [_]struct { data_type: DataType, seconds: i64, jitter_pct: u8 }{
        .{ .data_type = .dividends, .seconds = Ttl.dividends, .jitter_pct = 11 },
        .{ .data_type = .splits, .seconds = Ttl.splits, .jitter_pct = 11 },
        .{ .data_type = .classification, .seconds = Ttl.classification, .jitter_pct = 8 },
        .{ .data_type = .etf_metrics, .seconds = Ttl.etf_metrics, .jitter_pct = 8 },
        .{ .data_type = .entity_facts, .seconds = Ttl.entity_facts, .jitter_pct = 8 },
        .{ .data_type = .tickers_funds, .seconds = Ttl.tickers_funds, .jitter_pct = 8 },
        .{ .data_type = .tickers_companies, .seconds = Ttl.tickers_companies, .jitter_pct = 8 },
        .{ .data_type = .options, .seconds = Ttl.options, .jitter_pct = 0 },
        .{ .data_type = .earnings, .seconds = Ttl.earnings, .jitter_pct = 0 },
    };

    // Unrolled so each row is checked with a comptime-known tag,
    // exactly like a hand-written call per type.
    inline for (cases) |case| {
        const spec = case.data_type.ttl();
        try std.testing.expectEqual(case.seconds, spec.seconds);
        try std.testing.expectEqual(case.jitter_pct, spec.jitter_pct);
    }
}
test "DataType.fileName returns correct file names" {
    // Expected on-disk file name for each data type.
    const cases = [_]struct { data_type: DataType, file_name: []const u8 }{
        .{ .data_type = .candles_daily, .file_name = "candles_daily.srf" },
        .{ .data_type = .candles_meta, .file_name = "candles_meta.srf" },
        .{ .data_type = .dividends, .file_name = "dividends.srf" },
        .{ .data_type = .splits, .file_name = "splits.srf" },
        .{ .data_type = .options, .file_name = "options.srf" },
        .{ .data_type = .earnings, .file_name = "earnings.srf" },
        .{ .data_type = .meta, .file_name = "meta.srf" },
        .{ .data_type = .classification, .file_name = "classification.srf" },
        .{ .data_type = .etf_metrics, .file_name = "etf_metrics.srf" },
        .{ .data_type = .entity_facts, .file_name = "entity_facts.srf" },
        .{ .data_type = .tickers_funds, .file_name = "tickers_funds.srf" },
        .{ .data_type = .tickers_companies, .file_name = "tickers_companies.srf" },
    };

    inline for (cases) |case| {
        try std.testing.expectEqualStrings(case.file_name, case.data_type.fileName());
    }
}
test "negative_cache_content format" {
    // The negative-cache marker has to begin with the SRF header and
    // mention "fetch_failed" somewhere in its text.
    const marker: []const u8 = Store.negative_cache_content;

    const has_header = std.mem.startsWith(u8, marker, "#!srfv1");
    try std.testing.expect(has_header);

    const mentions_failure = std.mem.indexOf(u8, marker, "fetch_failed") != null;
    try std.testing.expect(mentions_failure);
}
test "computeExpires with jitter_pct=0 produces exact base TTL" {
    const now: i64 = 1_000_000;
    const ttl: i64 = 86_400; // one day
    const expected = now + ttl;

    // With jitter_pct omitted, the result is now + ttl for any key.
    try std.testing.expectEqual(expected, computeExpires(now, .{ .seconds = ttl }, "AAPL"));
    try std.testing.expectEqual(expected, computeExpires(now, .{ .seconds = ttl }, "MSFT"));

    // The same holds when jitter_pct is spelled out as 0.
    try std.testing.expectEqual(expected, computeExpires(now, .{ .seconds = ttl, .jitter_pct = 0 }, "VTI"));
}
test "computeExpires with jitter_pct>0 spreads within bounds and is deterministic" {
    const now: i64 = 1_000_000;
    const ttl: i64 = 86_400 * 90; // 90 days
    const jitter_pct: u8 = 8;

    // Largest distance from the base that the jitter may introduce.
    const span: i64 = @divFloor(ttl * jitter_pct, 100);
    const base = now + ttl;

    // Same key twice -> same expiration.
    const first_call = computeExpires(now, .{ .seconds = ttl, .jitter_pct = jitter_pct }, "AAPL");
    const second_call = computeExpires(now, .{ .seconds = ttl, .jitter_pct = jitter_pct }, "AAPL");
    try std.testing.expectEqual(first_call, second_call);

    // Every key lands inside [base - span, base + span].
    for ([_][]const u8{ "AAPL", "MSFT", "GOOGL", "VTI", "SPY", "BND", "GLD" }) |key| {
        const expires = computeExpires(now, .{ .seconds = ttl, .jitter_pct = jitter_pct }, key);
        try std.testing.expect(expires >= base - span);
        try std.testing.expect(expires <= base + span);
    }
}
test "computeExpires with jitter_pct>0 spreads different keys to different expirations" {
    const now: i64 = 1_000_000;
    const ttl: i64 = 86_400 * 90;

    const aapl = computeExpires(now, .{ .seconds = ttl, .jitter_pct = 8 }, "AAPL");
    const msft = computeExpires(now, .{ .seconds = ttl, .jitter_pct = 8 }, "MSFT");
    const googl = computeExpires(now, .{ .seconds = ttl, .jitter_pct = 8 }, "GOOGL");

    // The three keys must not all share one expiration; at least one
    // pair has to differ. Guards against the hash mapping distinct
    // inputs onto a single offset.
    const some_pair_differs = aapl != msft or msft != googl;
    try std.testing.expect(some_pair_differs);
}
test "computeExpires bypasses jitter for negative-sentinel TTL" {
    // A TTL of -1 (Ttl.candles_historical) stands for "never
    // expires". Even with a jitter percentage set, the sentinel must
    // pass through without jitter, so the result is exactly
    // now + (-1) rather than some jittered value that reads like a
    // real expiration in the past.
    const now: i64 = 1_000_000;
    const sentinel_ttl: i64 = -1;

    const expires = computeExpires(now, .{ .seconds = sentinel_ttl, .jitter_pct = 8 }, "AAPL");
    try std.testing.expectEqual(now - 1, expires);
}
test "looksCompleteSrf: empty is invalid" {
    // A zero-length body is never a complete SRF document.
    const empty_body: []const u8 = "";
    try std.testing.expectEqual(false, Store.looksCompleteSrf(empty_body));
}
test "looksCompleteSrf: missing srfv1 header rejected" {
    // Newline-terminated bodies that do not begin with the SRF
    // header: plain text and a JSON error payload.
    const headerless_bodies = [_][]const u8{
        "garbage\n",
        "some json body\n",
        "{\"error\":\"not found\"}\n",
    };
    for (headerless_bodies) |body| {
        try std.testing.expect(!Store.looksCompleteSrf(body));
    }
}
test "looksCompleteSrf: missing trailing newline rejected" {
    // The classic torn-write symptom is a body that stops in the
    // middle of a record with no final newline. FRDM on 2026-05-02
    // had this shape: a trailing `date::2026-04` with no comma, no
    // further fields and no `\n`. A bare header without its newline
    // is rejected for the same reason.
    const unterminated_bodies = [_][]const u8{
        "#!srfv1\ndate::2026-04-22,open:num:62.82\ndate::2026-04",
        "#!srfv1",
    };
    for (unterminated_bodies) |body| {
        try std.testing.expect(!Store.looksCompleteSrf(body));
    }
}
test "looksCompleteSrf: well-formed body accepted" {
    // Bodies that must pass, in order: a header plus one complete
    // record, the negative-cache marker, and the smallest possible
    // document (just the header with its own terminator).
    const complete_bodies = [_][]const u8{
        "#!srfv1\ndate::2026-04-22,open:num:62.82,close:num:63.23\n",
        Store.negative_cache_content,
        "#!srfv1\n",
    };
    for (complete_bodies) |body| {
        try std.testing.expect(Store.looksCompleteSrf(body));
    }
}
test "archiveTornBody writes .bin + .meta pair with expected SRF content" {
    // Archives one torn body into a fresh temp directory, then checks
    // three things: exactly one `.bin`/`.meta` pair appears under
    // `_torn`, the `.bin` holds the torn bytes verbatim, and the
    // `.meta` is SRF text carrying the diagnostic fields passed in
    // plus the derived length / sha256 / tail-hex fields.
    const io = std.testing.io;
    const allocator = testing.allocator;

    var tmp = std.testing.tmpDir(.{});
    defer tmp.cleanup();
    const dir_path = try tmp.dir.realPathFileAlloc(io, ".", allocator);
    defer allocator.free(dir_path);

    // Shape mirrors the canonical FRDM torn body: a header plus a
    // complete record, then a record that was cut mid-date-field with
    // no terminating newline.
    const torn_bytes = "#!srfv1\ndate::2026-04-22,open:num:62.82,close:num:63.23\ndate::2026-04";
    try Store.archiveTornBody(
        io,
        allocator,
        dir_path,
        "FRDM",
        .candles_daily,
        torn_bytes,
        .{
            .failure_reason = .looks_complete_srf_failed,
            .http_status = 200,
            .http_content_length = 214637,
            .server_url = "https://example.test/FRDM/candles",
            .server_etag = "\"sha256:deadbeefcafe\"",
        },
    );

    // Find the produced files. The timestamp in the name is produced
    // at write time so we can't predict it - list the _torn dir and
    // assert exactly one `.bin` + one `.meta`, and that they share a
    // prefix (so they're unambiguously paired).
    const torn_dir_path = try std.fs.path.join(allocator, &.{ dir_path, "_torn" });
    defer allocator.free(torn_dir_path);
    var torn_dir = try std.Io.Dir.cwd().openDir(io, torn_dir_path, .{ .iterate = true });
    defer torn_dir.close(io);

    var bin_name_buf: [128]u8 = undefined;
    var meta_name_buf: [128]u8 = undefined;
    var bin_len: usize = 0;
    var meta_len: usize = 0;
    // Matches are counted so that a duplicate archive fails the test
    // instead of silently overwriting the remembered name.
    var bin_count: usize = 0;
    var meta_count: usize = 0;

    var it = torn_dir.iterate();
    while (try it.next(io)) |entry| {
        if (std.mem.endsWith(u8, entry.name, ".bin")) {
            // Fail cleanly rather than overrun the fixed name buffer.
            try std.testing.expect(entry.name.len <= bin_name_buf.len);
            @memcpy(bin_name_buf[0..entry.name.len], entry.name);
            bin_len = entry.name.len;
            bin_count += 1;
        } else if (std.mem.endsWith(u8, entry.name, ".meta")) {
            try std.testing.expect(entry.name.len <= meta_name_buf.len);
            @memcpy(meta_name_buf[0..entry.name.len], entry.name);
            meta_len = entry.name.len;
            meta_count += 1;
        }
    }
    try std.testing.expectEqual(@as(usize, 1), bin_count);
    try std.testing.expectEqual(@as(usize, 1), meta_count);

    // Prefix without extension should match (same symbol + data_type
    // + timestamp).
    const bin_prefix = bin_name_buf[0 .. bin_len - ".bin".len];
    const meta_prefix = meta_name_buf[0 .. meta_len - ".meta".len];
    try std.testing.expectEqualStrings(bin_prefix, meta_prefix);
    try std.testing.expect(std.mem.startsWith(u8, bin_prefix, "FRDM_candles_daily_"));

    // .bin round-trips verbatim.
    const bin_path = try std.fs.path.join(allocator, &.{ torn_dir_path, bin_name_buf[0..bin_len] });
    defer allocator.free(bin_path);
    const bin_contents = try std.Io.Dir.cwd().readFileAlloc(io, bin_path, allocator, .limited(1024 * 1024));
    defer allocator.free(bin_contents);
    try std.testing.expectEqualStrings(torn_bytes, bin_contents);

    // .meta is valid SRF and carries the fields we care about.
    const meta_path = try std.fs.path.join(allocator, &.{ torn_dir_path, meta_name_buf[0..meta_len] });
    defer allocator.free(meta_path);
    const meta_contents = try std.Io.Dir.cwd().readFileAlloc(io, meta_path, allocator, .limited(1024 * 1024));
    defer allocator.free(meta_contents);
    try std.testing.expect(std.mem.startsWith(u8, meta_contents, "#!srfv1\n"));
    const expected_fields = [_][]const u8{
        "symbol::FRDM",
        "data_type::candles_daily",
        "failure_reason::looks_complete_srf_failed",
        "http_status:num:200",
        "http_content_length:num:214637",
        "server_url::https://example.test/FRDM/candles",
        "server_etag::\"sha256:deadbeefcafe\"",
    };
    for (expected_fields) |field| {
        try std.testing.expect(std.mem.indexOf(u8, meta_contents, field) != null);
    }

    // Body length matches the torn slice exactly.
    var len_buf: [32]u8 = undefined;
    const len_str = try std.fmt.bufPrint(&len_buf, "body_length:num:{d}", .{torn_bytes.len});
    try std.testing.expect(std.mem.indexOf(u8, meta_contents, len_str) != null);

    // Hex-encoded sha256 - length check (64 hex chars).
    const sha_idx = std.mem.indexOf(u8, meta_contents, "body_sha256::").?;
    const sha_start = sha_idx + "body_sha256::".len;
    try std.testing.expect(meta_contents.len - sha_start >= 64);
    for (meta_contents[sha_start .. sha_start + 64]) |c| {
        try std.testing.expect(std.ascii.isHex(c));
    }

    // last_200_bytes_hex is exactly 2 * min(body_len, 200) chars; the
    // torn body here is shorter than 200 bytes, so that is 2 * len.
    const tail_idx = std.mem.indexOf(u8, meta_contents, "last_200_bytes_hex::").?;
    const tail_start = tail_idx + "last_200_bytes_hex::".len;
    const tail_end = std.mem.indexOfScalarPos(u8, meta_contents, tail_start, '\n').?;
    try std.testing.expectEqual(@as(usize, torn_bytes.len * 2), tail_end - tail_start);
}
test "Store.read self-heals torn candles_daily and wipes the pair" {
    // Scenario: a truncated candles_daily.srf sits next to an intact
    // candles_meta.srf. A read must report a cache miss, remove both
    // files, and leave a `.bin` + `.meta` diagnostic pair under `_torn/`.
    const io = testing.io;
    const gpa = testing.allocator;

    var tmp = testing.tmpDir(.{});
    defer tmp.cleanup();
    const cache_root = try tmp.dir.realPathFileAlloc(io, ".", gpa);
    defer gpa.free(cache_root);

    var store = Store.init(io, gpa, cache_root);
    try store.ensureSymbolDir("FRDM");

    // Torn daily (cut off mid-date) plus intact meta - the exact state
    // we saw on disk for FRDM in the 2026-05-08 incident.
    try store.writeRaw(
        "FRDM",
        .candles_daily,
        "#!srfv1\ndate::2026-04-22,open:num:62.82,close:num:63.23\ndate::2026-04",
    );
    try store.writeRaw(
        "FRDM",
        .candles_meta,
        "#!srfv1\n#!expires=9999999999\n#!created=1777000000\nlast_close:num:67.11,last_date::2026-05-07\n",
    );

    // The read MUST come back as a cache miss.
    const outcome = store.read(store.allocator, Candle, "FRDM", null, .any);
    try testing.expect(outcome == null);

    // Neither half of the candle pair may survive the read.
    for ([_][]const u8{ "candles_daily.srf", "candles_meta.srf" }) |file_name| {
        const wiped_path = try std.fs.path.join(gpa, &.{ cache_root, "FRDM", file_name });
        defer gpa.free(wiped_path);
        try testing.expectError(error.FileNotFound, std.Io.Dir.cwd().access(io, wiped_path, .{}));
    }

    // The torn body must have been archived under _torn/.
    const archive_path = try std.fs.path.join(gpa, &.{ cache_root, "_torn" });
    defer gpa.free(archive_path);
    var archive = try std.Io.Dir.cwd().openDir(io, archive_path, .{ .iterate = true });
    defer archive.close(io);

    var saw_bin = false;
    var saw_meta = false;
    var entries = archive.iterate();
    while (try entries.next(io)) |entry| {
        // Only entries archived for this symbol + data type count.
        if (!std.mem.startsWith(u8, entry.name, "FRDM_candles_daily_")) continue;
        saw_bin = saw_bin or std.mem.endsWith(u8, entry.name, ".bin");
        saw_meta = saw_meta or std.mem.endsWith(u8, entry.name, ".meta");
    }
    try testing.expect(saw_bin);
    try testing.expect(saw_meta);
}
test "Store.read does not self-heal an intact candles_daily" {
    // Counterpart to the torn-file test: a well-formed candle pair must
    // be served from cache and left untouched on disk.
    const io = testing.io;
    const gpa = testing.allocator;

    var tmp = testing.tmpDir(.{});
    defer tmp.cleanup();
    const cache_root = try tmp.dir.realPathFileAlloc(io, ".", gpa);
    defer gpa.free(cache_root);

    var store = Store.init(io, gpa, cache_root);
    try store.ensureSymbolDir("OK");

    // A complete candles_daily.srf: one record carrying the full OHLCV
    // field set the Candle type expects.
    const good_daily =
        \\#!srfv1
        \\date::2026-04-22,open:num:62.82,high:num:63.24,low:num:62.6,close:num:63.23,adj_close:num:63.23,volume:num:189473
        \\
    ;
    const good_meta = "#!srfv1\n" ++
        "#!expires=9999999999\n" ++
        "#!created=1777000000\n" ++
        "last_close:num:63.23,last_date::2026-04-22\n";
    try store.writeRaw("OK", .candles_daily, good_daily);
    try store.writeRaw("OK", .candles_meta, good_meta);

    // The read must hit and yield exactly the one seeded record.
    {
        const hit = store.read(store.allocator, Candle, "OK", null, .any) orelse
            return error.TestUnexpectedResult;
        defer gpa.free(hit.data);
        try testing.expectEqual(@as(usize, 1), hit.data.len);
    }

    // The daily file is still on disk and non-empty.
    const daily_path = try std.fs.path.join(gpa, &.{ cache_root, "OK", "candles_daily.srf" });
    defer gpa.free(daily_path);
    const daily_stat = try std.Io.Dir.cwd().statFile(io, daily_path, .{});
    try testing.expect(daily_stat.size > 0);

    // No _torn/ directory may have been created.
    const archive_path = try std.fs.path.join(gpa, &.{ cache_root, "_torn" });
    defer gpa.free(archive_path);
    try testing.expectError(error.FileNotFound, std.Io.Dir.cwd().access(io, archive_path, .{}));
}
test "Store.dataTypeFor maps model types correctly" {
    // Each pair is (model type, DataType that dataTypeFor must return).
    inline for (.{
        .{ Candle, DataType.candles_daily },
        .{ Dividend, DataType.dividends },
        .{ Split, DataType.splits },
        .{ EarningsEvent, DataType.earnings },
        .{ OptionsChain, DataType.options },
    }) |pair| {
        try testing.expectEqual(pair[1], Store.dataTypeFor(pair[0]));
    }
}
test "Store.DataFor returns correct types" {
    // Every supported model is cached as a slice of its own records.
    inline for (.{ Candle, Dividend, Split }) |Model| {
        try testing.expect(Store.DataFor(Model) == []Model);
    }
}
test "Store.Freshness enum values" {
    // Both variants must exist and be distinguishable from each other.
    const Freshness = Store.Freshness;
    const strict: Freshness = .fresh_only;
    const lenient: Freshness = .any;
    try testing.expect(strict != lenient);
}
test "CandleProvider.fromString parses provider names" {
    const CandleProvider = Store.CandleProvider;
    const Case = struct { input: []const u8, want: CandleProvider };
    const cases = [_]Case{
        .{ .input = "yahoo", .want = .yahoo },
        .{ .input = "tiingo", .want = .tiingo },
        .{ .input = "twelvedata", .want = .twelvedata },
        // Unrecognised and empty names default to twelvedata.
        .{ .input = "unknown", .want = .twelvedata },
        .{ .input = "", .want = .twelvedata },
    };
    for (cases) |case| {
        try testing.expectEqual(case.want, CandleProvider.fromString(case.input));
    }
}
test "Store init creates valid store" {
    // init must record the cache directory it was handed, verbatim.
    const cache_dir = "/tmp/zfin-test";
    const store = Store.init(testing.io, testing.allocator, cache_dir);
    try testing.expectEqualStrings(cache_dir, store.cache_dir);
}
test "CandleMeta has no default provider (must be set explicitly)" {
    // Regression: pre-2026-05 the model had `provider: CandleProvider = .tiingo`
    // which caused SRF to elide the field when it equaled the default,
    // hiding the provider in cache inspection. We removed the default
    // so every cache file records its provider explicitly.
    //
    // A struct literal that sets `.provider` compiles whether or not the
    // field has a default, so construction alone cannot catch a re-added
    // default. Inspect the field declaration via reflection instead: if
    // someone re-adds a default later, this assertion fails.
    const provider_has_default = comptime std.meta.fieldInfo(Store.CandleMeta, .provider).default_value_ptr != null;
    try std.testing.expect(!provider_has_default);

    // Sanity: an explicitly supplied provider is stored as given.
    const meta = Store.CandleMeta{
        .last_close = 100.0,
        .last_date = Date.fromYmd(2024, 1, 1),
        .provider = .tiingo,
    };
    try std.testing.expectEqual(Store.CandleProvider.tiingo, meta.provider);
}
test "serializeCandleMeta unconditionally emits provider field" {
    // Regression: SRF auto-elision used to hide `provider::tiingo`
    // when it equaled the model default. With the default removed
    // from the struct, SRF emits the field unconditionally.
    const gpa = testing.allocator;
    const meta: Store.CandleMeta = .{
        .last_close = 100.0,
        .last_date = Date.fromYmd(2024, 1, 1),
        .provider = .tiingo,
    };
    const serialized = try Store.serializeCandleMeta(testing.io, gpa, meta, .{ .expires = 1234567890 });
    defer gpa.free(serialized);

    // Every fragment below must appear somewhere in the serialized output.
    const required_fragments = [_][]const u8{
        "provider::tiingo",
        "last_close:num:100",
        "last_date::2024-01-01",
        "#!expires=1234567890",
    };
    for (required_fragments) |fragment| {
        try testing.expect(std.mem.indexOf(u8, serialized, fragment) != null);
    }
}
test "serializeCandleMeta round-trips through deserializeCandleMeta" {
    // Serialize a fully populated meta, parse it back, and compare
    // field by field.
    const gpa = testing.allocator;
    const expected_date = Date.fromYmd(2026, 5, 19);
    const original: Store.CandleMeta = .{
        .last_close = 42.57,
        .last_date = expected_date,
        .provider = .tiingo,
        .fail_count = 2,
    };

    const encoded = try Store.serializeCandleMeta(testing.io, gpa, original, .{ .expires = 1234567890 });
    defer gpa.free(encoded);
    const decoded = try Store.deserializeCandleMeta(gpa, encoded);

    // The close price is a float, so compare with a tolerance.
    try testing.expectApproxEqAbs(@as(f64, 42.57), decoded.last_close, 0.001);
    try testing.expect(decoded.last_date.eql(expected_date));
    try testing.expectEqual(Store.CandleProvider.tiingo, decoded.provider);
    try testing.expectEqual(@as(u8, 2), decoded.fail_count);
}
test "deserializeCandleMeta fails on old cache that elided provider field" {
    // Pre-2026-05 cache files elided `provider::tiingo` when it
    // equaled the model default. Those caches no longer deserialize
    // (model has no default for provider). The graceful handling is
    // upstream: `readCandleMeta` swallows the deserialization error
    // and returns null, which makes `getCandles` treat it as a cache
    // miss and trigger a fresh fetch via `populateAllFromTiingo`.
    // The new fetch writes a meta file with the provider explicit.
    //
    // This test documents the failure mode and confirms it's not a
    // silent corruption - the caller gets an error, not stale data.
    const legacy_meta = "#!srfv1\n" ++
        "#!expires=1779384748\n" ++
        "#!created=1779299248\n" ++
        "last_close:num:298.97,last_date::2026-05-19\n";
    try testing.expectError(
        error.InvalidData,
        Store.deserializeCandleMeta(testing.allocator, legacy_meta),
    );
}
// ── writeRaw / appendRaw atomicity ───────────────────────────
//
// A concurrent reader hitting a cache file mid-write must never see a
// truncated or partial-field state - the symptom was srf `custom parse
// of value 2026-04 failed : InvalidDateFormat`, a date field chopped
// exactly 7 chars into a 10-char value.
//
// These tests exercise the atomicity primitive underneath writeRaw/
// appendRaw: while one thread hammers writes of two alternating,
// differently-sized SRF blobs, reader threads hammer reads and assert
// that every read they succeed at parses cleanly as one of those two
// blobs - no partial content, no partial dates. A pre-atomic-fix
// version of writeRaw would fail this test within a handful of
// iterations.
const testing = std.testing;
test "writeRaw atomicity: concurrent readers never observe a truncated file" {
    // One writer thread alternates between two complete SRF blobs of
    // different sizes while three reader threads re-read the same file.
    // Every successful read must equal one of the two blobs exactly.
    const io = std.testing.io;
    // Two SRF blobs with dates at different byte offsets. A non-atomic
    // writer that truncates + writes would leak partial bytes of
    // whichever blob is mid-write; that would show up as either a
    // partial date or a split field - both caught by strict equality
    // against these two complete blobs.
    const blob_a =
        \\#!srfv1
        \\#!expires=1780000000
        \\#!created=1777000000
        \\symbol::AAPL,date::2026-04-30,actual:num:2.78
        \\symbol::MSFT,date::2026-01-29,actual:num:4.27
        \\
    ;
    const blob_b =
        \\#!srfv1
        \\#!expires=1780500000
        \\#!created=1777500000
        \\symbol::AMZN,date::2026-07-29,estimate:num:1.83
        \\
    ;
    try std.testing.expect(blob_a.len != blob_b.len); // ensure size changes

    var tmp = std.testing.tmpDir(.{});
    defer tmp.cleanup();
    const dir_path = try tmp.dir.realPathFileAlloc(io, ".", testing.allocator);
    defer testing.allocator.free(dir_path);

    // 0.16's DebugAllocator (which testing.allocator uses) is
    // thread-safe by default, so no ThreadSafeAllocator wrapper needed.
    var store = Store.init(std.testing.io, testing.allocator, dir_path);
    // Make sure the symbol subdir exists once up-front (writeRaw will
    // also call ensureSymbolDir, but doing it here keeps the writer
    // hot loop lean).
    try store.ensureSymbolDir("SYM");

    const Ctx = struct {
        store: *Store,
        blob_a: []const u8,
        blob_b: []const u8,
        stop: std.atomic.Value(bool),
        bad_reads: std.atomic.Value(u32),
        completed_reads: std.atomic.Value(u32),

        /// Rewrites the cache file with alternating blobs until `stop` is set.
        fn writer(self: *@This()) void {
            var i: usize = 0;
            while (!self.stop.load(.acquire)) : (i += 1) {
                const bytes = if (i & 1 == 0) self.blob_a else self.blob_b;
                // We don't care about errors here - worst case the
                // writer just skips this iteration.
                self.store.writeRaw("SYM", .candles_daily, bytes) catch {};
            }
        }

        /// Re-reads the cache file until `stop` is set, counting every
        /// successful read and every read that matches neither blob.
        fn reader(self: *@This()) void {
            const path = self.store.entryPath("SYM", DataType.candles_daily.fileName()) catch return;
            defer self.store.allocator.free(path);
            while (!self.stop.load(.acquire)) {
                // Any read failure is skipped: FileNotFound is the
                // pre-first-write race, and other errors are treated
                // the same way. The `total > 0` check below guards
                // against every read failing.
                const bytes = std.Io.Dir.cwd().readFileAlloc(self.store.io, path, self.store.allocator, .limited(1 * 1024 * 1024)) catch continue;
                defer self.store.allocator.free(bytes);
                // Every successful read must match exactly one of the
                // two source blobs. A non-atomic writer would eventually
                // produce reads that match neither (truncated, zero
                // bytes, or mid-blob switchover garbage).
                const matches = std.mem.eql(u8, bytes, self.blob_a) or std.mem.eql(u8, bytes, self.blob_b);
                if (!matches) _ = self.bad_reads.fetchAdd(1, .monotonic);
                _ = self.completed_reads.fetchAdd(1, .monotonic);
            }
        }
    };

    var ctx = Ctx{
        .store = &store,
        .blob_a = blob_a,
        .blob_b = blob_b,
        .stop = std.atomic.Value(bool).init(false),
        .bad_reads = std.atomic.Value(u32).init(0),
        .completed_reads = std.atomic.Value(u32).init(0),
    };

    const writer_thread = try std.Thread.spawn(.{}, Ctx.writer, .{&ctx});
    var reader_threads: [3]std.Thread = undefined;
    var readers_started: usize = 0;
    {
        // Stop and join every thread that was actually started on ALL
        // exit paths (including a failed spawn or sleep). The threads
        // hold pointers to `ctx` and `store`, which live on this stack
        // frame, so they must not outlive it.
        defer {
            ctx.stop.store(true, .release);
            writer_thread.join();
            for (reader_threads[0..readers_started]) |t| t.join();
        }
        for (&reader_threads) |*t| {
            t.* = try std.Thread.spawn(.{}, Ctx.reader, .{&ctx});
            readers_started += 1;
        }
        // Run the stress for a fixed duration. 200ms is plenty for thousands
        // of iterations on any reasonable machine - enough to reliably catch
        // a non-atomic write window at the scheduler granularity.
        try io.sleep(.fromMilliseconds(200), .boot);
    }

    // The critical assertion: zero reads landed in a partial state.
    const bad = ctx.bad_reads.load(.monotonic);
    const total = ctx.completed_reads.load(.monotonic);
    // Sanity: we actually did work (otherwise the test is a no-op).
    try testing.expect(total > 0);
    try testing.expectEqual(@as(u32, 0), bad);
}
test "appendRaw atomicity: concurrent readers see either pre- or post-append, never mid" {
    const io = std.testing.io;
    // Seed an initial file with a complete SRF doc, then have one thread
    // append more records repeatedly while readers race to read it. Every
    // successful read must parse cleanly and have a valid termination
    // - no trailing partial record from an in-flight append.
    const seed =
        \\#!srfv1
        \\#!created=1777000000
        \\symbol::AAPL,date::2024-04-30,actual:num:1.53
        \\
    ;
    const chunk =
        \\symbol::AAPL,date::2024-07-30,actual:num:1.65
        \\
    ;

    var tmp = std.testing.tmpDir(.{});
    defer tmp.cleanup();
    const dir_path = try tmp.dir.realPathFileAlloc(io, ".", testing.allocator);
    defer testing.allocator.free(dir_path);

    // 0.16's DebugAllocator (which testing.allocator uses) is
    // thread-safe by default, so no ThreadSafeAllocator wrapper needed.
    var store = Store.init(std.testing.io, testing.allocator, dir_path);
    try store.ensureSymbolDir("SYM");
    // Write seed atomically up front.
    try store.writeRaw("SYM", .candles_daily, seed);

    const Ctx = struct {
        store: *Store,
        seed_len: usize,
        chunk: []const u8,
        stop: std.atomic.Value(bool),
        bad_reads: std.atomic.Value(u32),
        completed_reads: std.atomic.Value(u32),

        /// Appends `chunk` to the cache file until `stop` is set.
        /// Append errors are deliberately ignored (best-effort stress).
        fn appender(self: *@This()) void {
            while (!self.stop.load(.acquire)) {
                self.store.appendRaw("SYM", .candles_daily, self.chunk) catch {};
            }
        }

        /// Re-reads the cache file until `stop` is set, counting every
        /// successful read and every read that violates an invariant.
        fn reader(self: *@This()) void {
            const path = self.store.entryPath("SYM", DataType.candles_daily.fileName()) catch return;
            defer self.store.allocator.free(path);
            while (!self.stop.load(.acquire)) {
                const bytes = std.Io.Dir.cwd().readFileAlloc(self.store.io, path, self.store.allocator, .limited(4 * 1024 * 1024)) catch continue;
                defer self.store.allocator.free(bytes);
                // Invariants for a valid appended file:
                // 1. Starts with `#!srfv1\n` (the header is never torn).
                // 2. Ends with `\n` (no partial trailing record).
                // 3. Total length is `seed.len + k * chunk.len` for some
                //    integer k ≥ 0 - which, with atomic appends, is the
                //    only observable shape.
                const ok_header = std.mem.startsWith(u8, bytes, "#!srfv1\n");
                const ok_tail = bytes.len > 0 and bytes[bytes.len - 1] == '\n';
                // Guard the subtraction: a read shorter than the seed is
                // itself a violation and must not underflow.
                const ok_len = bytes.len >= self.seed_len and
                    (bytes.len - self.seed_len) % self.chunk.len == 0;
                if (!ok_header or !ok_tail or !ok_len) {
                    _ = self.bad_reads.fetchAdd(1, .monotonic);
                }
                _ = self.completed_reads.fetchAdd(1, .monotonic);
            }
        }
    };

    var ctx = Ctx{
        .store = &store,
        .seed_len = seed.len,
        .chunk = chunk,
        .stop = std.atomic.Value(bool).init(false),
        .bad_reads = std.atomic.Value(u32).init(0),
        .completed_reads = std.atomic.Value(u32).init(0),
    };

    const appender_thread = try std.Thread.spawn(.{}, Ctx.appender, .{&ctx});
    var reader_threads: [3]std.Thread = undefined;
    var readers_started: usize = 0;
    {
        // Stop and join every thread that was actually started on ALL
        // exit paths (including a failed spawn or sleep). The threads
        // hold pointers to `ctx` and `store`, which live on this stack
        // frame, so they must not outlive it.
        defer {
            ctx.stop.store(true, .release);
            appender_thread.join();
            for (reader_threads[0..readers_started]) |t| t.join();
        }
        for (&reader_threads) |*t| {
            t.* = try std.Thread.spawn(.{}, Ctx.reader, .{&ctx});
            readers_started += 1;
        }
        // Let the appender and readers race for a fixed 200ms window.
        try io.sleep(.fromMilliseconds(200), .boot);
    }

    const bad = ctx.bad_reads.load(.monotonic);
    const total = ctx.completed_reads.load(.monotonic);
    // Sanity: at least one read completed (otherwise the test is a no-op).
    try testing.expect(total > 0);
    try testing.expectEqual(@as(u32, 0), bad);
}