diff --git a/src/main.zig b/src/main.zig index 72c293c..095c885 100644 --- a/src/main.zig +++ b/src/main.zig @@ -48,8 +48,9 @@ const Executor = struct { // fields used for internal accounting watch: ?usize = null, - reload_lock: bool = false, - in_request_lock: bool = false, + drain_in_progress: std.atomic.Atomic(bool) = std.atomic.Atomic(bool).init(false), + load_in_progress: std.atomic.Atomic(bool) = std.atomic.Atomic(bool).init(false), + requests_in_flight: std.atomic.Atomic(usize) = std.atomic.Atomic(usize).init(0), }; const SERVE_FN_NAME = "handle_request"; @@ -63,11 +64,10 @@ var parsed_config: config.ParsedConfig = undefined; /// interface, calls the executor and marshalls data back fn serve(allocator: *std.mem.Allocator, response: *std.http.Server.Response) !*FullReturn { const executor = try getExecutor(response.request.target, response.request.headers); + errdefer _ = executor.requests_in_flight.fetchSub(1, .Monotonic); if (executor.zigInitFn) |f| f(allocator); - executor.in_request_lock = true; - errdefer executor.in_request_lock = false; // Call external library const method_tag = @tagName(response.request.method); const headers = try toHeaders(allocator.*, response.request.headers); @@ -124,17 +124,35 @@ fn getExecutor(requested_path: []const u8, headers: std.http.Headers) !*Executor log.err("Could not find executor for target path '{s}'", .{requested_path}); return error.NoApplicableExecutor; }; + while (executor.drain_in_progress.load(.Acquire)) { + // we need to stand down and stand by + std.atomic.spinLoopHint(); // Let CPU know we're just hanging out + std.time.sleep(1 * std.time.ns_per_ms / 2); + } + // Tell everyone we're about to use this bad boy + // While requests_in_flight is >= 0, nobody should unload the library + _ = executor.requests_in_flight.fetchAdd(1, .Acquire); + errdefer _ = executor.requests_in_flight.fetchSub(1, .Release); + if (executor.serveFn != null) return executor; executor.library = blk: { if (executor.library) |l| break :blk l; - while (executor.reload_lock) // system is reloading the library + // If the library is being reloaded and a bunch of requests come in, + // we could have multiple threads racing to load + // NOTE: I am not confident of the memory ordering here on tryCompareAndSwap + while (executor.load_in_progress.tryCompareAndSwap(false, true, .Acquire, .Acquire)) |_| { + // we need to stand down and stand by + std.atomic.spinLoopHint(); // Let CPU know we're just hanging out std.time.sleep(1 * std.time.ns_per_ms / 2); + } + // we have the conch...lock others out + defer executor.load_in_progress.store(false, .Release); - if (executor.library) |l| // check again to see where we are at - break :blk l; + if (executor.library) |l| + break :blk l; // someone beat us to the race..our defer above will take care of unlocking log.info("library {s} requested but not loaded. Loading library", .{executor.path}); const l = try dlopen(executor.path); @@ -211,10 +229,12 @@ fn executorChanged(watch: usize) void { if (executor.watch) |w| { if (w == watch) { if (executor.library) |l| { - while (executor.in_request_lock) + executor.drain_in_progress.store(true, .Release); + defer executor.drain_in_progress.store(false, .Release); + while (executor.requests_in_flight.load(.Acquire) > 0) { + std.atomic.spinLoopHint(); std.time.sleep(1 * std.time.ns_per_ms / 2); - executor.reload_lock = true; - defer executor.reload_lock = false; + } if (std.c.dlclose(l) != 0) @panic("System unstable: Error after library open and cannot close"); @@ -323,9 +343,9 @@ pub fn main() !void { // stdout is for the actual output of your application, for example if you // are implementing gzip, then only the compressed bytes should be sent to // stdout, not any debugging messages. - const stdout_file = std.io.getStdOut().writer(); - var bw = std.io.bufferedWriter(stdout_file); - const stdout = bw.writer(); + // const stdout_file = std.io.getStdOut().writer(); + // var bw = std.io.bufferedWriter(stdout_file); + // const stdout = bw.writer(); var allocator = std.heap.raw_c_allocator; // raw allocator recommended for use in arenas executors = try loadConfig(allocator); @@ -345,6 +365,28 @@ pub fn main() !void { log.info("pid: {d}", .{std.os.linux.getpid()}); try installSignalHandler(); + // Set up thread pool + const server_thread = try std.Thread.spawn( + .{}, + threadMain, + .{ allocator, &server }, + ); + const server_thread2 = try std.Thread.spawn( + .{}, + threadMain, + .{ allocator, &server }, + ); + // main thread will no longer do anything + std.time.sleep(std.math.maxInt(u64)); + server_thread.join(); + server_thread2.join(); +} + +fn threadMain(allocator: std.mem.Allocator, server: *std.http.Server) !void { + // TODO: If we're in a thread pool we need to be careful with this... + const stdout_file = std.io.getStdOut().writer(); + var bw = std.io.bufferedWriter(stdout_file); + const stdout = bw.writer(); var arena = std.heap.ArenaAllocator.init(allocator); defer arena.deinit(); var aa = arena.allocator(); @@ -360,7 +402,7 @@ pub fn main() !void { } } - processRequest(&aa, &server, stdout) catch |e| { + processRequest(&aa, server, stdout) catch |e| { log.err("Unexpected error processing request: {any}", .{e}); if (@errorReturnTrace()) |trace| { std.debug.dumpStackTrace(trace.*); @@ -370,6 +412,7 @@ pub fn main() !void { try bw.flush(); } } + fn loadConfig(allocator: std.mem.Allocator) ![]Executor { log.info("loading config", .{}); // We will not watch this file - let it reload on SIGHUP @@ -424,9 +467,16 @@ fn processRequest(allocator: *std.mem.Allocator, server: *std.http.Server, write break :brk null; }; defer { + // The call above converts uncaught errors to a null + // In request counter gets incremented during getExecutor + // Caught errors will get our in_flight counter decremented (see serve fn) + // - we cannot do this here because we may not have an executor at all + // This leaves this defer block as the only place we can/should decrement + // under normal conditions + if (full_response) |f| { if (f.executor.requestDeinitFn) |d| d(); - f.executor.in_request_lock = false; + _ = f.executor.requests_in_flight.fetchSub(1, .Release); } } if (full_response) |f|