diff --git a/src/cache/store.zig b/src/cache/store.zig
index 232df2f..3d82557 100644
--- a/src/cache/store.zig
+++ b/src/cache/store.zig
@@ -1,6 +1,7 @@
 const std = @import("std");
 const log = std.log.scoped(.cache);
 const srf = @import("srf");
+const atomic = @import("../atomic.zig");
 const Date = @import("../models/date.zig").Date;
 const Candle = @import("../models/candle.zig").Candle;
 const Dividend = @import("../models/dividend.zig").Dividend;
@@ -428,26 +429,58 @@ pub const Store = struct {
     /// Write raw bytes to a cache file. Used by server sync to write
     /// pre-serialized SRF data directly to the cache.
+    ///
+    /// Atomic: writes to `.tmp`, fsyncs, and renames. A concurrent
+    /// reader sees either the old complete file or the new complete
+    /// file, never a truncated mid-write state. This matters because:
+    ///
+    /// - `parallelServerSync` (service.zig) writes many cache files in
+    ///   parallel; other code paths in the same process (TUI redraws,
+    ///   progress callbacks that happen to peek, future concurrent
+    ///   readers) must never observe a half-written file.
+    /// - The prior implementation used `createFile(truncate) + writeAll`,
+    ///   which has a window between truncation and write completion in
+    ///   which a reader can observe anywhere from 0 to data.len bytes.
+    ///   The symptom was SRF `custom parse of value 2026-04 failed :
+    ///   InvalidDateFormat`: a date field truncated exactly 7 chars into
+    ///   its 10-char value.
     pub fn writeRaw(self: *Store, symbol: []const u8, data_type: DataType, data: []const u8) !void {
         try self.ensureSymbolDir(symbol);
         const path = try self.symbolPath(symbol, data_type.fileName());
         defer self.allocator.free(path);
 
-        const file = try std.fs.cwd().createFile(path, .{});
-        defer file.close();
-        try file.writeAll(data);
+        try atomic.writeFileAtomic(self.allocator, path, data);
     }
 
+    /// Append raw bytes to an existing cache file.
+    ///
+    /// Implemented as read-existing + concat + atomic write rather than
+    /// a direct open-for-append, for the same reason as `writeRaw`: with
+    /// a true append (`seekFromEnd + writeAll`), a reader that hits the
+    /// file mid-append sees a valid head plus a partial tail, which for
+    /// SRF data means a truncated trailing record. With an atomic
+    /// rewrite, the rename primitive guarantees readers see either the
+    /// pre-append state or the post-append state, never an in-between.
+    ///
+    /// Returns `error.FileNotFound` if the target doesn't exist yet;
+    /// callers are expected to fall back to a full rewrite path in that
+    /// case (see `appendCandles`).
fn appendRaw(self: *Store, symbol: []const u8, data_type: DataType, data: []const u8) !void { const path = try self.symbolPath(symbol, data_type.fileName()); defer self.allocator.free(path); - const file = std.fs.cwd().openFile(path, .{ .mode = .write_only }) catch { - return error.FileNotFound; + const existing = std.fs.cwd().readFileAlloc(self.allocator, path, 50 * 1024 * 1024) catch |err| switch (err) { + error.FileNotFound => return error.FileNotFound, + else => return err, }; - defer file.close(); - try file.seekFromEnd(0); - try file.writeAll(data); + defer self.allocator.free(existing); + + const combined = try self.allocator.alloc(u8, existing.len + data.len); + defer self.allocator.free(combined); + @memcpy(combined[0..existing.len], existing); + @memcpy(combined[existing.len..], data); + + try atomic.writeFileAtomic(self.allocator, path, combined); } fn symbolPath(self: *Store, symbol: []const u8, file_name: []const u8) ![]const u8 { @@ -1060,3 +1093,224 @@ test "CandleMeta default provider is tiingo" { }; try std.testing.expectEqual(Store.CandleProvider.tiingo, meta.provider); } + +// ── writeRaw / appendRaw atomicity ─────────────────────────── +// +// A concurrent reader hitting a cache file mid-write must never see a +// truncated or partial-field state — the symptom was srf `custom parse +// of value 2026-04 failed : InvalidDateFormat`, a date field chopped +// exactly 7 chars into a 10-char value. +// +// These tests exercise the atomicity primitive underneath writeRaw/ +// appendRaw: while one thread hammers writes of two alternating, +// differently-sized SRF blobs, reader threads hammer reads and assert +// that every read they succeed at parses cleanly as one of those two +// blobs — no partial content, no partial dates. A pre-atomic-fix +// version of writeRaw would fail this test within a handful of +// iterations. + +const testing = std.testing; + +test "writeRaw atomicity: concurrent readers never observe a truncated file" { + // Two SRF blobs with dates at different byte offsets. A non-atomic + // writer that truncates + writes would leak partial bytes of + // whichever blob is mid-write; that would show up as either a + // partial date or a split field — both caught by strict equality + // against these two complete blobs. + const blob_a = + \\#!srfv1 + \\#!expires=1780000000 + \\#!created=1777000000 + \\symbol::AAPL,date::2026-04-30,actual:num:2.78 + \\symbol::MSFT,date::2026-01-29,actual:num:4.27 + \\ + ; + const blob_b = + \\#!srfv1 + \\#!expires=1780500000 + \\#!created=1777500000 + \\symbol::AMZN,date::2026-07-29,estimate:num:1.83 + \\ + ; + try std.testing.expect(blob_a.len != blob_b.len); // ensure size changes + + var tmp = std.testing.tmpDir(.{}); + defer tmp.cleanup(); + + const dir_path = try tmp.dir.realpathAlloc(testing.allocator, "."); + defer testing.allocator.free(dir_path); + + // Use a thread-safe allocator because multiple worker threads + // allocate through Store/writeFileAtomic concurrently. + var thread_safe: std.heap.ThreadSafeAllocator = .{ .child_allocator = testing.allocator }; + const alloc = thread_safe.allocator(); + + var store = Store.init(alloc, dir_path); + + // Make sure the symbol subdir exists once up-front (writeRaw will + // also call ensureSymbolDir, but doing it here keeps the writer + // hot loop lean). 
+ try store.ensureSymbolDir("SYM"); + + const Ctx = struct { + store: *Store, + blob_a: []const u8, + blob_b: []const u8, + stop: std.atomic.Value(bool), + bad_reads: std.atomic.Value(u32), + completed_reads: std.atomic.Value(u32), + + fn writer(self: *@This()) void { + var i: usize = 0; + while (!self.stop.load(.acquire)) : (i += 1) { + const bytes = if (i & 1 == 0) self.blob_a else self.blob_b; + // We don't care about errors here — worst case the + // writer just skips this iteration. + self.store.writeRaw("SYM", .candles_daily, bytes) catch {}; + } + } + + fn reader(self: *@This()) void { + const path = self.store.symbolPath("SYM", DataType.candles_daily.fileName()) catch return; + defer self.store.allocator.free(path); + + while (!self.stop.load(.acquire)) { + const bytes = std.fs.cwd().readFileAlloc(self.store.allocator, path, 1 * 1024 * 1024) catch |err| switch (err) { + error.FileNotFound => continue, // pre-first-write race + else => continue, + }; + defer self.store.allocator.free(bytes); + + // Every successful read must match exactly one of the + // two source blobs. A non-atomic writer would eventually + // produce reads that match neither (truncated, zero + // bytes, or mid-blob switchover garbage). + const matches = std.mem.eql(u8, bytes, self.blob_a) or std.mem.eql(u8, bytes, self.blob_b); + if (!matches) _ = self.bad_reads.fetchAdd(1, .monotonic); + _ = self.completed_reads.fetchAdd(1, .monotonic); + } + } + }; + + var ctx = Ctx{ + .store = &store, + .blob_a = blob_a, + .blob_b = blob_b, + .stop = std.atomic.Value(bool).init(false), + .bad_reads = std.atomic.Value(u32).init(0), + .completed_reads = std.atomic.Value(u32).init(0), + }; + + var writer_thread = try std.Thread.spawn(.{}, Ctx.writer, .{&ctx}); + var reader_threads: [3]std.Thread = undefined; + for (&reader_threads) |*t| t.* = try std.Thread.spawn(.{}, Ctx.reader, .{&ctx}); + + // Run the stress for a fixed duration. 200ms is plenty for thousands + // of iterations on any reasonable machine — enough to reliably catch + // a non-atomic write window at the scheduler granularity. + std.Thread.sleep(200 * std.time.ns_per_ms); + ctx.stop.store(true, .release); + + writer_thread.join(); + for (&reader_threads) |*t| t.join(); + + // The critical assertion: zero reads landed in a partial state. + const bad = ctx.bad_reads.load(.monotonic); + const total = ctx.completed_reads.load(.monotonic); + + // Sanity: we actually did work (otherwise the test is a no-op). + try testing.expect(total > 0); + try testing.expectEqual(@as(u32, 0), bad); +} + +test "appendRaw atomicity: concurrent readers see either pre- or post-append, never mid" { + // Seed an initial file with a complete SRF doc, then have one thread + // append more records repeatedly while readers race to read it. Every + // successful read must parse cleanly and have a valid termination + // — no trailing partial record from an in-flight append. + const seed = + \\#!srfv1 + \\#!created=1777000000 + \\symbol::AAPL,date::2024-04-30,actual:num:1.53 + \\ + ; + const chunk = + \\symbol::AAPL,date::2024-07-30,actual:num:1.65 + \\ + ; + + var tmp = std.testing.tmpDir(.{}); + defer tmp.cleanup(); + + const dir_path = try tmp.dir.realpathAlloc(testing.allocator, "."); + defer testing.allocator.free(dir_path); + + var thread_safe: std.heap.ThreadSafeAllocator = .{ .child_allocator = testing.allocator }; + const alloc = thread_safe.allocator(); + + var store = Store.init(alloc, dir_path); + try store.ensureSymbolDir("SYM"); + + // Write seed atomically up front. 
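+    // Because the seed is fully on disk before any thread spawns, the
+    // readers below never hit the pre-first-write FileNotFound race that
+    // the writeRaw test above has to tolerate.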
+    try store.writeRaw("SYM", .candles_daily, seed);
+
+    const Ctx = struct {
+        store: *Store,
+        seed: []const u8,
+        chunk: []const u8,
+        stop: std.atomic.Value(bool),
+        bad_reads: std.atomic.Value(u32),
+        completed_reads: std.atomic.Value(u32),
+
+        fn appender(self: *@This()) void {
+            while (!self.stop.load(.acquire)) {
+                self.store.appendRaw("SYM", .candles_daily, self.chunk) catch {};
+            }
+        }
+
+        fn reader(self: *@This()) void {
+            const path = self.store.symbolPath("SYM", DataType.candles_daily.fileName()) catch return;
+            defer self.store.allocator.free(path);
+
+            while (!self.stop.load(.acquire)) {
+                const bytes = std.fs.cwd().readFileAlloc(self.store.allocator, path, 4 * 1024 * 1024) catch continue;
+                defer self.store.allocator.free(bytes);
+
+                // Invariants for a valid appended file:
+                // 1. Starts with `#!srfv1\n` (the header is never torn).
+                // 2. Ends with `\n` (no partial trailing record).
+                // 3. Total length is `seed.len + k * chunk.len` for some
+                //    integer k >= 0; with atomic appends that is the only
+                //    observable shape.
+                const ok_header = std.mem.startsWith(u8, bytes, "#!srfv1\n");
+                const ok_tail = bytes.len > 0 and bytes[bytes.len - 1] == '\n';
+                const ok_len = bytes.len >= self.seed.len and
+                    (bytes.len - self.seed.len) % self.chunk.len == 0;
+                if (!ok_header or !ok_tail or !ok_len) {
+                    _ = self.bad_reads.fetchAdd(1, .monotonic);
+                }
+                _ = self.completed_reads.fetchAdd(1, .monotonic);
+            }
+        }
+    };
+
+    var ctx = Ctx{
+        .store = &store,
+        .seed = seed,
+        .chunk = chunk,
+        .stop = std.atomic.Value(bool).init(false),
+        .bad_reads = std.atomic.Value(u32).init(0),
+        .completed_reads = std.atomic.Value(u32).init(0),
+    };
+
+    var appender_thread = try std.Thread.spawn(.{}, Ctx.appender, .{&ctx});
+    var reader_threads: [3]std.Thread = undefined;
+    for (&reader_threads) |*t| t.* = try std.Thread.spawn(.{}, Ctx.reader, .{&ctx});
+
+    std.Thread.sleep(200 * std.time.ns_per_ms);
+    ctx.stop.store(true, .release);
+
+    appender_thread.join();
+    for (&reader_threads) |*t| t.join();
+
+    const bad = ctx.bad_reads.load(.monotonic);
+    const total = ctx.completed_reads.load(.monotonic);
+    try testing.expect(total > 0);
+    try testing.expectEqual(@as(u32, 0), bad);
+}
diff --git a/src/service.zig b/src/service.zig
index a0e238f..36c585e 100644
--- a/src/service.zig
+++ b/src/service.zig
@@ -9,6 +9,7 @@
 const std = @import("std");
 const log = std.log.scoped(.service);
+
 const Date = @import("models/date.zig").Date;
 const Candle = @import("models/candle.zig").Candle;
 const Dividend = @import("models/dividend.zig").Dividend;
@@ -33,6 +34,7 @@
 const Tiingo = @import("providers/tiingo.zig").Tiingo;
 const fmt = @import("format.zig");
 const performance = @import("analytics/performance.zig");
 const http = @import("net/http.zig");
+const atomic = @import("atomic.zig");
 
 pub const DataError = error{
     NoApiKey,
@@ -1296,6 +1298,11 @@
     }
 
     /// Append a CUSIP->ticker mapping to the cache file.
+    ///
+    /// Implemented as read-append-atomic-write (rather than a direct
+    /// open-for-append) so a concurrent reader never sees a file with a
+    /// valid header plus partial trailing record. See `cache/store.zig
+    /// appendRaw` for the same pattern and rationale.
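+    ///
+    /// Best-effort: the function returns `void`, so any failure (path
+    /// join, read, allocation, serialization, or the final write) is
+    /// swallowed and the mapping is simply not cached on this call.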
pub fn cacheCusipTicker(self: *DataService, cusip: []const u8, ticker: []const u8) void { const path = std.fs.path.join(self.allocator(), &.{ self.config.cache_dir, "cusip_tickers.srf" }) catch return; defer self.allocator().free(path); @@ -1305,20 +1312,31 @@ pub const DataService = struct { std.fs.cwd().makePath(dir) catch {}; } - // Open existing (append) or create new (with header) - var emit_directives = false; - const file = std.fs.cwd().openFile(path, .{ .mode = .write_only }) catch blk: { - emit_directives = true; - break :blk std.fs.cwd().createFile(path, .{}) catch return; + // Read existing cache if present. + const existing = std.fs.cwd().readFileAlloc(self.allocator(), path, 4 * 1024 * 1024) catch |err| switch (err) { + error.FileNotFound => @as([]u8, &.{}), + else => return, }; - defer file.close(); - if (!emit_directives) file.seekFromEnd(0) catch {}; + const owns_existing = existing.len > 0; + defer if (owns_existing) self.allocator().free(existing); + // Serialize the new entry (with `#!srfv1` directives only if the + // cache file doesn't exist yet). + const emit_directives = !owns_existing; const entry = [_]CusipEntry{.{ .cusip = cusip, .ticker = ticker }}; - var buf: [256]u8 = undefined; - var writer = file.writer(&buf); - writer.interface.print("{f}", .{srf.fmtFrom(CusipEntry, self.allocator(), &entry, .{ .emit_directives = emit_directives })}) catch return; - writer.interface.flush() catch {}; + var aw: std.Io.Writer.Allocating = .init(self.allocator()); + defer aw.deinit(); + aw.writer.print("{f}", .{srf.fmtFrom(CusipEntry, self.allocator(), &entry, .{ .emit_directives = emit_directives })}) catch return; + const encoded = aw.writer.buffered(); + if (encoded.len == 0) return; + + // Concat existing + new, then atomic-write. + const combined = self.allocator().alloc(u8, existing.len + encoded.len) catch return; + defer self.allocator().free(combined); + @memcpy(combined[0..existing.len], existing); + @memcpy(combined[existing.len..], encoded); + + atomic.writeFileAtomic(self.allocator(), path, combined) catch {}; } // ── Utility ──────────────────────────────────────────────────
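
── Reviewer note: the atomic primitive ─────────────────────────

`src/atomic.zig` is referenced throughout but is not part of this diff.
For review context, here is a minimal sketch of what a `writeFileAtomic`
with the call signature used above (allocator, path, data) and the
behavior the doc comments describe (write to `.tmp`, fsync, rename)
could look like. It is an illustration under those assumptions, not the
actual implementation; error handling and tmp-file naming in the real
src/atomic.zig may differ.

    const std = @import("std");

    /// Sketch only: atomically replace the file at `path` with `data`.
    /// The bytes go to a sibling `<path>.tmp`, are fsynced, and the tmp
    /// file is then renamed over `path`. The rename is the atomicity
    /// point: a concurrent reader of `path` sees either the old bytes
    /// or the new bytes in full, never a mix. Assumes one writer per
    /// path at a time; concurrent writers to the same path would need
    /// unique tmp names.
    pub fn writeFileAtomic(allocator: std.mem.Allocator, path: []const u8, data: []const u8) !void {
        const tmp_path = try std.fmt.allocPrint(allocator, "{s}.tmp", .{path});
        defer allocator.free(tmp_path);

        {
            const file = try std.fs.cwd().createFile(tmp_path, .{});
            defer file.close();
            try file.writeAll(data);
            // Flush to stable storage before the rename so a crash
            // cannot promote a tmp file whose contents never hit disk.
            try file.sync();
        }

        // Single-step replacement of the destination path.
        try std.fs.cwd().rename(tmp_path, path);
    }

With this primitive in place, `writeRaw`, `appendRaw`, and
`cacheCusipTicker` all reduce to the same shape: build the complete byte
image in memory, then hand it to one atomic replace.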