2530 lines
111 KiB
Zig
2530 lines
111 KiB
Zig
const std = @import("std");
|
|
const log = std.log.scoped(.cache);
|
|
const srf = @import("srf");
|
|
const atomic = @import("../atomic.zig");
|
|
const version = @import("../version.zig");
|
|
const Date = @import("../Date.zig");
|
|
const Candle = @import("../models/candle.zig").Candle;
|
|
const Dividend = @import("../models/dividend.zig").Dividend;
|
|
const DividendType = @import("../models/dividend.zig").DividendType;
|
|
const Split = @import("../models/split.zig").Split;
|
|
const EarningsEvent = @import("../models/earnings.zig").EarningsEvent;
|
|
const ReportTime = @import("../models/earnings.zig").ReportTime;
|
|
const EtfProfile = @import("../models/etf_profile.zig").EtfProfile;
|
|
const Holding = @import("../models/etf_profile.zig").Holding;
|
|
const SectorWeight = @import("../models/etf_profile.zig").SectorWeight;
|
|
|
|
// ── Wall-clock policy ────────────────────────────────────────
|
|
//
|
|
// Every `std.Io.Timestamp.now(...)` call in this file is intentional:
|
|
// the cache layer's job is to record *when data landed on disk* and
|
|
// compute expiry relative to that. Threading a `now_s: i64` in from the
|
|
// caller wouldn't buy anything — we'd just push the clock read up one
|
|
// frame. The torn-SRF diagnostic filenames (see `archiveTornBody`) additionally
|
|
// require millisecond precision to avoid collisions, which a
|
|
// caller-provided second-resolution `now_s` couldn't give us.
|
|
const Lot = @import("../models/portfolio.zig").Lot;
|
|
const LotType = @import("../models/portfolio.zig").LotType;
|
|
const Portfolio = @import("../models/portfolio.zig").Portfolio;
|
|
const OptionsChain = @import("../models/option.zig").OptionsChain;
|
|
const OptionContract = @import("../models/option.zig").OptionContract;
|
|
|
|
/// Cache lifetimes, in seconds, keyed by data family.
/// `candles_historical` uses -1 as an "infinite" sentinel.
pub const Ttl = struct {
    const s_per_day = std.time.s_per_day;
    const two_weeks = 14 * s_per_day;
    const thirty_days = 30 * s_per_day;

    /// Candles older than the latest day are settled history and never expire.
    pub const candles_historical: i64 = -1;
    /// The latest day's candle lives 23h45m: a full day minus a
    /// 15-minute cushion so a cron run that fires slightly early still
    /// sees it as stale.
    pub const candles_latest: i64 = s_per_day - 15 * std.time.s_per_min;
    /// Dividends are re-checked every two weeks.
    pub const dividends: i64 = two_weeks;
    /// Splits are re-checked every two weeks.
    pub const splits: i64 = two_weeks;
    /// Options chains go stale after one hour.
    pub const options: i64 = std.time.s_per_hour;
    /// Earnings are re-checked every 30 days, with smart refresh after announcements.
    pub const earnings: i64 = thirty_days;
    /// ETF profiles are re-checked every 30 days.
    pub const etf_profile: i64 = thirty_days;
};
|
|
|
|
/// Kinds of per-symbol cache file. Each tag maps to `{tag}.srf`
/// inside the symbol's cache directory.
pub const DataType = enum {
    candles_daily,
    candles_meta,
    dividends,
    splits,
    options,
    earnings,
    etf_profile,
    meta,

    /// On-disk file name for this data type: the tag name plus `.srf`.
    pub fn fileName(self: DataType) []const u8 {
        // Each prong is resolved at comptime, so the concatenation is a
        // static string literal, not a runtime allocation.
        return switch (self) {
            inline else => |tag| @tagName(tag) ++ ".srf",
        };
    }

    /// Default TTL in seconds for this data type.
    pub fn ttl(self: DataType) i64 {
        return switch (self) {
            // No generic TTL here; candle expiry is handled by the
            // candle-specific writers.
            .candles_daily, .candles_meta, .meta => 0,
            .dividends => Ttl.dividends,
            .splits => Ttl.splits,
            .options => Ttl.options,
            .earnings => Ttl.earnings,
            .etf_profile => Ttl.etf_profile,
        };
    }
};
|
|
|
|
/// Persistent SRF-backed cache with per-symbol, per-data-type files.
|
|
///
|
|
/// Layout:
|
|
/// {cache_dir}/{SYMBOL}/candles_daily.srf
|
|
/// {cache_dir}/{SYMBOL}/dividends.srf
|
|
/// {cache_dir}/{SYMBOL}/meta.srf
|
|
/// ...
|
|
pub const Store = struct {
|
|
cache_dir: []const u8,
|
|
allocator: std.mem.Allocator,
|
|
io: std.Io,
|
|
|
|
/// Type-erased per-record post-processing hook. It receives a pointer
/// to a freshly deserialized record plus an allocator. Intended for
/// duping strings that must outlive the SRF iterator's buffer, or for
/// applying domain-specific transforms.
/// NOTE(review): `read` takes a typed `*const fn (*T, Allocator)`
/// callback rather than this erased form; confirm remaining users.
pub const PostProcessFn = fn (*anyopaque, std.mem.Allocator) anyerror!void;
|
|
|
|
pub fn init(io: std.Io, allocator: std.mem.Allocator, cache_dir: []const u8) Store {
    // The store borrows `cache_dir`; the caller keeps it alive for
    // the store's lifetime.
    return Store{
        .allocator = allocator,
        .cache_dir = cache_dir,
        .io = io,
    };
}
|
|
|
|
// ── Generic typed API ────────────────────────────────────────
|
|
|
|
/// Resolve the cache file kind that stores model type `T`.
/// Any other type is rejected at compile time.
pub fn dataTypeFor(comptime T: type) DataType {
    const mapped: DataType = switch (T) {
        Candle => .candles_daily,
        Dividend => .dividends,
        Split => .splits,
        OptionsChain => .options,
        EarningsEvent => .earnings,
        EtfProfile => .etf_profile,
        else => @compileError("unsupported type for Store"),
    };
    return mapped;
}
|
|
|
|
/// Payload shape for `T`. An ETF profile is a single struct; every
/// other cached type is a slice of records.
pub fn DataFor(comptime T: type) type {
    if (T == EtfProfile) return EtfProfile;
    return []T;
}
|
|
|
|
/// Result of a successful `read`: the decoded payload plus a Unix-seconds
/// timestamp. `read` uses the file's `created` directive when present,
/// otherwise the current time.
pub fn CacheResult(comptime T: type) type {
    return struct {
        /// Decoded payload; see `DataFor`.
        data: DataFor(T),
        /// Unix seconds associated with the entry.
        timestamp: i64,
    };
}
|
|
|
|
/// Expiry policy for `read`: `.fresh_only` returns null for stale
/// entries, while `.any` returns cached data regardless of freshness.
pub const Freshness = enum { fresh_only, any };
|
|
|
|
/// Load and decode the cached `T` payload for `symbol`.
///
/// With `.fresh_only`, stale entries yield null; with `.any`, expiry
/// is ignored. Missing files, I/O errors and parse errors all
/// collapse to null.
///
/// Special cases handled before the generic slice decoder:
///   - `Candle`: a body that fails `looksCompleteSrf` is passed to
///     `selfHealTornCandles` and null is returned.
///   - `EtfProfile` / `OptionsChain`: the negative-cache marker yields
///     an empty payload stamped with the current time under
///     `.fresh_only`, and null under `.any`.
pub fn read(
    self: *Store,
    comptime T: type,
    symbol: []const u8,
    comptime postProcess: ?*const fn (*T, std.mem.Allocator) anyerror!void,
    comptime freshness: Freshness,
) ?CacheResult(T) {
    const maybe_bytes = self.readRaw(symbol, dataTypeFor(T)) catch return null;
    const bytes = maybe_bytes orelse return null;
    defer self.allocator.free(bytes);

    // Read-path self-heal, candles only. A candles_daily body that
    // doesn't look like a complete SRF doc is archived for post-mortem,
    // and the daily + meta pair is wiped so the next call starts over.
    // Candles are singled out because they have a recurring tear
    // history, a sibling meta file to keep in sync, and would otherwise
    // re-miss on every run.
    if (T == Candle and !looksCompleteSrf(bytes)) {
        self.selfHealTornCandles(symbol, bytes);
        return null;
    }

    // Everything except ETF profiles and option chains is a flat
    // record list handled by the shared slice decoder.
    if (T != EtfProfile and T != OptionsChain) {
        return readSlice(T, self.io, self.allocator, bytes, postProcess, freshness);
    }

    if (std.mem.eql(u8, bytes, negative_cache_content)) {
        // A recorded fetch failure is always "fresh" but carries no data.
        if (freshness == .any) return null;
        const now_s = std.Io.Timestamp.now(self.io, .real).toSeconds();
        if (T == EtfProfile) return .{ .data = EtfProfile{ .symbol = "" }, .timestamp = now_s };
        return .{ .data = &.{}, .timestamp = now_s };
    }

    var reader = std.Io.Reader.fixed(bytes);
    var iter = srf.iterator(&reader, self.allocator, .{ .alloc_strings = false }) catch return null;
    defer iter.deinit();

    if (freshness == .fresh_only) {
        // A file without an `expires` directive is never considered fresh.
        const fresh = iter.expires != null and iter.isFresh(self.io);
        if (!fresh) return null;
    }
    const created_s = iter.created orelse std.Io.Timestamp.now(self.io, .real).toSeconds();

    if (T == EtfProfile) {
        const profile = deserializeEtfProfile(self.allocator, &iter) catch return null;
        return .{ .data = profile, .timestamp = created_s };
    }
    const chains = deserializeOptions(self.allocator, &iter) catch return null;
    return .{ .data = chains, .timestamp = created_s };
}
|
|
|
|
/// Serialize `items` and store them with an expiry of `now + ttl`.
/// Takes a slice for most types, or a single struct for `EtfProfile`.
///
/// `Dividend` and `Split` are routed to `writeMerged`, which stores the
/// sorted union of the incoming and on-disk records instead of
/// overwriting. Tiingo's full-history view and Polygon's targeted
/// fetches therefore converge on the same on-disk union whatever
/// order they arrive in, and Polygon's forward-looking entries survive
/// Tiingo refreshes.
pub fn write(
    self: *Store,
    comptime T: type,
    symbol: []const u8,
    items: DataFor(T),
    ttl: i64,
) void {
    // No source attribution: merge log lines fall back to "fetch".
    return self.writeWithSource(T, symbol, items, ttl, null);
}
|
|
|
|
/// Variant of `write` that names the data source (e.g. `"tiingo"`).
///
/// The name appears in the `info(cache)` lines `writeMerged` emits
/// when a previously unseen dividend or split lands in the cache.
/// Other types ignore the hint: they are serialized with
/// `expires = now + ttl` and handed to `writeRaw`. Serialization and
/// write failures are logged at warn level and swallowed.
pub fn writeWithSource(
    self: *Store,
    comptime T: type,
    symbol: []const u8,
    items: DataFor(T),
    ttl: i64,
    source_hint: ?[]const u8,
) void {
    if (T == Dividend or T == Split) {
        // Union with the on-disk records instead of overwriting them.
        return self.writeMerged(T, symbol, items, ttl, source_hint);
    }

    const data_type = dataTypeFor(T);
    const expires = std.Io.Timestamp.now(self.io, .real).toSeconds() + ttl;

    switch (T) {
        EtfProfile => {
            const body = serializeEtfProfile(self.io, self.allocator, items, .{ .expires = expires }) catch |err| {
                log.warn("{s}: failed to serialize ETF profile: {s}", .{ symbol, @errorName(err) });
                return;
            };
            defer self.allocator.free(body);
            self.writeRaw(symbol, data_type, body) catch |err| {
                log.warn("{s}: failed to write ETF profile to cache: {s}", .{ symbol, @errorName(err) });
            };
        },
        OptionsChain => {
            const body = serializeOptions(self.io, self.allocator, items, .{ .expires = expires }) catch |err| {
                log.warn("{s}: failed to serialize options: {s}", .{ symbol, @errorName(err) });
                return;
            };
            defer self.allocator.free(body);
            self.writeRaw(symbol, data_type, body) catch |err| {
                log.warn("{s}: failed to write options to cache: {s}", .{ symbol, @errorName(err) });
            };
        },
        else => {
            const body = serializeWithMeta(T, self.io, self.allocator, items, .{ .expires = expires }) catch |err| {
                log.warn("{s}: failed to serialize {s}: {s}", .{ symbol, @tagName(data_type), @errorName(err) });
                return;
            };
            defer self.allocator.free(body);
            self.writeRaw(symbol, data_type, body) catch |err| {
                log.warn("{s}: failed to write {s} to cache: {s}", .{ symbol, @tagName(data_type), @errorName(err) });
            };
        },
    }
}
|
|
|
|
/// Sorted-union write for `Dividend` and `Split`.
///
/// Reads the existing cache file, merges `incoming` into it, sorts
/// the union newest-first by date, and writes the result.
///
/// The file is left untouched (no I/O) only when nothing changed AND
/// the on-disk file still carries an unexpired `expires` directive.
/// Otherwise it is rewritten with `expires = now + ttl`, for two
/// reasons. First, stable data must still have its TTL renewed, or
/// `.fresh_only` reads would miss forever once the first TTL lapsed.
/// Second, an empty result must still be cached as "checked, nothing
/// found".
///
/// Two kinds of merge happen for each incoming entry:
///
/// **New key** — the entry's key (ex_date / split.date) is not in the
/// existing data. It is appended, and an `info(cache) supplied` line
/// is logged so the user learns a source surfaced a corporate action
/// that wasn't already known (canonical case: Tiingo discovering
/// SPYM's 2017-10-16 4:1 split that Polygon's reference endpoint does
/// not return).
///
/// **Existing key, field-level upgrade** — for `Dividend`, null
/// optional fields (`pay_date`, `record_date`, `type`, `currency`) on
/// the existing record are filled from the incoming record's non-null
/// values. Non-null fields are never overwritten, and
/// `type = .unknown` counts as null. The on-disk record is thus the
/// union of all sources' knowledge, with "first non-null wins". Each
/// upgraded field logs an `info(cache) upgraded` line. `Split` has no
/// optional fields, so for splits this reduces to "skip if the key
/// already exists".
///
/// `source_hint` names the source in those log lines (e.g.
/// `"polygon"` / `"tiingo"`); it defaults to `"fetch"` when null.
/// Allocation, serialization and write failures are logged at warn
/// level and abort the write.
fn writeMerged(
    self: *Store,
    comptime T: type,
    symbol: []const u8,
    incoming: []const T,
    ttl: i64,
    source_hint: ?[]const u8,
) void {
    comptime std.debug.assert(T == Dividend or T == Split);

    const data_type = dataTypeFor(T);

    // Read existing entries at any freshness: we want the union of what
    // is on disk, not just fresh data.
    //
    // The post-process callback dupes heap-backed string fields
    // (Dividend.currency) into stable memory, because the SRF buffer
    // is freed inside `read()`. The matching `deinit` in the `defer`
    // below frees those dupes. Keep the two in lockstep.
    const existing_result = self.read(T, symbol, mergePostProcess(T), .any);
    const existing: []const T = if (existing_result) |r| r.data else &.{};
    defer if (existing_result != null) {
        if (comptime @hasDecl(T, "deinit")) {
            for (existing) |item| item.deinit(self.allocator);
        }
        self.allocator.free(existing);
    };

    // Build the union: a shallow copy of the existing entries, then
    // either upgrade in place or append each incoming entry.
    var merged: std.ArrayList(T) = .empty;
    defer merged.deinit(self.allocator);
    merged.appendSlice(self.allocator, existing) catch |err| {
        log.warn("{s}: failed to merge {s}: {s}", .{ symbol, @tagName(data_type), @errorName(err) });
        return;
    };

    var added: usize = 0;
    var upgraded: usize = 0;
    for (incoming) |item| {
        // Searching `merged` (not `existing`) also dedupes repeated
        // keys within `incoming` itself.
        if (findKeyIndex(T, merged.items, mergeKey(T, item))) |idx| {
            upgraded += upgradeRecord(T, &merged.items[idx], item, symbol, source_hint);
        } else {
            merged.append(self.allocator, item) catch |err| {
                log.warn("{s}: failed to merge {s}: {s}", .{ symbol, @tagName(data_type), @errorName(err) });
                return;
            };
            added += 1;
            logSupplied(T, symbol, item, source_hint);
        }
    }

    if (added == 0 and upgraded == 0 and existing_result != null and
        self.mergeTargetIsFresh(symbol, data_type))
    {
        // Nothing changed and the on-disk expiry is still in the
        // future: leave the file alone. This is the common case for
        // back-to-back Polygon/Tiingo refreshes.
        return;
    }

    // Newest first, matching the existing on-disk convention.
    std.mem.sort(T, merged.items, {}, lessByDateDesc(T));

    const expires = std.Io.Timestamp.now(self.io, .real).toSeconds() + ttl;
    const srf_data = serializeWithMeta(T, self.io, self.allocator, merged.items, .{ .expires = expires }) catch |err| {
        log.warn("{s}: failed to serialize {s}: {s}", .{ symbol, @tagName(data_type), @errorName(err) });
        return;
    };
    defer self.allocator.free(srf_data);
    self.writeRaw(symbol, data_type, srf_data) catch |err| {
        log.warn("{s}: failed to write {s} to cache: {s}", .{ symbol, @tagName(data_type), @errorName(err) });
    };
}

/// Report whether `symbol`'s on-disk `data_type` file has an
/// `expires` directive that has not passed yet. This mirrors the
/// `.fresh_only` check in `read`; any read or parse failure counts as
/// "not fresh".
fn mergeTargetIsFresh(self: *Store, symbol: []const u8, data_type: DataType) bool {
    const raw = self.readRaw(symbol, data_type) catch return false;
    const data = raw orelse return false;
    defer self.allocator.free(data);

    var reader = std.Io.Reader.fixed(data);
    var it = srf.iterator(&reader, self.allocator, .{ .alloc_strings = false }) catch return false;
    defer it.deinit();
    return it.expires != null and it.isFresh(self.io);
}
|
|
|
|
/// Identity key used when merging records: the ex-dividend date for
/// dividends and the split date for splits, as a day count.
fn mergeKey(comptime T: type, item: T) i32 {
    return switch (T) {
        Dividend => item.ex_date.days,
        Split => item.date.days,
        else => @compileError("mergeKey only defined for Dividend and Split"),
    };
}
|
|
|
|
/// Post-process hook for the `read` call inside `writeMerged`.
///
/// For `Dividend`, it dupes `currency` into allocator-owned memory so
/// the record survives after the SRF buffer is freed; each type's
/// `deinit` later releases the copy. `Split` has no string fields, so
/// no hook is needed and null is returned.
fn mergePostProcess(comptime T: type) ?*const fn (*T, std.mem.Allocator) anyerror!void {
    if (T == Split) return null;
    if (T != Dividend) @compileError("mergePostProcess only defined for Dividend and Split");

    const DupeCurrency = struct {
        fn pp(div: *Dividend, allocator: std.mem.Allocator) anyerror!void {
            const borrowed = div.currency orelse return;
            div.currency = try allocator.dupe(u8, borrowed);
        }
    };
    return &DupeCurrency.pp;
}
|
|
|
|
/// Index of the first record in `items` whose merge key equals `key`,
/// or null if there is none. This is a linear scan.
fn findKeyIndex(comptime T: type, items: []const T, key: i32) ?usize {
    var idx: usize = 0;
    while (idx < items.len) : (idx += 1) {
        if (mergeKey(T, items[idx]) == key) return idx;
    }
    return null;
}
|
|
|
|
/// Comparator for `std.mem.sort` that orders records newest-first
/// by merge key.
fn lessByDateDesc(comptime T: type) fn (void, T, T) bool {
    const Cmp = struct {
        fn newerFirst(_: void, lhs: T, rhs: T) bool {
            return mergeKey(T, rhs) < mergeKey(T, lhs);
        }
    };
    return Cmp.newerFirst;
}
|
|
|
|
/// Fill null optional fields of `existing` from `incoming`, a record
/// with the same merge key. Returns how many fields were filled, so
/// the caller can decide whether a rewrite is needed.
///
/// `Split` has no optional fields and always returns 0. For
/// `Dividend`, the fields are visited in order: `pay_date`,
/// `record_date`, `type`, `currency`. `type = .unknown` is treated as
/// null, so e.g. Polygon's `.regular` can replace Tiingo's `.unknown`.
/// Fields that are already set are never overwritten. Each fill logs
/// one `info(cache)` line.
fn upgradeRecord(
    comptime T: type,
    existing: *T,
    incoming: T,
    symbol: []const u8,
    source_hint: ?[]const u8,
) usize {
    if (T == Split) return 0;
    if (T != Dividend) @compileError("upgradeRecord only defined for Dividend and Split");

    const origin = source_hint orelse "fetch";
    const ex = existing.ex_date;
    var filled: usize = 0;

    if (existing.pay_date == null) {
        if (incoming.pay_date) |pd| {
            existing.pay_date = pd;
            log.info("{s}: {s} upgraded ex_date {f}: pay_date null -> {f}", .{ symbol, origin, ex, pd });
            filled += 1;
        }
    }

    if (existing.record_date == null) {
        if (incoming.record_date) |rd| {
            existing.record_date = rd;
            log.info("{s}: {s} upgraded ex_date {f}: record_date null -> {f}", .{ symbol, origin, ex, rd });
            filled += 1;
        }
    }

    if (existing.type == .unknown and incoming.type != .unknown) {
        existing.type = incoming.type;
        log.info("{s}: {s} upgraded ex_date {f}: type unknown -> {s}", .{ symbol, origin, ex, @tagName(incoming.type) });
        filled += 1;
    }

    if (existing.currency == null) {
        if (incoming.currency) |cur| {
            // Borrowed, not duped. The merged list only lives until
            // writeMerged finishes serializing, `incoming` outlives it,
            // and the slot was null, so nothing needs freeing.
            existing.currency = cur;
            log.info("{s}: {s} upgraded ex_date {f}: currency null -> {s}", .{ symbol, origin, ex, cur });
            filled += 1;
        }
    }

    return filled;
}
|
|
|
|
/// Log an `info(cache)` line announcing a dividend or split that was
/// not previously cached. `source_hint` defaults to `"fetch"`.
fn logSupplied(comptime T: type, symbol: []const u8, item: T, source_hint: ?[]const u8) void {
    const source = source_hint orelse "fetch";
    if (T == Dividend) {
        log.info("{s}: {s} supplied dividend ex_date {f} amount ${d:.4}", .{
            symbol, source, item.ex_date, item.amount,
        });
    } else if (T == Split) {
        // Print the ratio terms as floats. `{d}` renders 4.0 as `4` and
        // 1.5 as `1.5`. The previous `@intFromFloat` truncated
        // fractional ratios and is illegal behavior for NaN, inf, or
        // negative values, so a bad provider value could crash a
        // log call.
        log.info("{s}: {s} supplied split {f} {d}:{d}", .{
            symbol, source, item.date, item.numerator, item.denominator,
        });
    }
}
|
|
|
|
// ── Candle-specific API ──────────────────────────────────────
|
|
|
|
/// Replace the cached candle history for `symbol` with `candles`.
///
/// The file is written with no expiry directive (historical facts
/// don't expire). When `candles` is non-empty, candle metadata is then
/// refreshed from its last element.
///
/// Metadata is refreshed only after the candles were actually written.
/// If serialization or the write fails, the previous meta stays in
/// place, so meta never describes candles that are not on disk.
/// Failures are logged at warn level.
pub fn cacheCandles(self: *Store, symbol: []const u8, candles: []const Candle, provider: CandleProvider, fail_count: u8) void {
    const srf_data = serializeCandles(self.allocator, candles, .{}) catch |err| {
        log.warn("{s}: failed to serialize candles: {s}", .{ symbol, @errorName(err) });
        return;
    };
    defer self.allocator.free(srf_data);

    self.writeRaw(symbol, .candles_daily, srf_data) catch |err| {
        log.warn("{s}: failed to write candles to cache: {s}", .{ symbol, @errorName(err) });
        return;
    };

    if (candles.len == 0) return;
    // Assumes `candles` is ordered oldest-to-newest, so the last
    // element is the latest bar (same assumption as the original).
    const last = candles[candles.len - 1];
    self.updateCandleMeta(symbol, last.close, last.date, provider, fail_count);
}
|
|
|
|
/// Append `new_candles` to `symbol`'s cached candle file and refresh
/// candle metadata.
///
/// The fast path appends directive-free records. If that append fails
/// (e.g. the file is missing), the function falls back to reading the
/// existing candles at any freshness and rewriting the full,
/// concatenated history.
///
/// If nothing existing can be read (file missing, or torn and just
/// self-healed by `read`), the new candles are not written on their
/// own, since that would masquerade as a full history. Metadata is
/// refreshed only when the candles were actually persisted, so a
/// failed write never leaves meta advertising data that isn't on disk.
/// Failures are logged.
pub fn appendCandles(self: *Store, symbol: []const u8, new_candles: []const Candle, provider: CandleProvider, fail_count: u8) void {
    if (new_candles.len == 0) return;

    const persisted = blk: {
        const srf_data = serializeCandles(self.allocator, new_candles, .{ .emit_directives = false }) catch |err| {
            log.warn("{s}: failed to serialize new candles for append: {s}", .{ symbol, @errorName(err) });
            break :blk false;
        };
        defer self.allocator.free(srf_data);

        self.appendRaw(symbol, .candles_daily, srf_data) catch |append_err| {
            // Append failed (file missing?): fall back to load + full rewrite.
            log.debug("{s}: append failed ({s}), falling back to full rewrite", .{ symbol, @errorName(append_err) });

            const existing = self.read(Candle, symbol, null, .any) orelse {
                log.warn("{s}: no existing candles to merge with; new candles not cached", .{symbol});
                break :blk false;
            };
            defer self.allocator.free(existing.data);

            const merged = self.allocator.alloc(Candle, existing.data.len + new_candles.len) catch |err| {
                log.warn("{s}: failed to merge candles: {s}", .{ symbol, @errorName(err) });
                break :blk false;
            };
            defer self.allocator.free(merged);
            @memcpy(merged[0..existing.data.len], existing.data);
            @memcpy(merged[existing.data.len..], new_candles);

            const full_data = serializeCandles(self.allocator, merged, .{}) catch |err| {
                log.warn("{s}: failed to serialize merged candles: {s}", .{ symbol, @errorName(err) });
                break :blk false;
            };
            defer self.allocator.free(full_data);
            self.writeRaw(symbol, .candles_daily, full_data) catch |err| {
                log.warn("{s}: failed to write merged candles to cache: {s}", .{ symbol, @errorName(err) });
                break :blk false;
            };
            break :blk true;
        };
        break :blk true;
    };

    if (!persisted) return;

    const last = new_candles[new_candles.len - 1];
    self.updateCandleMeta(symbol, last.close, last.date, provider, fail_count);
}
|
|
|
|
/// Write candle metadata (last close/date, provider, failure count)
/// for `symbol`, expiring `Ttl.candles_latest` seconds from now.
/// Failures are logged at warn level and swallowed.
pub fn updateCandleMeta(self: *Store, symbol: []const u8, last_close: f64, last_date: Date, provider: CandleProvider, fail_count: u8) void {
    const now_s = std.Io.Timestamp.now(self.io, .real).toSeconds();
    const meta: CandleMeta = .{
        .last_close = last_close,
        .last_date = last_date,
        .provider = provider,
        .fail_count = fail_count,
    };

    const meta_data = serializeCandleMeta(self.io, self.allocator, meta, .{ .expires = now_s + Ttl.candles_latest }) catch |err| {
        log.warn("{s}: failed to serialize candle metadata: {s}", .{ symbol, @errorName(err) });
        return;
    };
    defer self.allocator.free(meta_data);

    self.writeRaw(symbol, .candles_meta, meta_data) catch |err| {
        log.warn("{s}: failed to write candle metadata: {s}", .{ symbol, @errorName(err) });
    };
}
|
|
|
|
// ── Cache management ─────────────────────────────────────────
|
|
|
|
/// Create `symbol`'s cache directory (and any missing parents).
/// A directory that already exists is not an error.
pub fn ensureSymbolDir(self: *Store, symbol: []const u8) !void {
    const dir_path = try self.symbolPath(symbol, "");
    defer self.allocator.free(dir_path);

    std.Io.Dir.cwd().createDirPath(self.io, dir_path) catch |err| {
        if (err != error.PathAlreadyExists) return err;
    };
}
|
|
|
|
/// Delete `symbol`'s whole cache directory (every data type).
///
/// Best-effort: a failed delete is logged at warn level rather than
/// returned (previously it was silently discarded). Callers only see
/// errors from building the path.
pub fn clearSymbol(self: *Store, symbol: []const u8) !void {
    const path = try self.symbolPath(symbol, "");
    defer self.allocator.free(path);
    std.Io.Dir.cwd().deleteTree(self.io, path) catch |err| {
        log.warn("{s}: failed to clear cache directory: {s}", .{ symbol, @errorName(err) });
    };
}
|
|
|
|
/// Exact body of a negative cache entry: a record that a fetch failed
/// and should not be retried until --refresh. It begins with the
/// `#!srfv1` directive and ends with a newline, so it passes
/// `looksCompleteSrf`. `read` recognizes it by exact byte comparison.
pub const negative_cache_content = "#!srfv1\n# fetch_failed\n";
|
|
|
|
/// Write a negative cache entry for `symbol` + `data_type`.
///
/// This records that a fetch was attempted and failed, which prevents
/// repeated network requests for symbols that will never resolve.
/// Cleared by --refresh (which calls clearData/invalidate). Best-effort:
/// a failed write is logged at warn level instead of being silently
/// dropped.
pub fn writeNegative(self: *Store, symbol: []const u8, data_type: DataType) void {
    self.writeRaw(symbol, data_type, negative_cache_content) catch |err| {
        log.warn("{s}: failed to write negative cache entry for {s}: {s}", .{ symbol, @tagName(data_type), @errorName(err) });
    };
}
|
|
|
|
/// Cheap structural check that `data` is a complete SRF document.
///
/// A complete file starts with the `#!srfv1` version directive (the
/// negative-cache marker does too) and ends with `\n`, because every
/// record and directive is newline-terminated. Empty input fails the
/// prefix check.
///
/// Torn HTTP bodies stop short of that final newline. The FRDM
/// corruption of 2026-05-02 is the canonical example: `candles_daily.srf`
/// ended mid-record at `date::2026-04`, with no fields and no newline.
/// `writeFileAtomic` guarantees rename integrity, not payload validity,
/// so it faithfully persisted that truncated body. Gate cache writes on
/// this check when the bytes come from an untrusted source (server sync,
/// external import). Locally serialized payloads are complete by
/// construction.
pub fn looksCompleteSrf(data: []const u8) bool {
    return std.mem.startsWith(u8, data, "#!srfv1") and std.mem.endsWith(u8, data, "\n");
}
|
|
|
|
/// Why a body was classified as torn. Recorded in the archived `.meta`
/// sidecar (via `TearRecord.failure_reason`) so post-mortems can tell
/// which detection path fired.
pub const FailureReason = enum {
    /// `looksCompleteSrf` rejected the body bytes of an HTTP response.
    /// The data arrived from the network but failed the structural
    /// sanity check.
    looks_complete_srf_failed,
    /// A cached file on disk failed to parse when read. Suggests either
    /// an earlier torn write that slipped past the guard, or local
    /// filesystem corruption.
    srf_parse_failed,
    /// The server-advertised integrity digest (e.g. an
    /// `ETag: "sha256:..."`) did not match the received body bytes.
    /// Reserved for the HTTP integrity work in Milestone 2.
    etag_mismatch,
};
|
|
|
|
/// Caller-supplied context for a torn body, consumed by
/// `archiveTornBody` to fill the `.meta` sidecar. Everything except
/// `failure_reason` is optional, because which signals are available
/// depends on where the tear was detected.
pub const TornMeta = struct {
    /// Which detection path classified the body as torn.
    failure_reason: FailureReason,
    /// HTTP status code when the body came from an HTTP response.
    /// Null for parse failures found on local read.
    http_status: ?u16 = null,
    /// Raw `Content-Length` header value from the HTTP response, when
    /// the detection path has access to it.
    http_content_length: ?u64 = null,
    /// Source URL when the body came from an HTTP response.
    server_url: ?[]const u8 = null,
    /// The server's `ETag` header, verbatim, including surrounding
    /// quotes and any `sha256:` or other scheme prefix, so a post-mortem
    /// sees exactly what was advertised. Paired with the sidecar's
    /// `body_sha256`, it makes an `etag_mismatch` self-explanatory.
    server_etag: ?[]const u8 = null,
};
|
|
|
|
/// Schema of the SRF `.meta` sidecar written by `archiveTornBody`.
///
/// Each field becomes one `key:type:value` entry in a single record
/// under a `#!srfv1` header. Field order is the emitted order, so do
/// not reorder. Optional fields defaulting to `null` are silently
/// skipped by `srf.fmtFrom` when unset, which is what we want for the
/// http_* / server_* fields that only some detection paths populate.
const TearRecord = struct {
    /// Fixed marker (`archiveTornBody` sets "tear_metadata") so
    /// `_torn/*.meta` files are recognizable at a glance among other
    /// SRF records in the cache tree.
    type: []const u8,
    /// Symbol whose cache body was torn.
    symbol: []const u8,
    /// Which per-symbol cache file the body belonged to.
    data_type: DataType,
    /// Capture time in Unix seconds.
    unix_ts: i64,
    /// The same capture time rendered as UTC `YYYY-MM-DDTHH:MM:SSZ`.
    iso_ts: []const u8,
    /// Length of the archived body in bytes.
    body_length: u64,
    /// Lowercase hex SHA-256 of the archived body.
    body_sha256: []const u8,
    /// Detection path that flagged the body (copied from `TornMeta`).
    failure_reason: FailureReason,
    /// Build identifier (`version.version_string`) of the writing binary.
    zfin_commit: []const u8,
    /// Optional HTTP/server context copied from `TornMeta`.
    http_status: ?u16 = null,
    http_content_length: ?u64 = null,
    server_url: ?[]const u8 = null,
    server_etag: ?[]const u8 = null,
    /// Lowercase hex of the last (up to) 200 body bytes, kept
    /// grep-friendly whatever binary garbage sits in the torn tail.
    last_200_bytes_hex: []const u8,
};
|
|
|
|
/// Archive a torn cache body for post-mortem diagnosis.
|
|
///
|
|
/// Writes two sibling files under `{cache_dir}/_torn/`:
|
|
/// `{symbol}_{data_type}_{unix_ms}.bin` — the raw bytes as received
|
|
/// `{symbol}_{data_type}_{unix_ms}.meta` — SRF-formatted context
|
|
///
|
|
/// Filenames carry a millisecond-resolution timestamp so a retry
|
|
/// loop that tears twice within the same wall-clock second
|
|
/// produces distinct archive pairs — two back-to-back captures are
|
|
/// the most valuable forensic signal we can produce (byte offsets,
|
|
/// tail shapes, and time deltas are all impossible to infer from a
|
|
/// single failure).
|
|
///
|
|
/// The `.meta` file mirrors the rest of the cache's on-disk format
|
|
/// so tooling can iterate it with the standard SRF reader. See
|
|
/// `TearRecord` above for the schema. Required fields are always
|
|
/// emitted; `http_status`, `http_content_length`, `server_url`,
|
|
/// and `server_etag` appear only when the caller populated them
|
|
/// in `TornMeta`.
|
|
///
|
|
/// Best-effort: a failure here should never mask the original
|
|
/// detection path's return value. Callers log.debug the outcome and
|
|
/// move on.
|
|
pub fn archiveTornBody(
    io: std.Io,
    allocator: std.mem.Allocator,
    cache_dir: []const u8,
    symbol: []const u8,
    data_type: DataType,
    bytes: []const u8,
    meta: TornMeta,
) !void {
    // Ensure the _torn/ directory exists.
    const torn_dir = try std.fs.path.join(allocator, &.{ cache_dir, "_torn" });
    defer allocator.free(torn_dir);
    std.Io.Dir.cwd().createDirPath(io, torn_dir) catch |err| switch (err) {
        error.PathAlreadyExists => {},
        else => return err,
    };

    // Sample the wall clock ONCE and derive both resolutions from that
    // single sample. The second-resolution value feeds the SRF `unix_ts`
    // field (plus its ISO rendering); the millisecond value goes into the
    // filenames so retries within the same wall-clock second produce
    // distinct archive entries instead of overwriting each other (two
    // back-to-back tears from a refresh retry are the most valuable
    // forensic signal we can capture). Two separate clock reads could
    // straddle a second boundary and leave the filename and the recorded
    // timestamp disagreeing.
    const clock_sample = std.Io.Timestamp.now(io, .real);
    const ts = clock_sample.toSeconds();
    const ts_ms = @divTrunc(clock_sample.nanoseconds, std.time.ns_per_ms);

    const bin_name = try std.fmt.allocPrint(
        allocator,
        "{s}_{s}_{d}.bin",
        .{ symbol, @tagName(data_type), ts_ms },
    );
    defer allocator.free(bin_name);
    const bin_path = try std.fs.path.join(allocator, &.{ torn_dir, bin_name });
    defer allocator.free(bin_path);

    const meta_name = try std.fmt.allocPrint(
        allocator,
        "{s}_{s}_{d}.meta",
        .{ symbol, @tagName(data_type), ts_ms },
    );
    defer allocator.free(meta_name);
    const meta_path = try std.fs.path.join(allocator, &.{ torn_dir, meta_name });
    defer allocator.free(meta_path);

    // Write the raw body first — if this fails we don't bother with
    // the sidecar, since the sidecar is only useful paired with bytes.
    try atomic.writeFileAtomic(io, allocator, bin_path, bytes);

    // Compute sha256 of the body for the sidecar.
    var hash: [std.crypto.hash.sha2.Sha256.digest_length]u8 = undefined;
    std.crypto.hash.sha2.Sha256.hash(bytes, &hash, .{});
    var hash_hex: [std.crypto.hash.sha2.Sha256.digest_length * 2]u8 = undefined;
    _ = std.fmt.bufPrint(&hash_hex, "{x}", .{&hash}) catch unreachable;

    // ISO-8601 UTC timestamp — computed by hand to avoid pulling in
    // a dependency. Format: YYYY-MM-DDTHH:MM:SSZ.
    const epoch_seconds = std.time.epoch.EpochSeconds{ .secs = @intCast(ts) };
    const day_seconds = epoch_seconds.getDaySeconds();
    const epoch_day = epoch_seconds.getEpochDay();
    const year_day = epoch_day.calculateYearDay();
    const month_day = year_day.calculateMonthDay();
    var iso_buf: [32]u8 = undefined;
    const iso_ts = std.fmt.bufPrint(&iso_buf, "{d:0>4}-{d:0>2}-{d:0>2}T{d:0>2}:{d:0>2}:{d:0>2}Z", .{
        year_day.year,
        @intFromEnum(month_day.month),
        month_day.day_index + 1,
        day_seconds.getHoursIntoDay(),
        day_seconds.getMinutesIntoHour(),
        day_seconds.getSecondsIntoMinute(),
    }) catch unreachable;

    // Last 200 bytes as lowercase hex so the sidecar is
    // grep-friendly regardless of what binary garbage sits in the
    // torn tail.
    const tail_len: usize = @min(bytes.len, 200);
    const tail = bytes[bytes.len - tail_len ..];
    const tail_hex = try allocator.alloc(u8, tail_len * 2);
    defer allocator.free(tail_hex);
    _ = std.fmt.bufPrint(tail_hex, "{x}", .{tail}) catch unreachable;

    const record = TearRecord{
        .type = "tear_metadata",
        .symbol = symbol,
        .data_type = data_type,
        .unix_ts = ts,
        .iso_ts = iso_ts,
        .body_length = bytes.len,
        .body_sha256 = &hash_hex,
        .failure_reason = meta.failure_reason,
        .zfin_commit = version.version_string,
        .http_status = meta.http_status,
        .http_content_length = meta.http_content_length,
        .server_url = meta.server_url,
        .server_etag = meta.server_etag,
        .last_200_bytes_hex = tail_hex,
    };

    var aw: std.Io.Writer.Allocating = .init(allocator);
    defer aw.deinit();
    const records = [_]TearRecord{record};
    try aw.writer.print("{f}", .{srf.fmtFrom(TearRecord, allocator, &records, .{})});

    try atomic.writeFileAtomic(io, allocator, meta_path, aw.writer.buffered());
}
|
|
|
|
/// Recover from a torn `candles_daily.srf` found while reading.
///
/// The unparseable bytes are first archived under `_torn/` (reason
/// `.srf_parse_failed`) for later forensics. Then both `candles_daily.srf`
/// and its companion `candles_meta.srf` are deleted so the next fetch
/// cycle starts from a clean pair.
///
/// Everything here is best-effort. An archive failure is only logged at
/// debug level and never prevents the wipe, because keeping the read path
/// recoverable matters more than the diagnostics.
fn selfHealTornCandles(self: *Store, symbol: []const u8, data: []const u8) void {
    const archive_reason = .{ .failure_reason = .srf_parse_failed };
    archiveTornBody(self.io, self.allocator, self.cache_dir, symbol, .candles_daily, data, archive_reason) catch |archive_err|
        log.debug(
            "{s}: failed to archive torn candles_daily body ({d} bytes): {s}",
            .{ symbol, data.len, @errorName(archive_err) },
        );

    log.debug(
        "{s}: self-heal wiping torn candles_daily+candles_meta pair ({d} bytes)",
        .{ symbol, data.len },
    );
    const torn_pair = [_]DataType{ .candles_daily, .candles_meta };
    for (torn_pair) |stale_kind| self.clearData(symbol, stale_kind);
}
|
|
|
|
/// Check whether a cached data file is a negative entry (the fetch_failed
/// marker). Negative entries are always considered "fresh"; they never expire.
///
/// The file must match `negative_cache_content` exactly, which is the same
/// rule `readSlice` applies on its freshness path. A file that merely starts
/// with the marker and has trailing data is NOT treated as negative.
/// Any error (path allocation, open, read) yields `false`.
pub fn isNegative(self: *Store, symbol: []const u8, data_type: DataType) bool {
    const path = self.symbolPath(symbol, data_type.fileName()) catch return false;
    defer self.allocator.free(path);

    const file = std.Io.Dir.cwd().openFile(self.io, path, .{}) catch return false;
    defer file.close(self.io);

    // Read one byte beyond the marker length. An exact match must stop
    // exactly at the marker length, so any extra byte disqualifies the file.
    var buf: [negative_cache_content.len + 1]u8 = undefined;
    var file_reader = file.reader(self.io, &.{});
    const n = file_reader.interface.readSliceShort(&buf) catch return false;

    return n == negative_cache_content.len and
        std.mem.eql(u8, buf[0..n], negative_cache_content);
}
|
|
|
|
/// Delete the cache file for one data type of `symbol`.
///
/// Best-effort: a missing file is the normal "nothing to clear" case and is
/// ignored. Any other delete failure is logged at debug level rather than
/// silently dropped, so a stuck file can be diagnosed. Path allocation
/// failure returns without doing anything.
pub fn clearData(self: *Store, symbol: []const u8, data_type: DataType) void {
    const path = self.symbolPath(symbol, data_type.fileName()) catch return;
    defer self.allocator.free(path);

    std.Io.Dir.cwd().deleteFile(self.io, path) catch |err| switch (err) {
        error.FileNotFound => {},
        else => log.debug("{s}: failed to delete cache file {s}: {s}", .{ symbol, path, @errorName(err) }),
    };
}
|
|
|
|
/// Return `last_close` from the symbol's `candles_meta.srf`.
///
/// Yields null when the file is absent, unreadable, or fails to
/// deserialize as `CandleMeta`.
pub fn readLastClose(self: *Store, symbol: []const u8) ?f64 {
    const maybe_meta_bytes = self.readRaw(symbol, .candles_meta) catch return null;
    if (maybe_meta_bytes) |meta_bytes| {
        defer self.allocator.free(meta_bytes);
        const decoded_meta = deserializeCandleMeta(self.allocator, meta_bytes) catch return null;
        return decoded_meta.last_close;
    }
    return null;
}
|
|
|
|
/// Read the symbol's full `CandleMeta` together with the file's
/// `#!created=` timestamp.
///
/// If the file carries no `#!created=` directive, the current wall-clock
/// time (seconds) is reported instead. Returns null when the file is
/// missing, unreadable, empty, or fails to decode as `CandleMeta`.
pub fn readCandleMeta(self: *Store, symbol: []const u8) ?struct { meta: CandleMeta, created: i64 } {
    const meta_bytes = (self.readRaw(symbol, .candles_meta) catch return null) orelse return null;
    defer self.allocator.free(meta_bytes);

    var byte_reader = std.Io.Reader.fixed(meta_bytes);
    var record_iter = srf.iterator(&byte_reader, self.allocator, .{ .alloc_strings = false }) catch return null;
    defer record_iter.deinit();

    const written_at = record_iter.created orelse std.Io.Timestamp.now(self.io, .real).toSeconds();
    const first_record = (record_iter.next() catch return null) orelse return null;
    return .{
        .meta = first_record.to(CandleMeta) catch return null,
        .created = written_at,
    };
}
|
|
|
|
/// Report whether the symbol's candle metadata is still fresh according to
/// its embedded `#!expires=` directive.
///
/// A file containing the `# fetch_failed` marker counts as fresh. A file
/// that is missing, unreadable, or has no `#!expires=` directive does not.
pub fn isCandleMetaFresh(self: *Store, symbol: []const u8) bool {
    const meta_bytes = (self.readRaw(symbol, .candles_meta) catch return false) orelse return false;
    defer self.allocator.free(meta_bytes);

    if (std.mem.indexOf(u8, meta_bytes, "# fetch_failed") != null) return true;

    var byte_reader = std.Io.Reader.fixed(meta_bytes);
    const it = srf.iterator(&byte_reader, self.allocator, .{}) catch return false;
    defer it.deinit();

    return it.expires != null and it.isFresh(self.io);
}
|
|
|
|
/// Delete the entire cache directory tree.
///
/// Best-effort: a failure to remove the tree does not propagate. It is
/// logged at warn level instead of being silently discarded, so a user who
/// asked for a clear can see why it did not happen.
pub fn clearAll(self: *Store) !void {
    std.Io.Dir.cwd().deleteTree(self.io, self.cache_dir) catch |err| {
        log.warn("failed to clear cache directory {s}: {s}", .{ self.cache_dir, @errorName(err) });
    };
}
|
|
|
|
// ── Public types ─────────────────────────────────────────────
|
|
|
|
/// Record stored in the standalone `candles_meta.srf` file.
///
/// Kept apart from the candle file so price lookups and freshness checks do
/// not need to parse every candle. The write time is not a field here. It
/// lives in the file's `#!created=` directive, which replaces the former
/// `fetched_at` field.
pub const CandleMeta = struct {
    /// Last close price recorded for the symbol.
    last_close: f64,
    /// Date associated with `last_close`.
    last_date: Date,
    /// Provider that sourced the candle data.
    ///
    /// **Intentionally has no default value.** SRF omits any field whose
    /// value equals its default, which would hide the provider line whenever
    /// it matched that default. Every cache file should state its provider
    /// explicitly so inspection can always answer "where did this come
    /// from?". Every construction site therefore has to pass it.
    ///
    /// Cache compatibility: pre-2026-05 caches that elided this field fail
    /// to deserialize now (SRF returns
    /// FieldNotFoundOnFieldWithoutDefaultValue). `readCandleMeta` swallows
    /// the error and returns null, so the symbol looks like a cache miss.
    /// `getCandles` then refetches via `populateAllFromTiingo`, which writes
    /// a new meta file with the provider explicit. Old files are replaced
    /// naturally on first use after the upgrade.
    provider: CandleProvider,
    /// Consecutive transient failures (ServerError) from the primary
    /// provider, Tiingo. Reset to 0 on success. At 3 or more, the symbol is
    /// degraded to a fallback provider until Tiingo recovers.
    fail_count: u8 = 0,
};
|
|
|
|
pub const CandleProvider = enum {
    /// Legacy value: candles came from TwelveData. Current writes never
    /// produce it; TwelveData was demoted earlier because its `adj_close`
    /// was unreliable. Reads still accept it for backwards compatibility.
    twelvedata,
    /// Legacy value: candles came from Yahoo Finance. Current writes never
    /// produce it; Yahoo left the candle pipeline in the 2026-05 audit and
    /// is only used for `getQuote` real-time prices now. Reads still accept
    /// it for backwards compatibility.
    yahoo,
    /// Active value: candles came from Tiingo. It is the only value current
    /// writes produce.
    tiingo,

    /// Map a provider name to its tag. Exact matches on "yahoo" and
    /// "tiingo" map to those providers. Every other string, including an
    /// unrecognized one, maps to `.twelvedata`.
    pub fn fromString(s: []const u8) CandleProvider {
        return std.meta.stringToEnum(CandleProvider, s) orelse .twelvedata;
    }
};
|
|
|
|
// ── Private I/O ──────────────────────────────────────────────
|
|
|
|
/// Load a cache file's full contents (capped at 50 MiB).
///
/// Returns null when the file does not exist, and propagates any other
/// error. The caller owns and must free the returned bytes.
fn readRaw(self: *Store, symbol: []const u8, data_type: DataType) !?[]const u8 {
    const path = try self.symbolPath(symbol, data_type.fileName());
    defer self.allocator.free(path);

    if (std.Io.Dir.cwd().readFileAlloc(self.io, path, self.allocator, .limited(50 * 1024 * 1024))) |file_contents| {
        return file_contents;
    } else |read_err| {
        if (read_err == error.FileNotFound) return null;
        return read_err;
    }
}
|
|
|
|
/// Store `data` as the complete cache file for `symbol`/`data_type`.
/// Server sync uses this to drop pre-serialized SRF straight into the cache.
/// The symbol directory is created first if needed.
///
/// The write is atomic (via `atomic.writeFileAtomic`: temp file + fsync +
/// rename), so any concurrent reader sees either the old complete file or
/// the new complete file, never a partially written one. Two reasons this
/// matters:
///
/// - `parallelServerSync` (service.zig) writes many cache files in
///   parallel. Other code in the same process (TUI redraws, progress
///   callbacks that peek, future concurrent readers) must not observe a
///   half-written file.
/// - The earlier `createFile(truncate) + writeAll` approach left a window
///   between truncation and completion in which a reader got anywhere from
///   0 to data.len bytes. It showed up as SRF
///   `custom parse of value 2026-04 failed : InvalidDateFormat`, a date
///   truncated 7 characters into its 10-character value.
pub fn writeRaw(self: *Store, symbol: []const u8, data_type: DataType, data: []const u8) !void {
    try self.ensureSymbolDir(symbol);

    const target_path = try self.symbolPath(symbol, data_type.fileName());
    defer self.allocator.free(target_path);

    return atomic.writeFileAtomic(self.io, self.allocator, target_path, data);
}
|
|
|
|
/// Append `data` to an existing cache file.
///
/// This reads the current contents, concatenates, and rewrites the whole
/// file atomically instead of opening it for append. The reason is the
/// same as in `writeRaw`: a real append (`seekFromEnd + writeAll`) lets a
/// reader catch the file mid-append with a valid head and a partial tail,
/// which for SRF means a truncated last record. With the atomic rewrite,
/// the rename guarantees readers see either the pre-append or the
/// post-append state.
///
/// Fails with `error.FileNotFound` when the target does not exist yet.
/// Callers are expected to fall back to a full rewrite in that case (see
/// `appendCandles`).
fn appendRaw(self: *Store, symbol: []const u8, data_type: DataType, data: []const u8) !void {
    const path = try self.symbolPath(symbol, data_type.fileName());
    defer self.allocator.free(path);

    // A missing file propagates unchanged as error.FileNotFound.
    const previous_contents = try std.Io.Dir.cwd().readFileAlloc(self.io, path, self.allocator, .limited(50 * 1024 * 1024));
    defer self.allocator.free(previous_contents);

    const merged_contents = try std.mem.concat(self.allocator, u8, &.{ previous_contents, data });
    defer self.allocator.free(merged_contents);

    try atomic.writeFileAtomic(self.io, self.allocator, path, merged_contents);
}
|
|
|
|
/// Build `<cache_dir>/<symbol>/<file_name>`, or `<cache_dir>/<symbol>` when
/// `file_name` is empty. The caller owns and must free the returned path.
fn symbolPath(self: *Store, symbol: []const u8, file_name: []const u8) ![]const u8 {
    return if (file_name.len == 0)
        std.fs.path.join(self.allocator, &.{ self.cache_dir, symbol })
    else
        std.fs.path.join(self.allocator, &.{ self.cache_dir, symbol, file_name });
}
|
|
|
|
// ── Private serialization: generic ───────────────────────────
|
|
|
|
/// Decode every `T` record in an SRF buffer in a single pass.
///
/// One iterator handles the optional freshness gate, the `#!created=`
/// timestamp (falling back to the current time when absent), and the
/// records. Records that fail to decode as `T` are skipped. If
/// `postProcess` fails, if an allocation fails, or if the iterator errors,
/// everything decoded so far is released and null is returned.
///
/// With `freshness == .fresh_only`, a buffer that is byte-for-byte the
/// negative-cache marker always passes. Any other buffer must carry an
/// unexpired `#!expires=` directive, or null is returned.
fn readSlice(
    comptime T: type,
    io: std.Io,
    allocator: std.mem.Allocator,
    data: []const u8,
    comptime postProcess: ?*const fn (*T, std.mem.Allocator) anyerror!void,
    comptime freshness: Freshness,
) ?CacheResult(T) {
    const item_has_deinit = comptime @hasDecl(T, "deinit");

    var byte_reader = std.Io.Reader.fixed(data);
    var it = srf.iterator(&byte_reader, allocator, .{ .alloc_strings = false }) catch return null;
    defer it.deinit();

    if (freshness == .fresh_only and !std.mem.eql(u8, data, negative_cache_content)) {
        if (it.expires == null or !it.isFresh(io)) return null;
    }

    const created_at: i64 = it.created orelse std.Io.Timestamp.now(io, .real).toSeconds();

    // Whatever is still in this list at exit (only on a failure path) is
    // released here.
    var decoded_items: std.ArrayList(T) = .empty;
    defer {
        if (item_has_deinit) {
            for (decoded_items.items) |leftover| leftover.deinit(allocator);
        }
        decoded_items.deinit(allocator);
    }

    while (it.next() catch return null) |fields| {
        var decoded_item = fields.to(T) catch continue;
        if (comptime postProcess) |fixup| {
            fixup(&decoded_item, allocator) catch {
                if (item_has_deinit) decoded_item.deinit(allocator);
                return null;
            };
        }
        decoded_items.append(allocator, decoded_item) catch {
            if (item_has_deinit) decoded_item.deinit(allocator);
            return null;
        };
    }

    const owned_items = decoded_items.toOwnedSlice(allocator) catch return null;
    decoded_items = .empty; // ownership moved to the caller; nothing left to free
    return .{ .data = owned_items, .timestamp = created_at };
}
|
|
|
|
/// Render `items` as SRF, stamping `#!created=` with the current wall-clock
/// time (seconds) over whatever `options.created` held. All other directives
/// come from `options`. The caller owns the returned bytes.
fn serializeWithMeta(
    comptime T: type,
    io: std.Io,
    allocator: std.mem.Allocator,
    items: []const T,
    options: srf.FormatOptions,
) ![]const u8 {
    var stamped_opts = options;
    stamped_opts.created = std.Io.Timestamp.now(io, .real).toSeconds();

    var srf_out: std.Io.Writer.Allocating = .init(allocator);
    errdefer srf_out.deinit();
    try srf_out.writer.print("{f}", .{srf.fmtFrom(T, allocator, items, stamped_opts)});
    return srf_out.toOwnedSlice();
}
|
|
|
|
// ── Private serialization: candles ───────────────────────────
|
|
|
|
/// Render candles as SRF using `options` exactly as given. Unlike
/// `serializeWithMeta`, no `#!created=` stamp is added here. The caller
/// owns the returned bytes.
fn serializeCandles(allocator: std.mem.Allocator, candles: []const Candle, options: srf.FormatOptions) ![]const u8 {
    var srf_out: std.Io.Writer.Allocating = .init(allocator);
    errdefer srf_out.deinit();

    const candle_fmt = srf.fmtFrom(Candle, allocator, candles, options);
    try srf_out.writer.print("{f}", .{candle_fmt});
    return srf_out.toOwnedSlice();
}
|
|
|
|
/// Render one `CandleMeta` as SRF, stamping `#!created=` with the current
/// wall-clock time (seconds).
///
/// Uses SRF's generic field emission. Because `CandleMeta.provider` has no
/// default, the provider line is always written explicitly, so cache
/// inspection can always tell where the data came from. The caller owns
/// the returned bytes.
fn serializeCandleMeta(io: std.Io, allocator: std.mem.Allocator, meta: CandleMeta, options: srf.FormatOptions) ![]const u8 {
    var stamped_opts = options;
    stamped_opts.created = std.Io.Timestamp.now(io, .real).toSeconds();
    const single_meta = [_]CandleMeta{meta};

    var srf_out: std.Io.Writer.Allocating = .init(allocator);
    errdefer srf_out.deinit();
    try srf_out.writer.print("{f}", .{srf.fmtFrom(CandleMeta, allocator, &single_meta, stamped_opts)});
    return srf_out.toOwnedSlice();
}
|
|
|
|
/// Decode the first record of a `candles_meta.srf` buffer as `CandleMeta`.
///
/// Returns `error.InvalidData` if the iterator cannot be created, the buffer
/// has no records, or the first record does not decode. Iterator errors
/// raised while advancing are propagated as-is.
fn deserializeCandleMeta(allocator: std.mem.Allocator, data: []const u8) !CandleMeta {
    var byte_reader = std.Io.Reader.fixed(data);
    var record_iter = srf.iterator(&byte_reader, allocator, .{ .alloc_strings = false }) catch return error.InvalidData;
    defer record_iter.deinit();

    if (try record_iter.next()) |first_record| {
        return first_record.to(CandleMeta) catch error.InvalidData;
    }
    return error.InvalidData;
}
|
|
|
|
// ── Private serialization: options (bespoke) ─────────────────
|
|
|
|
/// Header record for one expiration in the options cache file.
/// `deserializeOptions` attaches subsequent `call`/`put` records to the
/// header whose `expiration` matches theirs.
const ChainHeader = struct {
    /// Expiration date shared by this chain's contracts.
    expiration: Date,
    /// Written from / read into `OptionsChain.underlying_symbol`.
    symbol: []const u8,
    /// Written from / read into `OptionsChain.underlying_price`; optional.
    price: ?f64 = null,
};
|
|
|
|
/// One line of the options cache file. `srf_tag_field` names the SRF field
/// (`type`) that selects the variant.
const OptionsRecord = union(enum) {
    pub const srf_tag_field = "type";

    /// Starts a new chain (one expiration).
    chain: ChainHeader,
    /// A call contract belonging to the chain with the same expiration.
    call: OptionContract,
    /// A put contract belonging to the chain with the same expiration.
    put: OptionContract,
};
|
|
|
|
/// Flatten option chains into a tagged SRF record stream and render it,
/// stamping `#!created=` with the current wall-clock time (seconds).
///
/// Each chain is emitted as one `chain` header followed by its `call`
/// records and then its `put` records. The caller owns the returned bytes.
fn serializeOptions(io: std.Io, allocator: std.mem.Allocator, chains: []const OptionsChain, options: srf.FormatOptions) ![]const u8 {
    var flat_records: std.ArrayList(OptionsRecord) = .empty;
    defer flat_records.deinit(allocator);

    for (chains) |chain| {
        const chain_header: ChainHeader = .{
            .expiration = chain.expiration,
            .symbol = chain.underlying_symbol,
            .price = chain.underlying_price,
        };
        try flat_records.append(allocator, .{ .chain = chain_header });
        for (chain.calls) |call_contract| try flat_records.append(allocator, .{ .call = call_contract });
        for (chain.puts) |put_contract| try flat_records.append(allocator, .{ .put = put_contract });
    }

    var stamped_opts = options;
    stamped_opts.created = std.Io.Timestamp.now(io, .real).toSeconds();

    var srf_out: std.Io.Writer.Allocating = .init(allocator);
    errdefer srf_out.deinit();
    try srf_out.writer.print("{f}", .{srf.fmtFrom(OptionsRecord, allocator, flat_records.items, stamped_opts)});
    return srf_out.toOwnedSlice();
}
|
|
|
|
/// Rebuild `OptionsChain`s from the flat record stream written by
/// `serializeOptions`.
///
/// Each `chain` record starts a new chain. A `call`/`put` record is attached
/// to the chain whose expiration matches its own. Contracts with no
/// matching header are dropped, and records that fail to decode are
/// skipped.
///
/// The caller owns the returned slice and, for each chain,
/// `underlying_symbol`, `calls` and `puts`. On error nothing is leaked.
fn deserializeOptions(allocator: std.mem.Allocator, it: *srf.RecordIterator) ![]OptionsChain {
    var chains: std.ArrayList(OptionsChain) = .empty;
    errdefer {
        for (chains.items) |*ch| {
            allocator.free(ch.underlying_symbol);
            // Chains without contracts still hold the static `&.{}` here;
            // freeing a zero-length slice is a no-op.
            allocator.free(ch.calls);
            allocator.free(ch.puts);
        }
        chains.deinit(allocator);
    }

    // expiration (Date.days) -> index into `chains`
    var exp_map = std.AutoHashMap(i32, usize).init(allocator);
    defer exp_map.deinit();

    // chain index -> contracts collected for it (moved out at the end)
    var calls_map = std.AutoHashMap(usize, std.ArrayList(OptionContract)).init(allocator);
    defer {
        var iter = calls_map.valueIterator();
        while (iter.next()) |v| v.deinit(allocator);
        calls_map.deinit();
    }
    var puts_map = std.AutoHashMap(usize, std.ArrayList(OptionContract)).init(allocator);
    defer {
        var iter = puts_map.valueIterator();
        while (iter.next()) |v| v.deinit(allocator);
        puts_map.deinit();
    }

    while (try it.next()) |fields| {
        const opt_rec = fields.to(OptionsRecord) catch continue;
        switch (opt_rec) {
            .chain => |ch| {
                const idx = chains.items.len;
                // Reserve the slot BEFORE duplicating the symbol. The append
                // then cannot fail, so the copy can never be orphaned (the
                // previous `try append(.{ .x = try dupe(...) })` leaked it
                // whenever the append ran out of memory).
                try chains.ensureUnusedCapacity(allocator, 1);
                chains.appendAssumeCapacity(.{
                    .underlying_symbol = try allocator.dupe(u8, ch.symbol),
                    .underlying_price = ch.price,
                    .expiration = ch.expiration,
                    .calls = &.{},
                    .puts = &.{},
                });
                try exp_map.put(ch.expiration.days, idx);
            },
            .call => |c| {
                if (exp_map.get(c.expiration.days)) |idx| {
                    const entry = try calls_map.getOrPut(idx);
                    if (!entry.found_existing) entry.value_ptr.* = .empty;
                    try entry.value_ptr.append(allocator, c);
                }
            },
            .put => |c| {
                if (exp_map.get(c.expiration.days)) |idx| {
                    const entry = try puts_map.getOrPut(idx);
                    if (!entry.found_existing) entry.value_ptr.* = .empty;
                    try entry.value_ptr.append(allocator, c);
                }
            },
        }
    }

    // Move the collected contracts into their chains. toOwnedSlice empties
    // each map entry, so the deferred map cleanup does not free them again.
    for (chains.items, 0..) |*chain, idx| {
        if (calls_map.getPtr(idx)) |cl| {
            chain.calls = try cl.toOwnedSlice(allocator);
        }
        if (puts_map.getPtr(idx)) |pl| {
            chain.puts = try pl.toOwnedSlice(allocator);
        }
    }

    return chains.toOwnedSlice(allocator);
}
|
|
|
|
// ── Private serialization: ETF profile (bespoke) ─────────────
|
|
|
|
/// One line of the ETF profile cache file. `srf_tag_field` names the SRF
/// field (`type`) that selects the variant.
const EtfRecord = union(enum) {
    pub const srf_tag_field = "type";

    /// The profile's own scalar fields.
    meta: EtfProfile,
    /// One entry of `EtfProfile.sectors`.
    sector: SectorWeight,
    /// One entry of `EtfProfile.holdings`.
    holding: Holding,
};
|
|
|
|
/// Render an ETF profile as a tagged SRF record stream, stamping
/// `#!created=` with the current wall-clock time (seconds).
///
/// The stream is one `meta` record, then one `sector` record per sector,
/// then one `holding` record per holding. The caller owns the returned
/// bytes.
fn serializeEtfProfile(io: std.Io, allocator: std.mem.Allocator, profile: EtfProfile, options: srf.FormatOptions) ![]const u8 {
    var etf_records: std.ArrayList(EtfRecord) = .empty;
    defer etf_records.deinit(allocator);

    try etf_records.append(allocator, .{ .meta = profile });
    if (profile.sectors) |sector_list| {
        for (sector_list) |sector_entry| try etf_records.append(allocator, .{ .sector = sector_entry });
    }
    if (profile.holdings) |holding_list| {
        for (holding_list) |holding_entry| try etf_records.append(allocator, .{ .holding = holding_entry });
    }

    var stamped_opts = options;
    stamped_opts.created = std.Io.Timestamp.now(io, .real).toSeconds();

    var srf_out: std.Io.Writer.Allocating = .init(allocator);
    errdefer srf_out.deinit();
    try srf_out.writer.print("{f}", .{srf.fmtFrom(EtfRecord, allocator, etf_records.items, stamped_opts)});
    return srf_out.toOwnedSlice();
}
|
|
|
|
/// Rebuild an `EtfProfile` from the tagged record stream written by
/// `serializeEtfProfile`.
///
/// The last `meta` record supplies the profile's scalar fields. `sector`
/// and `holding` records are collected into `sectors` / `holdings`, which
/// stay null when no such records exist. Records that fail to decode are
/// skipped.
///
/// The caller owns `sectors` (and each `name`) and `holdings` (and each
/// `symbol`/`name`). NOTE(review): strings inside the `meta` record are
/// copied by value and not duplicated here. Whether they outlive the
/// iterator depends on how the caller configured it; confirm.
///
/// On error, nothing allocated by this function is leaked.
fn deserializeEtfProfile(allocator: std.mem.Allocator, it: *srf.RecordIterator) !EtfProfile {
    var profile = EtfProfile{ .symbol = "" };

    // Whatever these lists still hold when the function exits is released
    // here: everything on an error path, nothing (or only spare capacity)
    // on success. `toOwnedSlice` empties a list, so entries handed to the
    // profile are never freed twice. Using `defer` rather than calling
    // `deinit` on the success path also avoids the old bug where a later
    // error made the errdefer walk a list already set to `undefined`.
    var sectors: std.ArrayList(SectorWeight) = .empty;
    defer {
        for (sectors.items) |s| allocator.free(s.name);
        sectors.deinit(allocator);
    }
    var holdings: std.ArrayList(Holding) = .empty;
    defer {
        for (holdings.items) |h| {
            if (h.symbol) |s| allocator.free(s);
            allocator.free(h.name);
        }
        holdings.deinit(allocator);
    }

    while (try it.next()) |fields| {
        const etf_rec = fields.to(EtfRecord) catch continue;
        switch (etf_rec) {
            .meta => |m| {
                profile = m;
            },
            .sector => |s| {
                // Reserve first so a failed append cannot orphan the copy.
                try sectors.ensureUnusedCapacity(allocator, 1);
                sectors.appendAssumeCapacity(.{ .name = try allocator.dupe(u8, s.name), .weight = s.weight });
            },
            .holding => |h| {
                try holdings.ensureUnusedCapacity(allocator, 1);
                const duped_sym = if (h.symbol) |s| try allocator.dupe(u8, s) else null;
                // Release the symbol copy if copying the name fails.
                errdefer if (duped_sym) |s| allocator.free(s);
                const duped_name = try allocator.dupe(u8, h.name);
                holdings.appendAssumeCapacity(.{ .symbol = duped_sym, .name = duped_name, .weight = h.weight });
            },
        }
    }

    // Transfer the collected entries. If the second transfer fails, the
    // slice produced by the first is released by the errdefer below.
    const owned_sectors = if (sectors.items.len > 0) try sectors.toOwnedSlice(allocator) else null;
    errdefer {
        if (owned_sectors) |list| {
            for (list) |s| allocator.free(s.name);
            allocator.free(list);
        }
    }
    const owned_holdings = if (holdings.items.len > 0) try holdings.toOwnedSlice(allocator) else null;

    if (owned_sectors) |list| profile.sectors = list;
    if (owned_holdings) |list| profile.holdings = list;
    return profile;
}
|
|
};
|
|
|
|
/// Render a portfolio's lots as SRF with default format options (no
/// `#!created=` stamp). The caller owns the returned bytes.
pub fn serializePortfolio(allocator: std.mem.Allocator, lots: []const Lot) ![]const u8 {
    var srf_out: std.Io.Writer.Allocating = .init(allocator);
    errdefer srf_out.deinit();

    const lots_fmt = srf.fmtFrom(Lot, allocator, lots, .{});
    try srf_out.writer.print("{f}", .{lots_fmt});
    return srf_out.toOwnedSlice();
}
|
|
|
|
/// Deserialize a portfolio from SRF data. Caller owns the returned Portfolio.
///
/// Records that fail to decode as a `Lot` are logged with their line number
/// and skipped. A lot with an empty symbol gets the placeholder `CASH` or
/// `ILLIQUID` when its security type is `.cash` / `.illiquid`. Any other
/// symbol-less record is logged and skipped. A summary warning is emitted
/// when anything was skipped.
///
/// The iterator runs with `alloc_strings = false`, so parsed strings borrow
/// memory that `it.deinit()` releases. Every string in the returned lots is
/// therefore copied into `allocator`. On error nothing is leaked, including
/// the partially copied strings of the lot being processed.
pub fn deserializePortfolio(allocator: std.mem.Allocator, data: []const u8) !Portfolio {
    var lots: std.ArrayList(Lot) = .empty;
    errdefer {
        for (lots.items) |lot| freePortfolioLotStrings(allocator, lot);
        lots.deinit(allocator);
    }

    var reader = std.Io.Reader.fixed(data);
    var it = srf.iterator(&reader, allocator, .{ .alloc_strings = false }) catch return error.InvalidData;
    defer it.deinit();

    var skipped: usize = 0;
    while (try it.next()) |fields| {
        const line = it.state.line;
        const parsed_lot = fields.to(Lot) catch {
            std.log.warn("portfolio: could not parse record at line {d}", .{line});
            skipped += 1;
            continue;
        };

        // Cash and illiquid lots without a symbol get a placeholder; any
        // other symbol-less record is unusable.
        const lot_symbol: []const u8 = if (parsed_lot.symbol.len != 0) parsed_lot.symbol else switch (parsed_lot.security_type) {
            .cash => "CASH",
            .illiquid => "ILLIQUID",
            else => {
                std.log.warn("portfolio: record at line {d} has no symbol, skipping", .{line});
                skipped += 1;
                continue;
            },
        };

        // Reserve the slot first so that, once the strings are copied, the
        // append itself cannot fail and orphan them.
        try lots.ensureUnusedCapacity(allocator, 1);
        lots.appendAssumeCapacity(try dupePortfolioLotStrings(allocator, parsed_lot, lot_symbol));
    }

    if (skipped > 0) {
        std.log.warn("portfolio: {d} record(s) could not be parsed and were skipped", .{skipped});
    }

    return .{
        .lots = try lots.toOwnedSlice(allocator),
        .allocator = allocator,
    };
}

/// Return a copy of `lot` that owns its strings. `symbol` replaces the
/// lot's symbol, and `note`, `account`, `ticker` and `underlying` are
/// duplicated when present. On error, every copy made so far is freed.
fn dupePortfolioLotStrings(allocator: std.mem.Allocator, lot: Lot, symbol: []const u8) !Lot {
    var owned_lot = lot;
    owned_lot.symbol = try allocator.dupe(u8, symbol);
    errdefer allocator.free(owned_lot.symbol);
    if (lot.note) |n| owned_lot.note = try allocator.dupe(u8, n);
    errdefer if (owned_lot.note) |n| allocator.free(n);
    if (lot.account) |a| owned_lot.account = try allocator.dupe(u8, a);
    errdefer if (owned_lot.account) |a| allocator.free(a);
    if (lot.ticker) |t| owned_lot.ticker = try allocator.dupe(u8, t);
    errdefer if (owned_lot.ticker) |t| allocator.free(t);
    if (lot.underlying) |u| owned_lot.underlying = try allocator.dupe(u8, u);
    return owned_lot;
}

/// Free every string owned by a lot produced by `dupePortfolioLotStrings`.
fn freePortfolioLotStrings(allocator: std.mem.Allocator, lot: Lot) void {
    allocator.free(lot.symbol);
    if (lot.note) |n| allocator.free(n);
    if (lot.account) |a| allocator.free(a);
    if (lot.ticker) |t| allocator.free(t);
    if (lot.underlying) |u| allocator.free(u);
}
|
|
|
|
test "dividend serialize/deserialize round-trip" {
    const allocator = std.testing.allocator;
    const io = std.testing.io;

    const source_divs = [_]Dividend{
        .{ .ex_date = Date.fromYmd(2024, 3, 15), .amount = 0.8325, .pay_date = Date.fromYmd(2024, 3, 28), .type = .regular },
        .{ .ex_date = Date.fromYmd(2024, 6, 14), .amount = 0.9148, .type = .special },
    };

    const encoded_srf = try Store.serializeWithMeta(Dividend, io, allocator, &source_divs, .{});
    defer allocator.free(encoded_srf);

    // No postProcess: these records carry no currency strings that need a dupe.
    const read_back = Store.readSlice(Dividend, io, allocator, encoded_srf, null, .any) orelse return error.TestUnexpectedResult;
    const round_tripped = read_back.data;
    defer allocator.free(round_tripped);

    try std.testing.expectEqual(@as(usize, 2), round_tripped.len);

    const regular_div = round_tripped[0];
    try std.testing.expect(regular_div.ex_date.eql(Date.fromYmd(2024, 3, 15)));
    try std.testing.expectApproxEqAbs(@as(f64, 0.8325), regular_div.amount, 0.0001);
    try std.testing.expect(regular_div.pay_date != null);
    try std.testing.expect(regular_div.pay_date.?.eql(Date.fromYmd(2024, 3, 28)));
    try std.testing.expectEqual(DividendType.regular, regular_div.type);

    const special_div = round_tripped[1];
    try std.testing.expect(special_div.ex_date.eql(Date.fromYmd(2024, 6, 14)));
    try std.testing.expectApproxEqAbs(@as(f64, 0.9148), special_div.amount, 0.0001);
    try std.testing.expect(special_div.pay_date == null);
    try std.testing.expectEqual(DividendType.special, special_div.type);
}
|
|
|
|
test "split serialize/deserialize round-trip" {
    const io = std.testing.io;
    const allocator = std.testing.allocator;
    const splits = [_]Split{
        .{ .date = Date.fromYmd(2020, 8, 31), .numerator = 4, .denominator = 1 },
        .{ .date = Date.fromYmd(2014, 6, 9), .numerator = 7, .denominator = 1 },
    };

    const data = try Store.serializeWithMeta(Split, io, allocator, &splits, .{});
    defer allocator.free(data);

    const result = Store.readSlice(Split, io, allocator, data, null, .any) orelse return error.TestUnexpectedResult;
    const parsed = result.data;
    defer allocator.free(parsed);

    try std.testing.expectEqual(@as(usize, 2), parsed.len);

    try std.testing.expect(parsed[0].date.eql(Date.fromYmd(2020, 8, 31)));
    try std.testing.expectApproxEqAbs(@as(f64, 4), parsed[0].numerator, 0.001);
    try std.testing.expectApproxEqAbs(@as(f64, 1), parsed[0].denominator, 0.001);

    // Check every field of the second record too, not just the first.
    try std.testing.expect(parsed[1].date.eql(Date.fromYmd(2014, 6, 9)));
    try std.testing.expectApproxEqAbs(@as(f64, 7), parsed[1].numerator, 0.001);
    try std.testing.expectApproxEqAbs(@as(f64, 1), parsed[1].denominator, 0.001);
}
|
|
|
|
test "writeMerged Dividend: empty cache writes input sorted descending" {
    const io = std.testing.io;
    const allocator = std.testing.allocator;

    var tmp = std.testing.tmpDir(.{});
    defer tmp.cleanup();
    const tmp_root = try tmp.dir.realPathFileAlloc(io, ".", allocator);
    defer allocator.free(tmp_root);

    var cache_store = Store.init(io, allocator, tmp_root);

    // Deliberately out of order: writeMerged is responsible for sorting.
    var unsorted_divs = [_]Dividend{
        .{ .ex_date = Date.fromYmd(2024, 5, 15), .amount = 0.50 },
        .{ .ex_date = Date.fromYmd(2024, 8, 15), .amount = 0.55 },
        .{ .ex_date = Date.fromYmd(2024, 2, 15), .amount = 0.48 },
    };
    cache_store.write(Dividend, "TEST", unsorted_divs[0..], Ttl.dividends);

    const cached_result = cache_store.read(Dividend, "TEST", null, .any) orelse return error.NoCache;
    defer allocator.free(cached_result.data);
    defer for (cached_result.data) |d| d.deinit(allocator);

    try std.testing.expectEqual(@as(usize, 3), cached_result.data.len);
    const newest_first = [_]Date{
        Date.fromYmd(2024, 8, 15),
        Date.fromYmd(2024, 5, 15),
        Date.fromYmd(2024, 2, 15),
    };
    for (newest_first, cached_result.data) |want_date, got_div| {
        try std.testing.expect(got_div.ex_date.eql(want_date));
    }
}
|
|
|
|
test "writeMerged Dividend: existing entries preserved on key collision" {
    const io = std.testing.io;
    const allocator = std.testing.allocator;

    var tmp = std.testing.tmpDir(.{});
    defer tmp.cleanup();
    const tmp_root = try tmp.dir.realPathFileAlloc(io, ".", allocator);
    defer allocator.free(tmp_root);

    var cache_store = Store.init(io, allocator, tmp_root);

    // First write: a rich, Polygon-style entry with pay_date and type.
    var rich_divs = [_]Dividend{.{
        .ex_date = Date.fromYmd(2024, 5, 15),
        .pay_date = Date.fromYmd(2024, 6, 1),
        .amount = 0.50,
        .type = .regular,
    }};
    cache_store.write(Dividend, "TEST", rich_divs[0..], Ttl.dividends);

    // Second write: same ex_date, sparse Tiingo-style entry (no pay_date,
    // unknown type, different amount). The existing entry must win.
    var sparse_divs = [_]Dividend{
        .{ .ex_date = Date.fromYmd(2024, 5, 15), .amount = 0.99, .type = .unknown },
    };
    cache_store.write(Dividend, "TEST", sparse_divs[0..], Ttl.dividends);

    const cached_result = cache_store.read(Dividend, "TEST", null, .any) orelse return error.NoCache;
    defer allocator.free(cached_result.data);
    defer for (cached_result.data) |d| d.deinit(allocator);

    try std.testing.expectEqual(@as(usize, 1), cached_result.data.len);
    const kept = cached_result.data[0];
    // Polygon's 0.50 must remain; Tiingo's 0.99 must not overwrite it.
    try std.testing.expectApproxEqAbs(@as(f64, 0.50), kept.amount, 0.001);
    try std.testing.expect(kept.pay_date != null);
    try std.testing.expectEqual(DividendType.regular, kept.type);
}
|
|
|
|
test "writeMerged Dividend: union sorted desc, new entry added" {
    const io = std.testing.io;
    const allocator = std.testing.allocator;

    var tmp = std.testing.tmpDir(.{});
    defer tmp.cleanup();
    const tmp_root = try tmp.dir.realPathFileAlloc(io, ".", allocator);
    defer allocator.free(tmp_root);

    var cache_store = Store.init(io, allocator, tmp_root);

    var existing_divs = [_]Dividend{
        .{ .ex_date = Date.fromYmd(2024, 5, 15), .amount = 0.50, .type = .regular },
        .{ .ex_date = Date.fromYmd(2024, 2, 15), .amount = 0.48, .type = .regular },
    };
    cache_store.write(Dividend, "TEST", existing_divs[0..], Ttl.dividends);

    // An ex_date the cache has not seen yet.
    var newer_divs = [_]Dividend{
        .{ .ex_date = Date.fromYmd(2024, 8, 15), .amount = 0.55 },
    };
    cache_store.write(Dividend, "TEST", newer_divs[0..], Ttl.dividends);

    const cached_result = cache_store.read(Dividend, "TEST", null, .any) orelse return error.NoCache;
    defer allocator.free(cached_result.data);
    defer for (cached_result.data) |d| d.deinit(allocator);

    try std.testing.expectEqual(@as(usize, 3), cached_result.data.len);
    const newest_first = [_]Date{
        Date.fromYmd(2024, 8, 15),
        Date.fromYmd(2024, 5, 15),
        Date.fromYmd(2024, 2, 15),
    };
    for (newest_first, cached_result.data) |want_date, got_div| {
        try std.testing.expect(got_div.ex_date.eql(want_date));
    }
}
|
|
|
|
test "writeMerged Dividend: no-op when nothing new" {
    const io = std.testing.io;
    const allocator = std.testing.allocator;

    var tmp = std.testing.tmpDir(.{});
    defer tmp.cleanup();
    const tmp_root = try tmp.dir.realPathFileAlloc(io, ".", allocator);
    defer allocator.free(tmp_root);

    var cache_store = Store.init(io, allocator, tmp_root);

    var first_divs = [_]Dividend{
        .{ .ex_date = Date.fromYmd(2024, 5, 15), .amount = 0.50, .type = .regular },
    };
    cache_store.write(Dividend, "TEST", first_divs[0..], Ttl.dividends);

    // Record the file's mtime before the second (expected no-op) write.
    const div_file = try std.fs.path.join(allocator, &.{ tmp_root, "TEST", "dividends.srf" });
    defer allocator.free(div_file);
    const mtime_before = (try std.Io.Dir.cwd().statFile(io, div_file, .{})).mtime;

    // Pause briefly so mtime resolution could reveal a rewrite if one happened.
    std.Io.sleep(io, std.Io.Duration.fromMilliseconds(20), .awake) catch {};

    // Identical entry: nothing new, so the file must not be rewritten.
    var duplicate_divs = [_]Dividend{
        .{ .ex_date = Date.fromYmd(2024, 5, 15), .amount = 0.50, .type = .regular },
    };
    cache_store.write(Dividend, "TEST", duplicate_divs[0..], Ttl.dividends);

    const mtime_after = (try std.Io.Dir.cwd().statFile(io, div_file, .{})).mtime;
    try std.testing.expectEqual(mtime_before, mtime_after);
}
|
|
|
|
test "writeMerged Dividend: field-level upgrade fills nulls (Tiingo-then-Polygon)" {
    // Mirrors the write order of a server refresh. Tiingo's sparse record
    // lands first (populateAllFromTiingo, reached from getCandles), then
    // Polygon's richer record lands (fetchCached, reached from
    // getDividends). The stored record must pick up Polygon's extra
    // fields rather than keep Tiingo's nulls.
    const gpa = std.testing.allocator;
    const io = std.testing.io;

    var tmp = std.testing.tmpDir(.{});
    defer tmp.cleanup();
    const cache_root = try tmp.dir.realPathFileAlloc(io, ".", gpa);
    defer gpa.free(cache_root);

    var cache_store = Store.init(io, gpa, cache_root);

    const ex_day = Date.fromYmd(2024, 5, 15);
    const pay_day = Date.fromYmd(2024, 6, 1);
    const record_day = Date.fromYmd(2024, 5, 17);

    // Sparse Tiingo view: only ex_date and amount.
    var tiingo_view = [_]Dividend{.{ .ex_date = ex_day, .amount = 0.50 }};
    cache_store.writeWithSource(Dividend, "TEST", tiingo_view[0..], Ttl.dividends, "tiingo");

    // Rich Polygon view: adds pay_date, record_date and type. currency is
    // deliberately left null so this fixture needs no allocation.
    var polygon_view = [_]Dividend{.{
        .ex_date = ex_day,
        .pay_date = pay_day,
        .record_date = record_day,
        .amount = 0.50,
        .type = .regular,
    }};
    cache_store.writeWithSource(Dividend, "TEST", polygon_view[0..], Ttl.dividends, "polygon");

    const got = cache_store.read(Dividend, "TEST", null, .any) orelse return error.NoCache;
    defer gpa.free(got.data);
    defer for (got.data) |rec| rec.deinit(gpa);

    try std.testing.expectEqual(@as(usize, 1), got.data.len);

    const stored = &got.data[0];
    const stored_pay = stored.pay_date orelse return error.TestUnexpectedResult;
    try std.testing.expect(stored_pay.eql(pay_day));
    const stored_record = stored.record_date orelse return error.TestUnexpectedResult;
    try std.testing.expect(stored_record.eql(record_day));
    try std.testing.expectEqual(DividendType.regular, stored.type);
}
|
|
|
|
test "writeMerged Dividend: type unknown counts as null and gets upgraded" {
    // Tiingo's dividend records always carry type = .unknown. A later
    // Polygon write with a concrete type (this fixture uses .special)
    // must upgrade the existing record. This is the "type-unknown is
    // null-equivalent" rule.
    const allocator = std.testing.allocator;
    const io = std.testing.io;

    var tmp = std.testing.tmpDir(.{});
    defer tmp.cleanup();

    const dir_path = try tmp.dir.realPathFileAlloc(io, ".", allocator);
    defer allocator.free(dir_path);

    var s = Store.init(io, allocator, dir_path);

    // Tiingo first: type is the null-equivalent .unknown.
    var tiingo_view = [_]Dividend{
        .{ .ex_date = Date.fromYmd(2024, 5, 15), .amount = 0.50, .type = .unknown },
    };
    s.writeWithSource(Dividend, "TEST", tiingo_view[0..], Ttl.dividends, "tiingo");

    // Polygon second: same ex_date, concrete type.
    var polygon_view = [_]Dividend{
        .{ .ex_date = Date.fromYmd(2024, 5, 15), .amount = 0.50, .type = .special },
    };
    s.writeWithSource(Dividend, "TEST", polygon_view[0..], Ttl.dividends, "polygon");

    const result = s.read(Dividend, "TEST", null, .any) orelse return error.NoCache;
    defer allocator.free(result.data);
    defer for (result.data) |d| d.deinit(allocator);

    try std.testing.expectEqual(@as(usize, 1), result.data.len);
    // .unknown was replaced by Polygon's concrete type.
    try std.testing.expectEqual(DividendType.special, result.data[0].type);
}
|
|
|
|
test "writeMerged Dividend: non-null fields are not overwritten" {
    // Once the stored record has a value for a field, a later write with
    // a *different* value for that field must leave it untouched ("first
    // non-null wins"). Guards against an upgrade step that is too eager.
    const gpa = std.testing.allocator;
    const io = std.testing.io;

    var tmp = std.testing.tmpDir(.{});
    defer tmp.cleanup();
    const cache_root = try tmp.dir.realPathFileAlloc(io, ".", gpa);
    defer gpa.free(cache_root);

    var cache_store = Store.init(io, gpa, cache_root);

    const ex_day = Date.fromYmd(2024, 5, 15);
    const original_pay = Date.fromYmd(2024, 6, 1);

    var earlier = [_]Dividend{.{
        .ex_date = ex_day,
        .pay_date = original_pay,
        .amount = 0.50,
        .type = .regular,
    }};
    cache_store.writeWithSource(Dividend, "TEST", earlier[0..], Ttl.dividends, "polygon");

    // Same ex_date, but a conflicting (bogus) pay_date and type.
    var later = [_]Dividend{.{
        .ex_date = ex_day,
        .pay_date = Date.fromYmd(2099, 1, 1),
        .amount = 0.50,
        .type = .special,
    }};
    cache_store.writeWithSource(Dividend, "TEST", later[0..], Ttl.dividends, "polygon");

    const got = cache_store.read(Dividend, "TEST", null, .any) orelse return error.NoCache;
    defer gpa.free(got.data);
    defer for (got.data) |rec| rec.deinit(gpa);

    try std.testing.expectEqual(@as(usize, 1), got.data.len);
    // The earlier record's values must survive.
    try std.testing.expect(got.data[0].pay_date.?.eql(original_pay));
    try std.testing.expectEqual(DividendType.regular, got.data[0].type);
}
|
|
|
|
test "writeMerged Dividend: upgrade is no-op when both have same fields" {
    // Two writes of an identical record (same ex_date, amount, pay_date,
    // type). Nothing is new and nothing can be upgraded, so the second
    // write must not touch the file.
    const gpa = std.testing.allocator;
    const io = std.testing.io;

    var tmp = std.testing.tmpDir(.{});
    defer tmp.cleanup();
    const cache_root = try tmp.dir.realPathFileAlloc(io, ".", gpa);
    defer gpa.free(cache_root);

    var cache_store = Store.init(io, gpa, cache_root);

    const record: Dividend = .{
        .ex_date = Date.fromYmd(2024, 5, 15),
        .pay_date = Date.fromYmd(2024, 6, 1),
        .amount = 0.50,
        .type = .regular,
    };

    var first_batch = [_]Dividend{record};
    cache_store.writeWithSource(Dividend, "TEST", first_batch[0..], Ttl.dividends, "polygon");

    const srf_path = try std.fs.path.join(gpa, &.{ cache_root, "TEST", "dividends.srf" });
    defer gpa.free(srf_path);
    const before = try std.Io.Dir.cwd().statFile(io, srf_path, .{});

    // Let time advance so a spurious rewrite would bump mtime.
    // NOTE(review): effective only with sub-second mtime granularity.
    std.Io.sleep(io, std.Io.Duration.fromMilliseconds(20), .awake) catch {};

    var second_batch = [_]Dividend{record};
    cache_store.writeWithSource(Dividend, "TEST", second_batch[0..], Ttl.dividends, "polygon");

    const after = try std.Io.Dir.cwd().statFile(io, srf_path, .{});
    try std.testing.expectEqual(before.mtime, after.mtime);
}
|
|
|
|
test "writeMerged Split: SPYM-style supplementary entry added" {
    const gpa = std.testing.allocator;
    const io = std.testing.io;

    var tmp = std.testing.tmpDir(.{});
    defer tmp.cleanup();
    const cache_root = try tmp.dir.realPathFileAlloc(io, ".", gpa);
    defer gpa.free(cache_root);

    var cache_store = Store.init(io, gpa, cache_root);

    // First writer (Polygon) has nothing: the bug case, where Polygon's
    // data lacks SPYM's 2017 split entirely.
    var polygon_view = [_]Split{};
    cache_store.write(Split, "SPYM", polygon_view[0..], Ttl.splits);

    // Tiingo then contributes the 2017 4:1 split.
    const split_day = Date.fromYmd(2017, 10, 16);
    var tiingo_view = [_]Split{
        .{ .date = split_day, .numerator = 4, .denominator = 1 },
    };
    cache_store.write(Split, "SPYM", tiingo_view[0..], Ttl.splits);

    const got = cache_store.read(Split, "SPYM", null, .any) orelse return error.NoCache;
    defer gpa.free(got.data);

    try std.testing.expectEqual(@as(usize, 1), got.data.len);
    const only = &got.data[0];
    try std.testing.expect(only.date.eql(split_day));
    try std.testing.expectApproxEqAbs(@as(f64, 4), only.numerator, 0.001);
}
|
|
|
|
test "writeMerged Split: forward-looking Polygon entry preserved across Tiingo refresh" {
    // ARCC-like scenario for splits: Polygon records a split that hasn't
    // happened yet, and a subsequent Tiingo write must not drop it.
    const gpa = std.testing.allocator;
    const io = std.testing.io;

    var tmp = std.testing.tmpDir(.{});
    defer tmp.cleanup();
    const cache_root = try tmp.dir.realPathFileAlloc(io, ".", gpa);
    defer gpa.free(cache_root);

    var cache_store = Store.init(io, gpa, cache_root);

    const upcoming = Date.fromYmd(2026, 12, 1);
    const historical = Date.fromYmd(2020, 8, 31);

    // Polygon: one upcoming split plus one historical split.
    var polygon_view = [_]Split{
        .{ .date = upcoming, .numerator = 2, .denominator = 1 },
        .{ .date = historical, .numerator = 4, .denominator = 1 },
    };
    cache_store.write(Split, "TEST", polygon_view[0..], Ttl.splits);

    // Tiingo: knows only splits that have already taken effect.
    var tiingo_view = [_]Split{
        .{ .date = historical, .numerator = 4, .denominator = 1 },
    };
    cache_store.write(Split, "TEST", tiingo_view[0..], Ttl.splits);

    const got = cache_store.read(Split, "TEST", null, .any) orelse return error.NoCache;
    defer gpa.free(got.data);

    // Both survive, newest first; Polygon's upcoming entry is not erased.
    try std.testing.expectEqual(@as(usize, 2), got.data.len);
    try std.testing.expect(got.data[0].date.eql(upcoming));
    try std.testing.expect(got.data[1].date.eql(historical));
}
|
|
|
|
test "portfolio serialize/deserialize round-trip" {
    const allocator = std.testing.allocator;
    // Today is after the lots' open_dates and after the one close_date,
    // so "open" means "no close_date and not matured".
    const today = Date.fromYmd(2024, 6, 1);
    const lots = [_]Lot{
        .{ .symbol = "AMZN", .shares = 10, .open_date = Date.fromYmd(2022, 3, 15), .open_price = 150.25 },
        .{ .symbol = "AMZN", .shares = 5, .open_date = Date.fromYmd(2023, 6, 1), .open_price = 125.00, .close_date = Date.fromYmd(2024, 1, 15), .close_price = 185.50 },
        .{ .symbol = "VTI", .shares = 100, .open_date = Date.fromYmd(2022, 1, 10), .open_price = 220.00 },
    };

    const data = try serializePortfolio(allocator, &lots);
    defer allocator.free(data);

    var portfolio = try deserializePortfolio(allocator, data);
    defer portfolio.deinit();

    try std.testing.expectEqual(@as(usize, 3), portfolio.lots.len);

    // Open AMZN lot.
    try std.testing.expectEqualStrings("AMZN", portfolio.lots[0].symbol);
    try std.testing.expectApproxEqAbs(@as(f64, 10), portfolio.lots[0].shares, 0.01);
    try std.testing.expect(portfolio.lots[0].isOpen(today));

    // Closed AMZN lot: close_date and close_price must round-trip.
    try std.testing.expectEqualStrings("AMZN", portfolio.lots[1].symbol);
    try std.testing.expectApproxEqAbs(@as(f64, 5), portfolio.lots[1].shares, 0.01);
    try std.testing.expect(!portfolio.lots[1].isOpen(today));
    try std.testing.expect(portfolio.lots[1].close_date.?.eql(Date.fromYmd(2024, 1, 15)));
    try std.testing.expectApproxEqAbs(@as(f64, 185.50), portfolio.lots[1].close_price.?, 0.01);

    // Open VTI lot: check shares and open status too, not just the symbol,
    // so a field dropped on the last record would be caught.
    try std.testing.expectEqualStrings("VTI", portfolio.lots[2].symbol);
    try std.testing.expectApproxEqAbs(@as(f64, 100), portfolio.lots[2].shares, 0.01);
    try std.testing.expect(portfolio.lots[2].isOpen(today));
}
|
|
|
|
test "portfolio: cash lots without symbol get CASH placeholder" {
    const gpa = std.testing.allocator;

    // First record is a cash lot with no `symbol` field; the second is an
    // ordinary stock lot.
    const data =
        \\#!srfv1
        \\security_type::cash,shares:num:598.66,open_date::2026-02-25,open_price:num:1.00,account::Savings
        \\symbol::AAPL,shares:num:10,open_date::2024-01-15,open_price:num:150.00
        \\
    ;

    var portfolio = try deserializePortfolio(gpa, data);
    defer portfolio.deinit();

    try std.testing.expectEqual(@as(usize, 2), portfolio.lots.len);

    // Missing symbol on a cash lot is filled in with "CASH".
    const cash_lot = &portfolio.lots[0];
    try std.testing.expectEqualStrings("CASH", cash_lot.symbol);
    try std.testing.expectEqual(LotType.cash, cash_lot.security_type);
    try std.testing.expectApproxEqAbs(@as(f64, 598.66), cash_lot.shares, 0.01);
    try std.testing.expectEqualStrings("Savings", cash_lot.account.?);

    // A present symbol is kept as-is.
    const stock_lot = &portfolio.lots[1];
    try std.testing.expectEqualStrings("AAPL", stock_lot.symbol);
}
|
|
|
|
test "portfolio: price_ratio round-trip" {
    const gpa = std.testing.allocator;
    const data =
        \\#!srfv1
        \\symbol::02315N600,shares:num:100,open_date::2024-01-15,open_price:num:140.00,ticker::VTTHX,price_ratio:num:5.185,note::VANGUARD TARGET 2035
        \\symbol::AAPL,shares:num:10,open_date::2024-03-01,open_price:num:150.00
        \\
    ;

    var portfolio = try deserializePortfolio(gpa, data);
    defer portfolio.deinit();

    try std.testing.expectEqual(@as(usize, 2), portfolio.lots.len);

    // CUSIP-keyed lot priced through a ticker with a conversion ratio.
    const cusip_lot = &portfolio.lots[0];
    try std.testing.expectEqualStrings("02315N600", cusip_lot.symbol);
    try std.testing.expectEqualStrings("VTTHX", cusip_lot.ticker.?);
    try std.testing.expectEqualStrings("VTTHX", cusip_lot.priceSymbol());
    try std.testing.expectApproxEqAbs(@as(f64, 5.185), cusip_lot.price_ratio, 0.001);
    try std.testing.expectEqualStrings("VANGUARD TARGET 2035", cusip_lot.note.?);

    // Plain lot: no ticker, price_ratio defaults to 1.0.
    const plain_lot = &portfolio.lots[1];
    try std.testing.expectEqualStrings("AAPL", plain_lot.symbol);
    try std.testing.expectApproxEqAbs(@as(f64, 1.0), plain_lot.price_ratio, 0.001);
    try std.testing.expect(plain_lot.ticker == null);

    // Serialize what we parsed and parse it again; the extra fields must persist.
    const reencoded = try serializePortfolio(gpa, portfolio.lots);
    defer gpa.free(reencoded);

    var reparsed = try deserializePortfolio(gpa, reencoded);
    defer reparsed.deinit();

    try std.testing.expectEqual(@as(usize, 2), reparsed.lots.len);
    try std.testing.expectApproxEqAbs(@as(f64, 5.185), reparsed.lots[0].price_ratio, 0.001);
    try std.testing.expectEqualStrings("VTTHX", reparsed.lots[0].ticker.?);
    try std.testing.expectApproxEqAbs(@as(f64, 1.0), reparsed.lots[1].price_ratio, 0.001);
}
|
|
|
|
// ── TTL and Negative Cache Tests ─────────────────────────────────
|
|
|
|
test "TTL constants are reasonable" {
    const secs_per_hour = std.time.s_per_hour;
    const secs_per_day = std.time.s_per_day;

    // Historical candles are cached forever (-1 sentinel).
    try std.testing.expectEqual(@as(i64, -1), Ttl.candles_historical);

    // Latest candles: strictly between 23h and 24h, leaving slack for cron jitter.
    try std.testing.expect(Ttl.candles_latest > 23 * secs_per_hour);
    try std.testing.expect(Ttl.candles_latest < 24 * secs_per_hour);

    // Dividends and splits: every two weeks.
    try std.testing.expectEqual(@as(i64, 14 * secs_per_day), Ttl.dividends);
    try std.testing.expectEqual(@as(i64, 14 * secs_per_day), Ttl.splits);

    // Options: every hour.
    try std.testing.expectEqual(@as(i64, secs_per_hour), Ttl.options);

    // Earnings and ETF profiles: every 30 days.
    try std.testing.expectEqual(@as(i64, 30 * secs_per_day), Ttl.earnings);
    try std.testing.expectEqual(@as(i64, 30 * secs_per_day), Ttl.etf_profile);
}
|
|
|
|
test "DataType.ttl returns correct values" {
    // (data type, expected TTL) pairs. A TTL of 0 means expiry for that
    // type is managed elsewhere. Checked in the listed order.
    inline for (.{
        .{ DataType.dividends, Ttl.dividends },
        .{ DataType.splits, Ttl.splits },
        .{ DataType.options, Ttl.options },
        .{ DataType.earnings, Ttl.earnings },
        .{ DataType.etf_profile, Ttl.etf_profile },
        .{ DataType.candles_daily, @as(i64, 0) },
        .{ DataType.candles_meta, @as(i64, 0) },
        .{ DataType.meta, @as(i64, 0) },
    }) |pair| {
        try std.testing.expectEqual(pair[1], pair[0].ttl());
    }
}
|
|
|
|
test "DataType.fileName returns correct file names" {
    // (data type, expected on-disk file name) pairs, checked in order.
    inline for (.{
        .{ DataType.candles_daily, "candles_daily.srf" },
        .{ DataType.candles_meta, "candles_meta.srf" },
        .{ DataType.dividends, "dividends.srf" },
        .{ DataType.splits, "splits.srf" },
        .{ DataType.options, "options.srf" },
        .{ DataType.earnings, "earnings.srf" },
        .{ DataType.etf_profile, "etf_profile.srf" },
        .{ DataType.meta, "meta.srf" },
    }) |pair| {
        try std.testing.expectEqualStrings(pair[1], pair[0].fileName());
    }
}
|
|
|
|
test "negative_cache_content format" {
    // The negative-cache marker must carry the SRF header and mention
    // "fetch_failed".
    const marker = Store.negative_cache_content;
    try std.testing.expect(std.mem.startsWith(u8, marker, "#!srfv1"));
    try std.testing.expect(std.mem.indexOf(u8, marker, "fetch_failed") != null);
}
|
|
|
|
test "looksCompleteSrf: empty is invalid" {
    // A zero-length body can never be a complete SRF document.
    const empty_body: []const u8 = "";
    try std.testing.expect(!Store.looksCompleteSrf(empty_body));
}
|
|
|
|
test "looksCompleteSrf: missing srfv1 header rejected" {
    // Newline-terminated bodies that lack the `#!srfv1` header are rejected.
    const headerless = [_][]const u8{
        "garbage\n",
        "some json body\n",
        "{\"error\":\"not found\"}\n",
    };
    for (headerless) |body| {
        try std.testing.expect(!Store.looksCompleteSrf(body));
    }
}
|
|
|
|
test "looksCompleteSrf: missing trailing newline rejected" {
    // The classic torn-write symptom is a body that stops mid-record with
    // no final newline. FRDM on 2026-05-02 ended with a bare
    // `date::2026-04`: no comma, no further fields, no `\n`.
    const torn_mid_record = "#!srfv1\ndate::2026-04-22,open:num:62.82\ndate::2026-04";
    const header_without_newline = "#!srfv1";

    try std.testing.expect(!Store.looksCompleteSrf(torn_mid_record));
    try std.testing.expect(!Store.looksCompleteSrf(header_without_newline));
}
|
|
|
|
test "looksCompleteSrf: well-formed body accepted" {
    const accepted = [_][]const u8{
        // Header followed by one complete, newline-terminated record.
        "#!srfv1\ndate::2026-04-22,open:num:62.82,close:num:63.23\n",
        // The negative-cache marker must pass as well.
        Store.negative_cache_content,
        // Smallest valid body: the header plus its line terminator.
        "#!srfv1\n",
    };
    for (accepted) |body| {
        try std.testing.expect(Store.looksCompleteSrf(body));
    }
}
|
|
|
|
test "archiveTornBody writes .bin + .meta pair with expected SRF content" {
    const io = std.testing.io;
    var tmp = std.testing.tmpDir(.{});
    defer tmp.cleanup();

    const dir_path = try tmp.dir.realPathFileAlloc(io, ".", testing.allocator);
    defer testing.allocator.free(dir_path);

    // Shape mirrors the canonical FRDM torn body: a header plus a
    // complete record, then a record that was cut mid-date-field with
    // no terminating newline.
    const torn_bytes = "#!srfv1\ndate::2026-04-22,open:num:62.82,close:num:63.23\ndate::2026-04";

    try Store.archiveTornBody(
        std.testing.io,
        testing.allocator,
        dir_path,
        "FRDM",
        .candles_daily,
        torn_bytes,
        .{
            .failure_reason = .looks_complete_srf_failed,
            .http_status = 200,
            .http_content_length = 214637,
            .server_url = "https://example.test/FRDM/candles",
            .server_etag = "\"sha256:deadbeefcafe\"",
        },
    );

    // The timestamp in the archive file names is chosen at write time, so
    // it can't be predicted. Instead, list `_torn/`, require exactly one
    // `.bin` and exactly one `.meta`, and check that they share a prefix
    // (so they're unambiguously paired).
    const torn_dir_path = try std.fs.path.join(testing.allocator, &.{ dir_path, "_torn" });
    defer testing.allocator.free(torn_dir_path);

    var torn_dir = try std.Io.Dir.cwd().openDir(std.testing.io, torn_dir_path, .{ .iterate = true });
    defer torn_dir.close(io);

    var bin_name_buf: [128]u8 = undefined;
    var meta_name_buf: [128]u8 = undefined;
    var bin_len: usize = 0;
    var meta_len: usize = 0;
    var bin_count: usize = 0;
    var meta_count: usize = 0;

    var it = torn_dir.iterate();
    while (try it.next(io)) |entry| {
        if (std.mem.endsWith(u8, entry.name, ".bin")) {
            // Guard the fixed-size copy: an overlong name should fail the
            // test cleanly instead of tripping @memcpy's bounds check.
            try std.testing.expect(entry.name.len <= bin_name_buf.len);
            @memcpy(bin_name_buf[0..entry.name.len], entry.name);
            bin_len = entry.name.len;
            bin_count += 1;
        } else if (std.mem.endsWith(u8, entry.name, ".meta")) {
            try std.testing.expect(entry.name.len <= meta_name_buf.len);
            @memcpy(meta_name_buf[0..entry.name.len], entry.name);
            meta_len = entry.name.len;
            meta_count += 1;
        }
    }
    // Exactly one of each, as promised above; a stray second archive
    // would otherwise be silently ignored.
    try std.testing.expectEqual(@as(usize, 1), bin_count);
    try std.testing.expectEqual(@as(usize, 1), meta_count);

    // Prefix without extension should match (same symbol + data_type
    // + timestamp).
    const bin_prefix = bin_name_buf[0 .. bin_len - ".bin".len];
    const meta_prefix = meta_name_buf[0 .. meta_len - ".meta".len];
    try std.testing.expectEqualStrings(bin_prefix, meta_prefix);
    try std.testing.expect(std.mem.startsWith(u8, bin_prefix, "FRDM_candles_daily_"));

    // .bin round-trips verbatim.
    const bin_path = try std.fs.path.join(testing.allocator, &.{ torn_dir_path, bin_name_buf[0..bin_len] });
    defer testing.allocator.free(bin_path);
    const bin_contents = try std.Io.Dir.cwd().readFileAlloc(std.testing.io, bin_path, testing.allocator, .limited(1024 * 1024));
    defer testing.allocator.free(bin_contents);
    try std.testing.expectEqualStrings(torn_bytes, bin_contents);

    // .meta is valid SRF and carries the fields we care about.
    const meta_path = try std.fs.path.join(testing.allocator, &.{ torn_dir_path, meta_name_buf[0..meta_len] });
    defer testing.allocator.free(meta_path);
    const meta_contents = try std.Io.Dir.cwd().readFileAlloc(std.testing.io, meta_path, testing.allocator, .limited(1024 * 1024));
    defer testing.allocator.free(meta_contents);

    try std.testing.expect(std.mem.startsWith(u8, meta_contents, "#!srfv1\n"));
    try std.testing.expect(std.mem.indexOf(u8, meta_contents, "symbol::FRDM") != null);
    try std.testing.expect(std.mem.indexOf(u8, meta_contents, "data_type::candles_daily") != null);
    try std.testing.expect(std.mem.indexOf(u8, meta_contents, "failure_reason::looks_complete_srf_failed") != null);
    try std.testing.expect(std.mem.indexOf(u8, meta_contents, "http_status:num:200") != null);
    try std.testing.expect(std.mem.indexOf(u8, meta_contents, "http_content_length:num:214637") != null);
    try std.testing.expect(std.mem.indexOf(u8, meta_contents, "server_url::https://example.test/FRDM/candles") != null);
    try std.testing.expect(std.mem.indexOf(u8, meta_contents, "server_etag::\"sha256:deadbeefcafe\"") != null);
    // Body length matches the torn slice exactly.
    var len_buf: [32]u8 = undefined;
    const len_str = try std.fmt.bufPrint(&len_buf, "body_length:num:{d}", .{torn_bytes.len});
    try std.testing.expect(std.mem.indexOf(u8, meta_contents, len_str) != null);
    // Hex-encoded sha256 — length check (64 hex chars).
    const sha_idx = std.mem.indexOf(u8, meta_contents, "body_sha256::").?;
    const sha_start = sha_idx + "body_sha256::".len;
    try std.testing.expect(meta_contents.len - sha_start >= 64);
    for (meta_contents[sha_start .. sha_start + 64]) |c| {
        try std.testing.expect(std.ascii.isHex(c));
    }
    // last_200_bytes_hex is exactly 2 * min(body_len, 200) chars; the
    // fixture is shorter than 200 bytes, so that's 2 * body_len here.
    const tail_idx = std.mem.indexOf(u8, meta_contents, "last_200_bytes_hex::").?;
    const tail_start = tail_idx + "last_200_bytes_hex::".len;
    const tail_end = std.mem.indexOfScalarPos(u8, meta_contents, tail_start, '\n').?;
    try std.testing.expectEqual(@as(usize, torn_bytes.len * 2), tail_end - tail_start);
}
|
|
|
|
test "Store.read self-heals torn candles_daily and wipes the pair" {
    const io = std.testing.io;
    const gpa = std.testing.allocator;

    var tmp = std.testing.tmpDir(.{});
    defer tmp.cleanup();
    const cache_root = try tmp.dir.realPathFileAlloc(io, ".", gpa);
    defer gpa.free(cache_root);

    var store = Store.init(io, gpa, cache_root);
    try store.ensureSymbolDir("FRDM");

    // Recreate the FRDM state from the 2026-05-08 incident: a daily file
    // cut off mid-record next to a perfectly valid meta file.
    try store.writeRaw("FRDM", .candles_daily, "#!srfv1\ndate::2026-04-22,open:num:62.82,close:num:63.23\ndate::2026-04");
    try store.writeRaw("FRDM", .candles_meta, "#!srfv1\n#!expires=9999999999\n#!created=1777000000\nlast_close:num:67.11,last_date::2026-05-07\n");

    // The torn daily file must surface as a cache miss.
    try std.testing.expect(store.read(Candle, "FRDM", null, .any) == null);

    // ...and both halves of the pair must be gone from disk.
    for ([_][]const u8{ "candles_daily.srf", "candles_meta.srf" }) |file_name| {
        const file_path = try std.fs.path.join(gpa, &.{ cache_root, "FRDM", file_name });
        defer gpa.free(file_path);
        try std.testing.expectError(error.FileNotFound, std.Io.Dir.cwd().access(io, file_path, .{}));
    }

    // The torn body must have been archived (.bin + .meta) under _torn/.
    const torn_root = try std.fs.path.join(gpa, &.{ cache_root, "_torn" });
    defer gpa.free(torn_root);
    var torn_dir = try std.Io.Dir.cwd().openDir(io, torn_root, .{ .iterate = true });
    defer torn_dir.close(io);

    var saw_bin = false;
    var saw_meta = false;
    var walker = torn_dir.iterate();
    while (try walker.next(io)) |entry| {
        if (!std.mem.startsWith(u8, entry.name, "FRDM_candles_daily_")) continue;
        saw_bin = saw_bin or std.mem.endsWith(u8, entry.name, ".bin");
        saw_meta = saw_meta or std.mem.endsWith(u8, entry.name, ".meta");
    }
    try std.testing.expect(saw_bin);
    try std.testing.expect(saw_meta);
}
|
|
|
|
test "Store.read does not self-heal an intact candles_daily" {
    const io = std.testing.io;
    var tmp = std.testing.tmpDir(.{});
    defer tmp.cleanup();

    const dir_path = try tmp.dir.realPathFileAlloc(io, ".", testing.allocator);
    defer testing.allocator.free(dir_path);

    var store = Store.init(std.testing.io, testing.allocator, dir_path);
    try store.ensureSymbolDir("OK");

    // Seed a complete (well-formed) candles_daily.srf: a single record
    // with the full OHLCV field set the Candle type expects.
    const good_daily =
        "#!srfv1\n" ++
        "date::2026-04-22,open:num:62.82,high:num:63.24,low:num:62.6,close:num:63.23,adj_close:num:63.23,volume:num:189473\n";
    const good_meta = "#!srfv1\n#!expires=9999999999\n#!created=1777000000\nlast_close:num:63.23,last_date::2026-04-22\n";
    try store.writeRaw("OK", .candles_daily, good_daily);
    try store.writeRaw("OK", .candles_meta, good_meta);

    // Reading must succeed and yield the single seeded candle.
    const result = store.read(Candle, "OK", null, .any) orelse return error.TestUnexpectedResult;
    defer testing.allocator.free(result.data);
    try std.testing.expectEqual(@as(usize, 1), result.data.len);

    // Both cache files must still be on disk. Self-heal removes the daily
    // *and* meta files together, so checking only the daily file would
    // miss a spurious deletion of the meta file.
    for ([_][]const u8{ "candles_daily.srf", "candles_meta.srf" }) |file_name| {
        const file_path = try std.fs.path.join(testing.allocator, &.{ dir_path, "OK", file_name });
        defer testing.allocator.free(file_path);
        const file_stat = try std.Io.Dir.cwd().statFile(std.testing.io, file_path, .{});
        try std.testing.expect(file_stat.size > 0);
    }

    // And no _torn/ directory was created.
    const torn_dir_path = try std.fs.path.join(testing.allocator, &.{ dir_path, "_torn" });
    defer testing.allocator.free(torn_dir_path);
    try std.testing.expectError(error.FileNotFound, std.Io.Dir.cwd().access(std.testing.io, torn_dir_path, .{}));
}
|
|
|
|
test "Store.dataTypeFor maps model types correctly" {
    // (model type, expected DataType) pairs, checked in order.
    inline for (.{
        .{ Candle, DataType.candles_daily },
        .{ Dividend, DataType.dividends },
        .{ Split, DataType.splits },
        .{ EarningsEvent, DataType.earnings },
        .{ OptionsChain, DataType.options },
        .{ EtfProfile, DataType.etf_profile },
    }) |pair| {
        try std.testing.expectEqual(pair[1], Store.dataTypeFor(pair[0]));
    }
}
|
|
|
|
test "Store.DataFor returns correct types" {
    // EtfProfile maps to a single struct...
    try std.testing.expect(@TypeOf(Store.DataFor(EtfProfile)) == type);
    try std.testing.expect(Store.DataFor(EtfProfile) == EtfProfile);

    // ...while the list-shaped models map to a slice of themselves.
    inline for (.{ Candle, Dividend, Split }) |Model| {
        try std.testing.expect(Store.DataFor(Model) == []Model);
    }
}
|
|
|
|
test "Store.Freshness enum values" {
    // The two freshness modes must be distinct tags.
    try std.testing.expect(Store.Freshness.any != Store.Freshness.fresh_only);
}
|
|
|
|
test "CandleProvider.fromString parses provider names" {
    // Known names map to their tag; anything else (including "") falls
    // back to twelvedata. Checked in the listed order.
    inline for (.{
        .{ "yahoo", Store.CandleProvider.yahoo },
        .{ "tiingo", Store.CandleProvider.tiingo },
        .{ "twelvedata", Store.CandleProvider.twelvedata },
        .{ "unknown", Store.CandleProvider.twelvedata },
        .{ "", Store.CandleProvider.twelvedata },
    }) |pair| {
        try std.testing.expectEqual(pair[1], Store.CandleProvider.fromString(pair[0]));
    }
}
|
|
|
|
test "Store init creates valid store" {
    // init must record the cache directory it was given.
    const expected_dir = "/tmp/zfin-test";
    const store = Store.init(std.testing.io, std.testing.allocator, expected_dir);
    try std.testing.expectEqualStrings(expected_dir, store.cache_dir);
}
|
|
|
|
test "CandleMeta has no default provider (must be set explicitly)" {
    // Regression: pre-2026-05 the model had `provider: CandleProvider = .tiingo`
    // which caused SRF to elide the field when it equaled the default,
    // hiding the provider in cache inspection. We removed the default
    // so every cache file records its provider explicitly.
    //
    // A literal that sets `.provider` compiles whether or not the field
    // has a default, so construction alone cannot catch a re-added
    // default. Inspect the field's declared default via reflection
    // instead; this fails if a default reappears.
    const provider_has_default = comptime std.meta.fieldInfo(Store.CandleMeta, .provider).default_value_ptr != null;
    try std.testing.expect(!provider_has_default);

    const meta = Store.CandleMeta{
        .last_close = 100.0,
        .last_date = Date.fromYmd(2024, 1, 1),
        .provider = .tiingo,
    };
    try std.testing.expectEqual(Store.CandleProvider.tiingo, meta.provider);
}
|
|
|
|
test "serializeCandleMeta unconditionally emits provider field" {
    // Regression: SRF used to elide `provider::tiingo` when it matched the
    // model default. With that default gone, the field is always written.
    const gpa = std.testing.allocator;
    const meta: Store.CandleMeta = .{
        .last_close = 100.0,
        .last_date = Date.fromYmd(2024, 1, 1),
        .provider = .tiingo,
    };
    const encoded = try Store.serializeCandleMeta(std.testing.io, gpa, meta, .{ .expires = 1234567890 });
    defer gpa.free(encoded);

    const required_fragments = [_][]const u8{
        "provider::tiingo",
        "last_close:num:100",
        "last_date::2024-01-01",
        "#!expires=1234567890",
    };
    for (required_fragments) |fragment| {
        try std.testing.expect(std.mem.indexOf(u8, encoded, fragment) != null);
    }
}
|
|
|
|
test "serializeCandleMeta round-trips through deserializeCandleMeta" {
    const gpa = std.testing.allocator;
    const expected_date = Date.fromYmd(2026, 5, 19);

    const source_meta: Store.CandleMeta = .{
        .last_close = 42.57,
        .last_date = expected_date,
        .provider = .tiingo,
        .fail_count = 2,
    };

    // Encode, then decode the bytes we just produced.
    const encoded = try Store.serializeCandleMeta(std.testing.io, gpa, source_meta, .{ .expires = 1234567890 });
    defer gpa.free(encoded);
    const decoded = try Store.deserializeCandleMeta(gpa, encoded);

    // Every field written must come back unchanged (close price within tolerance).
    try std.testing.expectApproxEqAbs(@as(f64, 42.57), decoded.last_close, 0.001);
    try std.testing.expect(decoded.last_date.eql(expected_date));
    try std.testing.expectEqual(Store.CandleProvider.tiingo, decoded.provider);
    try std.testing.expectEqual(@as(u8, 2), decoded.fail_count);
}
|
|
|
|
test "deserializeCandleMeta fails on old cache that elided provider field" {
    // Cache files written before 2026-05 omitted `provider::tiingo` because
    // it matched the model's default at the time. The model has no default
    // for `provider` any more, so such files must now fail to deserialize.
    //
    // Recovery is expected to happen upstream (not exercised here):
    // `readCandleMeta` is meant to swallow this error and return null, so
    // `getCandles` treats it as a cache miss and refetches via
    // `populateAllFromTiingo`, which writes a meta file with an explicit
    // provider.
    //
    // What this test pins down: the caller receives an error rather than
    // silently getting stale data.
    const legacy_meta =
        \\#!srfv1
        \\#!expires=1779384748
        \\#!created=1779299248
        \\last_close:num:298.97,last_date::2026-05-19
        \\
    ;

    const outcome = Store.deserializeCandleMeta(std.testing.allocator, legacy_meta);
    try std.testing.expectError(error.InvalidData, outcome);
}
|
|
|
|
// ── writeRaw / appendRaw atomicity ───────────────────────────
//
// A concurrent reader that opens a cache file while it is being written
// must never see a truncated file or a partially written field. The
// symptom that motivated these tests was the srf error
// `custom parse of value 2026-04 failed : InvalidDateFormat`: a date field
// chopped exactly 7 characters into a 10-character value.
//
// The tests below stress writeRaw and appendRaw directly. In the writeRaw
// test, one thread repeatedly writes two alternating, differently-sized SRF
// blobs while reader threads repeatedly read the file. Every read that
// succeeds must be byte-for-byte equal to one of those two blobs: no
// partial content and no partial dates. The appendRaw test does the same
// with repeated appends and checks that every read has an intact header
// and ends on a record boundary. A version of writeRaw from before the
// atomicity fix would fail within a handful of iterations.
|
|
|
|
// Short alias for `std.testing`, used by the atomicity tests that follow.
const testing = std.testing;
|
|
|
|
test "writeRaw atomicity: concurrent readers never observe a truncated file" {
    const io = std.testing.io;

    // Two SRF blobs with dates at different byte offsets. A non-atomic
    // writer that truncates + writes would leak partial bytes of
    // whichever blob is mid-write; that would show up as either a
    // partial date or a split field — both caught by strict equality
    // against these two complete blobs.
    const blob_a =
        \\#!srfv1
        \\#!expires=1780000000
        \\#!created=1777000000
        \\symbol::AAPL,date::2026-04-30,actual:num:2.78
        \\symbol::MSFT,date::2026-01-29,actual:num:4.27
        \\
    ;
    const blob_b =
        \\#!srfv1
        \\#!expires=1780500000
        \\#!created=1777500000
        \\symbol::AMZN,date::2026-07-29,estimate:num:1.83
        \\
    ;
    try std.testing.expect(blob_a.len != blob_b.len); // ensure size changes

    var tmp = std.testing.tmpDir(.{});
    defer tmp.cleanup();

    const dir_path = try tmp.dir.realPathFileAlloc(io, ".", testing.allocator);
    defer testing.allocator.free(dir_path);

    // 0.16's DebugAllocator (which testing.allocator uses) is
    // thread-safe by default, so no ThreadSafeAllocator wrapper needed.
    var store = Store.init(std.testing.io, testing.allocator, dir_path);

    // Make sure the symbol subdir exists once up-front (writeRaw will
    // also call ensureSymbolDir, but doing it here keeps the writer
    // hot loop lean).
    try store.ensureSymbolDir("SYM");

    const Ctx = struct {
        store: *Store,
        blob_a: []const u8,
        blob_b: []const u8,
        stop: std.atomic.Value(bool),
        bad_reads: std.atomic.Value(u32),
        completed_reads: std.atomic.Value(u32),

        /// Alternate full-file writes of blob_a / blob_b until `stop` is set.
        fn writer(self: *@This()) void {
            var i: usize = 0;
            while (!self.stop.load(.acquire)) : (i += 1) {
                const bytes = if (i & 1 == 0) self.blob_a else self.blob_b;
                // Best-effort: a failed write just skips this iteration.
                self.store.writeRaw("SYM", .candles_daily, bytes) catch {};
            }
        }

        /// Repeatedly read the cache file, counting reads that match neither blob.
        fn reader(self: *@This()) void {
            const path = self.store.symbolPath("SYM", DataType.candles_daily.fileName()) catch return;
            defer self.store.allocator.free(path);

            while (!self.stop.load(.acquire)) {
                // Read failures are skipped, not counted. That includes
                // FileNotFound before the writer's first write lands.
                const bytes = std.Io.Dir.cwd().readFileAlloc(self.store.io, path, self.store.allocator, .limited(1 * 1024 * 1024)) catch continue;
                defer self.store.allocator.free(bytes);

                // Every successful read must match exactly one of the
                // two source blobs. A non-atomic writer would eventually
                // produce reads that match neither (truncated, zero
                // bytes, or mid-blob switchover garbage).
                const matches = std.mem.eql(u8, bytes, self.blob_a) or std.mem.eql(u8, bytes, self.blob_b);
                if (!matches) _ = self.bad_reads.fetchAdd(1, .monotonic);
                _ = self.completed_reads.fetchAdd(1, .monotonic);
            }
        }
    };

    var ctx = Ctx{
        .store = &store,
        .blob_a = blob_a,
        .blob_b = blob_b,
        .stop = std.atomic.Value(bool).init(false),
        .bad_reads = std.atomic.Value(u32).init(0),
        .completed_reads = std.atomic.Value(u32).init(0),
    };

    // Run the workers in their own scope so that every exit path signals
    // `stop` and joins all spawned threads before `ctx`, `store`, and the
    // temp dir go out of scope. Exit paths include a failed reader spawn
    // or a failed sleep, as well as normal completion. Defers run LIFO:
    // `stop` is raised first, then the writer and the readers are joined.
    {
        var reader_threads: [3]std.Thread = undefined;
        var readers_spawned: usize = 0;
        defer for (reader_threads[0..readers_spawned]) |t| t.join();

        const writer_thread = try std.Thread.spawn(.{}, Ctx.writer, .{&ctx});
        defer writer_thread.join();

        defer ctx.stop.store(true, .release);

        // `readers_spawned` only advances after a successful spawn, so the
        // join defer above never touches an uninitialized slot.
        while (readers_spawned < reader_threads.len) : (readers_spawned += 1) {
            reader_threads[readers_spawned] = try std.Thread.spawn(.{}, Ctx.reader, .{&ctx});
        }

        // Run the stress for a fixed duration. 200ms is plenty for thousands
        // of iterations on any reasonable machine — enough to reliably catch
        // a non-atomic write window at the scheduler granularity.
        try io.sleep(.fromMilliseconds(200), .boot);
    }

    // All workers are joined at this point, so the counters are final.
    const bad = ctx.bad_reads.load(.monotonic);
    const total = ctx.completed_reads.load(.monotonic);

    // Sanity: we actually did work (otherwise the test is a no-op).
    try testing.expect(total > 0);
    // The critical assertion: zero reads landed in a partial state.
    try testing.expectEqual(@as(u32, 0), bad);
}
|
|
|
|
test "appendRaw atomicity: concurrent readers see either pre- or post-append, never mid" {
    const io = std.testing.io;

    // Seed an initial file with a complete SRF doc, then have one thread
    // append more records repeatedly while readers race to read it. Every
    // successful read must have an intact header and a valid termination
    // — no trailing partial record from an in-flight append.
    const seed =
        \\#!srfv1
        \\#!created=1777000000
        \\symbol::AAPL,date::2024-04-30,actual:num:1.53
        \\
    ;
    const chunk =
        \\symbol::AAPL,date::2024-07-30,actual:num:1.65
        \\
    ;

    var tmp = std.testing.tmpDir(.{});
    defer tmp.cleanup();

    const dir_path = try tmp.dir.realPathFileAlloc(io, ".", testing.allocator);
    defer testing.allocator.free(dir_path);

    // 0.16's DebugAllocator (which testing.allocator uses) is
    // thread-safe by default, so no ThreadSafeAllocator wrapper needed.
    var store = Store.init(std.testing.io, testing.allocator, dir_path);
    try store.ensureSymbolDir("SYM");

    // Write seed atomically up front.
    try store.writeRaw("SYM", .candles_daily, seed);

    const Ctx = struct {
        store: *Store,
        seed_len: usize,
        chunk: []const u8,
        stop: std.atomic.Value(bool),
        bad_reads: std.atomic.Value(u32),
        completed_reads: std.atomic.Value(u32),

        /// Append `chunk` repeatedly until `stop` is set (best-effort).
        fn appender(self: *@This()) void {
            while (!self.stop.load(.acquire)) {
                self.store.appendRaw("SYM", .candles_daily, self.chunk) catch {};
            }
        }

        /// Repeatedly read the cache file, counting reads that break an invariant.
        fn reader(self: *@This()) void {
            const path = self.store.symbolPath("SYM", DataType.candles_daily.fileName()) catch return;
            defer self.store.allocator.free(path);

            while (!self.stop.load(.acquire)) {
                const bytes = std.Io.Dir.cwd().readFileAlloc(self.store.io, path, self.store.allocator, .limited(4 * 1024 * 1024)) catch continue;
                defer self.store.allocator.free(bytes);

                // Invariants for a valid appended file:
                //   1. Starts with `#!srfv1\n` (the header is never torn).
                //   2. Ends with `\n` (no partial trailing record).
                //   3. Total length is `seed.len + k * chunk.len` for some
                //      integer k ≥ 0 — which, with atomic appends, is the
                //      only observable shape.
                const ok_header = std.mem.startsWith(u8, bytes, "#!srfv1\n");
                const ok_tail = bytes.len > 0 and bytes[bytes.len - 1] == '\n';
                // Short-circuit keeps the subtraction from underflowing.
                const ok_len = bytes.len >= self.seed_len and
                    (bytes.len - self.seed_len) % self.chunk.len == 0;
                if (!ok_header or !ok_tail or !ok_len) {
                    _ = self.bad_reads.fetchAdd(1, .monotonic);
                }
                _ = self.completed_reads.fetchAdd(1, .monotonic);
            }
        }
    };

    var ctx = Ctx{
        .store = &store,
        .seed_len = seed.len,
        .chunk = chunk,
        .stop = std.atomic.Value(bool).init(false),
        .bad_reads = std.atomic.Value(u32).init(0),
        .completed_reads = std.atomic.Value(u32).init(0),
    };

    // Run the workers in their own scope so that every exit path signals
    // `stop` and joins all spawned threads before `ctx`, `store`, and the
    // temp dir go out of scope. Exit paths include a failed reader spawn
    // or a failed sleep, as well as normal completion. Defers run LIFO:
    // `stop` is raised first, then the appender and the readers are joined.
    {
        var reader_threads: [3]std.Thread = undefined;
        var readers_spawned: usize = 0;
        defer for (reader_threads[0..readers_spawned]) |t| t.join();

        const appender_thread = try std.Thread.spawn(.{}, Ctx.appender, .{&ctx});
        defer appender_thread.join();

        defer ctx.stop.store(true, .release);

        // `readers_spawned` only advances after a successful spawn, so the
        // join defer above never touches an uninitialized slot.
        while (readers_spawned < reader_threads.len) : (readers_spawned += 1) {
            reader_threads[readers_spawned] = try std.Thread.spawn(.{}, Ctx.reader, .{&ctx});
        }

        try io.sleep(.fromMilliseconds(200), .boot);
    }

    // All workers are joined at this point, so the counters are final.
    const bad = ctx.bad_reads.load(.monotonic);
    const total = ctx.completed_reads.load(.monotonic);
    try testing.expect(total > 0);
    try testing.expectEqual(@as(u32, 0), bad);
}
|