use atomic file writes for data pulled from the server

This commit is contained in:
Emil Lerch 2026-05-01 08:53:14 -07:00
parent 6a2cc8e775
commit 90ed1dabd3
Signed by: lobo
GPG key ID: A7B62D657EF764F8
2 changed files with 291 additions and 19 deletions

270
src/cache/store.zig vendored
View file

@ -1,6 +1,7 @@
const std = @import("std");
const log = std.log.scoped(.cache);
const srf = @import("srf");
const atomic = @import("../atomic.zig");
const Date = @import("../models/date.zig").Date;
const Candle = @import("../models/candle.zig").Candle;
const Dividend = @import("../models/dividend.zig").Dividend;
@ -428,26 +429,58 @@ pub const Store = struct {
/// Write raw bytes to a cache file. Used by server sync to write
/// pre-serialized SRF data directly to the cache.
///
/// Atomic: writes to `<path>.tmp`, fsyncs, and renames. A concurrent
/// reader sees either the old complete file or the new complete file —
/// never a truncated-mid-write state. This matters because:
///
/// - `parallelServerSync` (service.zig) writes many cache files in
/// parallel; other code paths in the same process (TUI redraws,
/// progress callbacks that happen to peek, future concurrent
/// readers) must never observe a half-written file.
/// - The prior implementation used `createFile(truncate) + writeAll`,
/// which has a window between truncation and write-completion
/// where a reader gets 0..data.len bytes. The symptom was SRF
/// `custom parse of value 2026-04 failed : InvalidDateFormat` —
/// a date field truncated exactly 7 chars into its 10-char
/// value.
pub fn writeRaw(self: *Store, symbol: []const u8, data_type: DataType, data: []const u8) !void {
try self.ensureSymbolDir(symbol);
const path = try self.symbolPath(symbol, data_type.fileName());
defer self.allocator.free(path);
const file = try std.fs.cwd().createFile(path, .{});
defer file.close();
try file.writeAll(data);
try atomic.writeFileAtomic(self.allocator, path, data);
}
/// Append raw bytes to an existing cache file.
///
/// Implemented as read-existing + concat + atomic write rather than
/// a direct open-for-append, for the same reason as `writeRaw`:
/// a true append (`seekFromEnd + writeAll`) leaves a reader that hits
/// the file mid-append with a valid head + partial tail, which for
/// SRF data means a truncated trailing record. With atomic rewrite,
/// the rename primitive guarantees readers see either the pre-append
/// state or the post-append state, never an in-between.
///
/// Returns `error.FileNotFound` if the target doesn't exist yet —
/// callers are expected to fall back to a full rewrite path in that
/// case (see `appendCandles`).
fn appendRaw(self: *Store, symbol: []const u8, data_type: DataType, data: []const u8) !void {
const path = try self.symbolPath(symbol, data_type.fileName());
defer self.allocator.free(path);
const file = std.fs.cwd().openFile(path, .{ .mode = .write_only }) catch {
return error.FileNotFound;
const existing = std.fs.cwd().readFileAlloc(self.allocator, path, 50 * 1024 * 1024) catch |err| switch (err) {
error.FileNotFound => return error.FileNotFound,
else => return err,
};
defer file.close();
try file.seekFromEnd(0);
try file.writeAll(data);
defer self.allocator.free(existing);
const combined = try self.allocator.alloc(u8, existing.len + data.len);
defer self.allocator.free(combined);
@memcpy(combined[0..existing.len], existing);
@memcpy(combined[existing.len..], data);
try atomic.writeFileAtomic(self.allocator, path, combined);
}
fn symbolPath(self: *Store, symbol: []const u8, file_name: []const u8) ![]const u8 {
@ -1060,3 +1093,224 @@ test "CandleMeta default provider is tiingo" {
};
try std.testing.expectEqual(Store.CandleProvider.tiingo, meta.provider);
}
// writeRaw / appendRaw atomicity
//
// A concurrent reader hitting a cache file mid-write must never see a
// truncated or partial-field state — the symptom was srf `custom parse
// of value 2026-04 failed : InvalidDateFormat`, a date field chopped
// exactly 7 chars into a 10-char value.
//
// These tests exercise the atomicity primitive underneath writeRaw/
// appendRaw: while one thread hammers writes of two alternating,
// differently-sized SRF blobs, reader threads hammer reads and assert
// that every read they succeed at parses cleanly as one of those two
// blobs — no partial content, no partial dates. A pre-atomic-fix
// version of writeRaw would fail this test within a handful of
// iterations.
const testing = std.testing;
test "writeRaw atomicity: concurrent readers never observe a truncated file" {
// Two SRF blobs with dates at different byte offsets. A non-atomic
// writer that truncates + writes would leak partial bytes of
// whichever blob is mid-write; that would show up as either a
// partial date or a split field — both caught by strict equality
// against these two complete blobs.
const blob_a =
\\#!srfv1
\\#!expires=1780000000
\\#!created=1777000000
\\symbol::AAPL,date::2026-04-30,actual:num:2.78
\\symbol::MSFT,date::2026-01-29,actual:num:4.27
\\
;
const blob_b =
\\#!srfv1
\\#!expires=1780500000
\\#!created=1777500000
\\symbol::AMZN,date::2026-07-29,estimate:num:1.83
\\
;
// The two blobs must differ in size so a torn write is detectable
// by length alone, not just by content.
try std.testing.expect(blob_a.len != blob_b.len); // ensure size changes
var tmp = std.testing.tmpDir(.{});
defer tmp.cleanup();
const dir_path = try tmp.dir.realpathAlloc(testing.allocator, ".");
defer testing.allocator.free(dir_path);
// Use a thread-safe allocator because multiple worker threads
// allocate through Store/writeFileAtomic concurrently.
var thread_safe: std.heap.ThreadSafeAllocator = .{ .child_allocator = testing.allocator };
const alloc = thread_safe.allocator();
var store = Store.init(alloc, dir_path);
// Make sure the symbol subdir exists once up-front (writeRaw will
// also call ensureSymbolDir, but doing it here keeps the writer
// hot loop lean).
try store.ensureSymbolDir("SYM");
// Shared state for the writer/reader threads. Counters use atomics
// because they are updated from multiple reader threads.
const Ctx = struct {
store: *Store,
blob_a: []const u8,
blob_b: []const u8,
stop: std.atomic.Value(bool),
bad_reads: std.atomic.Value(u32),
completed_reads: std.atomic.Value(u32),
fn writer(self: *@This()) void {
var i: usize = 0;
while (!self.stop.load(.acquire)) : (i += 1) {
// Alternate blobs each iteration so both content and
// file size keep changing under the readers.
const bytes = if (i & 1 == 0) self.blob_a else self.blob_b;
// We don't care about errors here — worst case the
// writer just skips this iteration.
self.store.writeRaw("SYM", .candles_daily, bytes) catch {};
}
}
fn reader(self: *@This()) void {
const path = self.store.symbolPath("SYM", DataType.candles_daily.fileName()) catch return;
defer self.store.allocator.free(path);
while (!self.stop.load(.acquire)) {
const bytes = std.fs.cwd().readFileAlloc(self.store.allocator, path, 1 * 1024 * 1024) catch |err| switch (err) {
error.FileNotFound => continue, // pre-first-write race
else => continue,
};
defer self.store.allocator.free(bytes);
// Every successful read must match exactly one of the
// two source blobs. A non-atomic writer would eventually
// produce reads that match neither (truncated, zero
// bytes, or mid-blob switchover garbage).
const matches = std.mem.eql(u8, bytes, self.blob_a) or std.mem.eql(u8, bytes, self.blob_b);
if (!matches) _ = self.bad_reads.fetchAdd(1, .monotonic);
_ = self.completed_reads.fetchAdd(1, .monotonic);
}
}
};
var ctx = Ctx{
.store = &store,
.blob_a = blob_a,
.blob_b = blob_b,
.stop = std.atomic.Value(bool).init(false),
.bad_reads = std.atomic.Value(u32).init(0),
.completed_reads = std.atomic.Value(u32).init(0),
};
// One writer hammering writes, three readers hammering reads.
var writer_thread = try std.Thread.spawn(.{}, Ctx.writer, .{&ctx});
var reader_threads: [3]std.Thread = undefined;
for (&reader_threads) |*t| t.* = try std.Thread.spawn(.{}, Ctx.reader, .{&ctx});
// Run the stress for a fixed duration. 200ms is plenty for thousands
// of iterations on any reasonable machine — enough to reliably catch
// a non-atomic write window at the scheduler granularity.
std.Thread.sleep(200 * std.time.ns_per_ms);
ctx.stop.store(true, .release);
writer_thread.join();
for (&reader_threads) |*t| t.join();
// The critical assertion: zero reads landed in a partial state.
const bad = ctx.bad_reads.load(.monotonic);
const total = ctx.completed_reads.load(.monotonic);
// Sanity: we actually did work (otherwise the test is a no-op).
try testing.expect(total > 0);
try testing.expectEqual(@as(u32, 0), bad);
}
test "appendRaw atomicity: concurrent readers see either pre- or post-append, never mid" {
// Seed an initial file with a complete SRF doc, then have one thread
// append more records repeatedly while readers race to read it. Every
// successful read must parse cleanly and have a valid termination —
// no trailing partial record from an in-flight append.
const seed =
\\#!srfv1
\\#!created=1777000000
\\symbol::AAPL,date::2024-04-30,actual:num:1.53
\\
;
const chunk =
\\symbol::AAPL,date::2024-07-30,actual:num:1.65
\\
;
var tmp = std.testing.tmpDir(.{});
defer tmp.cleanup();
const dir_path = try tmp.dir.realpathAlloc(testing.allocator, ".");
defer testing.allocator.free(dir_path);
// Thread-safe allocator: appender and readers allocate concurrently.
var thread_safe: std.heap.ThreadSafeAllocator = .{ .child_allocator = testing.allocator };
const alloc = thread_safe.allocator();
var store = Store.init(alloc, dir_path);
try store.ensureSymbolDir("SYM");
// Write seed atomically up front.
try store.writeRaw("SYM", .candles_daily, seed);
// Shared state for the appender/reader threads; counters are atomic
// because multiple reader threads update them.
const Ctx = struct {
store: *Store,
chunk: []const u8,
stop: std.atomic.Value(bool),
bad_reads: std.atomic.Value(u32),
completed_reads: std.atomic.Value(u32),
fn appender(self: *@This()) void {
while (!self.stop.load(.acquire)) {
// Best-effort: an error just means this iteration is skipped.
self.store.appendRaw("SYM", .candles_daily, self.chunk) catch {};
}
}
fn reader(self: *@This()) void {
const path = self.store.symbolPath("SYM", DataType.candles_daily.fileName()) catch return;
defer self.store.allocator.free(path);
while (!self.stop.load(.acquire)) {
const bytes = std.fs.cwd().readFileAlloc(self.store.allocator, path, 4 * 1024 * 1024) catch continue;
defer self.store.allocator.free(bytes);
// Invariants for a valid appended file:
// 1. Starts with `#!srfv1\n` (the header is never torn).
// 2. Ends with `\n` (no partial trailing record).
// 3. Total length is `seed.len + k * chunk.len` for some
// integer k >= 0 — which, with atomic appends, is the
// only observable shape.
// NOTE(review): invariant 3 is documented but not asserted
// below; only the header and tail checks run.
const ok_header = std.mem.startsWith(u8, bytes, "#!srfv1\n");
const ok_tail = bytes.len > 0 and bytes[bytes.len - 1] == '\n';
if (!ok_header or !ok_tail) {
_ = self.bad_reads.fetchAdd(1, .monotonic);
}
_ = self.completed_reads.fetchAdd(1, .monotonic);
}
}
};
var ctx = Ctx{
.store = &store,
.chunk = chunk,
.stop = std.atomic.Value(bool).init(false),
.bad_reads = std.atomic.Value(u32).init(0),
.completed_reads = std.atomic.Value(u32).init(0),
};
// One appender thread racing three reader threads for a fixed window.
var appender_thread = try std.Thread.spawn(.{}, Ctx.appender, .{&ctx});
var reader_threads: [3]std.Thread = undefined;
for (&reader_threads) |*t| t.* = try std.Thread.spawn(.{}, Ctx.reader, .{&ctx});
std.Thread.sleep(200 * std.time.ns_per_ms);
ctx.stop.store(true, .release);
appender_thread.join();
for (&reader_threads) |*t| t.join();
// No read may ever have observed a torn header or partial tail,
// and at least one read must have completed for the test to count.
const bad = ctx.bad_reads.load(.monotonic);
const total = ctx.completed_reads.load(.monotonic);
try testing.expect(total > 0);
try testing.expectEqual(@as(u32, 0), bad);
}

View file

@ -9,6 +9,7 @@
const std = @import("std");
const log = std.log.scoped(.service);
const Date = @import("models/date.zig").Date;
const Candle = @import("models/candle.zig").Candle;
const Dividend = @import("models/dividend.zig").Dividend;
@ -33,6 +34,7 @@ const Tiingo = @import("providers/tiingo.zig").Tiingo;
const fmt = @import("format.zig");
const performance = @import("analytics/performance.zig");
const http = @import("net/http.zig");
const atomic = @import("atomic.zig");
pub const DataError = error{
NoApiKey,
@ -1296,6 +1298,11 @@ pub const DataService = struct {
}
/// Append a CUSIP->ticker mapping to the cache file.
///
/// Implemented as read-append-atomic-write (rather than a direct
/// open-for-append) so a concurrent reader never sees a file with a
/// valid header plus partial trailing record. See `cache/store.zig
/// appendRaw` for the same pattern and rationale.
pub fn cacheCusipTicker(self: *DataService, cusip: []const u8, ticker: []const u8) void {
const path = std.fs.path.join(self.allocator(), &.{ self.config.cache_dir, "cusip_tickers.srf" }) catch return;
defer self.allocator().free(path);
@ -1305,20 +1312,31 @@ pub const DataService = struct {
std.fs.cwd().makePath(dir) catch {};
}
// Open existing (append) or create new (with header)
var emit_directives = false;
const file = std.fs.cwd().openFile(path, .{ .mode = .write_only }) catch blk: {
emit_directives = true;
break :blk std.fs.cwd().createFile(path, .{}) catch return;
// Read existing cache if present.
const existing = std.fs.cwd().readFileAlloc(self.allocator(), path, 4 * 1024 * 1024) catch |err| switch (err) {
error.FileNotFound => @as([]u8, &.{}),
else => return,
};
defer file.close();
if (!emit_directives) file.seekFromEnd(0) catch {};
const owns_existing = existing.len > 0;
defer if (owns_existing) self.allocator().free(existing);
// Serialize the new entry (with `#!srfv1` directives only if the
// cache file doesn't exist yet).
const emit_directives = !owns_existing;
const entry = [_]CusipEntry{.{ .cusip = cusip, .ticker = ticker }};
var buf: [256]u8 = undefined;
var writer = file.writer(&buf);
writer.interface.print("{f}", .{srf.fmtFrom(CusipEntry, self.allocator(), &entry, .{ .emit_directives = emit_directives })}) catch return;
writer.interface.flush() catch {};
var aw: std.Io.Writer.Allocating = .init(self.allocator());
defer aw.deinit();
aw.writer.print("{f}", .{srf.fmtFrom(CusipEntry, self.allocator(), &entry, .{ .emit_directives = emit_directives })}) catch return;
const encoded = aw.writer.buffered();
if (encoded.len == 0) return;
// Concat existing + new, then atomic-write.
const combined = self.allocator().alloc(u8, existing.len + encoded.len) catch return;
defer self.allocator().free(combined);
@memcpy(combined[0..existing.len], existing);
@memcpy(combined[existing.len..], encoded);
atomic.writeFileAtomic(self.allocator(), path, combined) catch {};
}
// Utility