multiple worker threads - first cut

This commit is contained in:
Emil Lerch 2023-06-03 08:31:24 -07:00
parent 7b271a0698
commit eaef2c6ddc
Signed by: lobo
GPG Key ID: A7B62D657EF764F8

View File

@ -48,8 +48,9 @@ const Executor = struct {
// fields used for internal accounting
watch: ?usize = null,
reload_lock: bool = false,
in_request_lock: bool = false,
drain_in_progress: std.atomic.Atomic(bool) = std.atomic.Atomic(bool).init(false),
load_in_progress: std.atomic.Atomic(bool) = std.atomic.Atomic(bool).init(false),
requests_in_flight: std.atomic.Atomic(usize) = std.atomic.Atomic(usize).init(0),
};
const SERVE_FN_NAME = "handle_request";
@ -63,11 +64,10 @@ var parsed_config: config.ParsedConfig = undefined;
/// interface, calls the executor and marshalls data back
fn serve(allocator: *std.mem.Allocator, response: *std.http.Server.Response) !*FullReturn {
const executor = try getExecutor(response.request.target, response.request.headers);
errdefer _ = executor.requests_in_flight.fetchSub(1, .Monotonic);
if (executor.zigInitFn) |f|
f(allocator);
executor.in_request_lock = true;
errdefer executor.in_request_lock = false;
// Call external library
const method_tag = @tagName(response.request.method);
const headers = try toHeaders(allocator.*, response.request.headers);
@ -124,17 +124,35 @@ fn getExecutor(requested_path: []const u8, headers: std.http.Headers) !*Executor
log.err("Could not find executor for target path '{s}'", .{requested_path});
return error.NoApplicableExecutor;
};
while (executor.drain_in_progress.load(.Acquire)) {
// we need to stand down and stand by
std.atomic.spinLoopHint(); // Let CPU know we're just hanging out
std.time.sleep(1 * std.time.ns_per_ms / 2);
}
// Tell everyone we're about to use this bad boy
// While requests_in_flight is >= 0, nobody should unload the library
_ = executor.requests_in_flight.fetchAdd(1, .Acquire);
errdefer _ = executor.requests_in_flight.fetchSub(1, .Release);
if (executor.serveFn != null) return executor;
executor.library = blk: {
if (executor.library) |l|
break :blk l;
while (executor.reload_lock) // system is reloading the library
// If the library is being reloaded and a bunch of requests come in,
// we could have multiple threads racing to load
// NOTE: I am not confident of the memory ordering here on tryCompareAndSwap
while (executor.load_in_progress.tryCompareAndSwap(false, true, .Acquire, .Acquire)) |_| {
// we need to stand down and stand by
std.atomic.spinLoopHint(); // Let CPU know we're just hanging out
std.time.sleep(1 * std.time.ns_per_ms / 2);
}
// we have the conch...lock others out
defer executor.load_in_progress.store(false, .Release);
if (executor.library) |l| // check again to see where we are at
break :blk l;
if (executor.library) |l|
break :blk l; // someone beat us to the race..our defer above will take care of unlocking
log.info("library {s} requested but not loaded. Loading library", .{executor.path});
const l = try dlopen(executor.path);
@ -211,10 +229,12 @@ fn executorChanged(watch: usize) void {
if (executor.watch) |w| {
if (w == watch) {
if (executor.library) |l| {
while (executor.in_request_lock)
executor.drain_in_progress.store(true, .Release);
defer executor.drain_in_progress.store(false, .Release);
while (executor.requests_in_flight.load(.Acquire) > 0) {
std.atomic.spinLoopHint();
std.time.sleep(1 * std.time.ns_per_ms / 2);
executor.reload_lock = true;
defer executor.reload_lock = false;
}
if (std.c.dlclose(l) != 0)
@panic("System unstable: Error after library open and cannot close");
@ -323,9 +343,9 @@ pub fn main() !void {
// stdout is for the actual output of your application, for example if you
// are implementing gzip, then only the compressed bytes should be sent to
// stdout, not any debugging messages.
const stdout_file = std.io.getStdOut().writer();
var bw = std.io.bufferedWriter(stdout_file);
const stdout = bw.writer();
// const stdout_file = std.io.getStdOut().writer();
// var bw = std.io.bufferedWriter(stdout_file);
// const stdout = bw.writer();
var allocator = std.heap.raw_c_allocator; // raw allocator recommended for use in arenas
executors = try loadConfig(allocator);
@ -345,6 +365,28 @@ pub fn main() !void {
log.info("pid: {d}", .{std.os.linux.getpid()});
try installSignalHandler();
// Set up thread pool
const server_thread = try std.Thread.spawn(
.{},
threadMain,
.{ allocator, &server },
);
const server_thread2 = try std.Thread.spawn(
.{},
threadMain,
.{ allocator, &server },
);
// main thread will no longer do anything
std.time.sleep(std.math.maxInt(u64));
server_thread.join();
server_thread2.join();
}
fn threadMain(allocator: std.mem.Allocator, server: *std.http.Server) !void {
// TODO: If we're in a thread pool we need to be careful with this...
const stdout_file = std.io.getStdOut().writer();
var bw = std.io.bufferedWriter(stdout_file);
const stdout = bw.writer();
var arena = std.heap.ArenaAllocator.init(allocator);
defer arena.deinit();
var aa = arena.allocator();
@ -360,7 +402,7 @@ pub fn main() !void {
}
}
processRequest(&aa, &server, stdout) catch |e| {
processRequest(&aa, server, stdout) catch |e| {
log.err("Unexpected error processing request: {any}", .{e});
if (@errorReturnTrace()) |trace| {
std.debug.dumpStackTrace(trace.*);
@ -370,6 +412,7 @@ pub fn main() !void {
try bw.flush();
}
}
fn loadConfig(allocator: std.mem.Allocator) ![]Executor {
log.info("loading config", .{});
// We will not watch this file - let it reload on SIGHUP
@ -424,9 +467,16 @@ fn processRequest(allocator: *std.mem.Allocator, server: *std.http.Server, write
break :brk null;
};
defer {
// The call above converts uncaught errors to a null
// In request counter gets incremented during getExecutor
// Caught errors will get our in_flight counter decremented (see serve fn)
// - we cannot do this here because we may not have an executor at all
// This leaves this defer block as the only place we can/should decrement
// under normal conditions
if (full_response) |f| {
if (f.executor.requestDeinitFn) |d| d();
f.executor.in_request_lock = false;
_ = f.executor.requests_in_flight.fetchSub(1, .Release);
}
}
if (full_response) |f|