From 5be44c911f535b6f787340bf36a874214ddc2e7f Mon Sep 17 00:00:00 2001 From: Emil Lerch Date: Fri, 12 May 2023 17:10:30 -0700 Subject: [PATCH] get all response body data from executor without allocations in core --- build.zig | 1 + src/main-lib.zig | 34 ++++++++-- src/main.zig | 160 +++++++++++++++++++++++++++++++++++------------ 3 files changed, 150 insertions(+), 45 deletions(-) diff --git a/build.zig b/build.zig index 946a12d..b642927 100644 --- a/build.zig +++ b/build.zig @@ -34,6 +34,7 @@ pub fn build(b: *std.Build) void { .target = target, .optimize = optimize, }); + lib.linkLibC(); // This declares intent for the executable to be installed into the // standard location when the user invokes the "install" step (the default diff --git a/src/main-lib.zig b/src/main-lib.zig index 120938f..676b41a 100644 --- a/src/main-lib.zig +++ b/src/main-lib.zig @@ -1,12 +1,34 @@ const std = @import("std"); const testing = std.testing; -export fn serve() void { - const stdout_file = std.io.getStdOut().writer(); - var bw = std.io.bufferedWriter(stdout_file); - const stdout = bw.writer(); - stdout.print(" 2 ", .{}) catch unreachable; - bw.flush() catch unreachable; +const Response = extern struct { + ptr: [*]u8, + len: usize, +}; + +var child_allocator = std.heap.raw_c_allocator; // raw allocator recommended for use in arenas +var arena: std.heap.ArenaAllocator = undefined; + +export fn handle_request() *Response { + arena = std.heap.ArenaAllocator.init(child_allocator); + var allocator = arena.allocator(); + var al = std.ArrayList(u8).init(allocator); + var writer = al.writer(); + + writer.print(" 2.", .{}) catch unreachable; + + // Cannot simply return &Blah. Need to assign to var first + var rc = &Response{ + .ptr = al.items.ptr, + .len = al.items.len, + }; + return rc; +} + +/// having request_deinit allows for a general deinit as well +export fn request_deinit() void { + std.log.debug("deinit", .{}); + arena.deinit(); } export fn add(a: i32, b: i32) i32 { diff --git a/src/main.zig b/src/main.zig index 71c09c7..e416183 100644 --- a/src/main.zig +++ b/src/main.zig @@ -2,16 +2,29 @@ const std = @import("std"); const builtin = @import("builtin"); const Watch = @import("Watch.zig"); -const serveFn = *const fn () void; +const serveFn = *const fn () *ServeReturn; +const requestDeinitFn = *const fn () void; const timeout = 250; +const ServeReturn = extern struct { + ptr: [*]u8, + len: usize, +}; + +const FullReturn = struct { + response: []u8, + executor: *Executor, +}; + const Executor = struct { path: [:0]const u8, library: ?*anyopaque = null, serveFn: ?serveFn = null, + requestDeinitFn: ?requestDeinitFn = null, watch: ?usize = null, reload_lock: bool = false, + in_request_lock: bool = false, }; var executors = [_]Executor{ @@ -30,30 +43,58 @@ pub const std_options = struct { .{ .scope = .watch, .level = .info }, }; }; -const SERVE_FN_NAME = "serve"; +const SERVE_FN_NAME = "handle_request"; const PORT = 8069; -fn serve() !void { +// TODO: writer as anytype is not going to survive across library boundaries... +fn serve(allocator: std.mem.Allocator, response: *std.http.Server.Response) !*FullReturn { + var null_server = std.http.Server.init(allocator, .{}); + defer null_server.deinit(); + var data: [14]u8 = @constCast(&[_]u8{0x00} ** 14).*; + var child_response = std.http.Server.Response{ + .server = &null_server, + .request = response.request, + .connection = .{ + .conn = .{ + .stream = .{ .handle = 0 }, + .protocol = .plain, + }, + }, + .address = .{ .any = .{ + .data = data, + .family = 0, + } }, + .headers = response.headers, + }; + _ = child_response; // if (some path routing thing) { + // TODO: Get request body into executor + // TODO: Get headers back from executor + // TODO: Get request headers into executor + const executor = try getExecutor(0); + executor.in_request_lock = true; + errdefer executor.in_request_lock = false; + // Call external library + var serve_result = executor.serveFn.?(); - (try getExecutor(0))(); - // if (inx > 4) { - // if (inx % 2 == 0) - // (try getExecutor(0))() - // else - // (try getExecutor(1))(); - // } + // Deal with results + var slice: []u8 = serve_result.ptr[0..serve_result.len]; + var rc = &FullReturn{ + .executor = executor, + .response = slice, + }; + return rc; } -fn getExecutor(key: usize) !serveFn { +fn getExecutor(key: usize) !*Executor { var executor = &executors[key]; - if (executor.serveFn) |s| return s; + if (executor.serveFn != null) return executor; executor.library = blk: { if (executor.library) |l| break :blk l; while (executor.reload_lock) // system is reloading the library - std.time.sleep(1); + std.time.sleep(1 * std.time.ns_per_ms / 2); if (executor.library) |l| // check again to see where we are at break :blk l; @@ -71,9 +112,15 @@ fn getExecutor(key: usize) !serveFn { if (serve_fn == null) return error.CouldNotLoadSymbolServe; executor.serveFn = @ptrCast(serveFn, serve_fn.?); - return executor.serveFn.?; + loadOptionalSymbols(executor); + return executor; } +fn loadOptionalSymbols(executor: *Executor) void { + if (std.c.dlsym(executor.library.?, "request_deinit")) |s| { + executor.requestDeinitFn = @ptrCast(requestDeinitFn, s); + } +} fn executorChanged(watch: usize) void { // NOTE: This will be called off the main thread log.debug("executor with watch {d} changed", .{watch}); @@ -81,6 +128,8 @@ fn executorChanged(watch: usize) void { if (executor.watch) |w| { if (w == watch) { if (executor.library) |l| { + while (executor.in_request_lock) + std.time.sleep(1 * std.time.ns_per_ms / 2); executor.reload_lock = true; defer executor.reload_lock = false; @@ -91,13 +140,15 @@ fn executorChanged(watch: usize) void { log.warn("could not reload! error opening library", .{}); return; }; - executor.serveFn = @ptrCast(serveFn, std.c.dlsym(executor.library.?, SERVE_FN_NAME)); - if (executor.serveFn == null) { + var symbol = std.c.dlsym(executor.library.?, SERVE_FN_NAME); + if (symbol == null) { log.warn("could not reload! error finding symbol", .{}); if (std.c.dlclose(executor.library.?) != 0) @panic("System unstable: Error after library open and cannot close"); return; } + executor.serveFn = @ptrCast(serveFn, symbol); + loadOptionalSymbols(executor); } } } @@ -164,37 +215,68 @@ pub fn main() !void { log.info("listening on port: {d}", .{server_port}); if (builtin.os.tag == .linux) log.info("pid: {d}", .{std.os.linux.getpid()}); - // install signal handler - var act = std.os.Sigaction{ - .handler = .{ .sigaction = exitApplication }, - .mask = std.os.empty_sigset, - .flags = (std.os.SA.SIGINFO | std.os.SA.RESTART | std.os.SA.RESETHAND), - }; - try std.os.sigaction(std.os.SIG.INT, &act, null); - try std.os.sigaction(std.os.SIG.TERM, &act, null); + try installSignalHandler(); while (true) { var arena = std.heap.ArenaAllocator.init(std.heap.c_allocator); defer arena.deinit(); - const res = try server.accept(.{ .dynamic = max_header_size }); - defer res.deinit(); - defer res.reset(); - try res.wait(); - const server_body: []const u8 = "message from server!\n"; - res.transfer_encoding = .{ .content_length = server_body.len }; - try res.headers.append("content-type", "text/plain"); - try res.headers.append("connection", "close"); - try res.do(); - - var buf: [128]u8 = undefined; - const n = try res.readAll(&buf); - _ = n; - _ = try res.writer().writeAll(server_body); - try res.finish(); + processRequest(arena.allocator(), &server) catch |e| { + log.err("Unexpected error processing request: {any}", .{e}); + if (@errorReturnTrace()) |trace| { + std.debug.dumpStackTrace(trace.*); + } + }; } } +fn processRequest(allocator: std.mem.Allocator, server: *std.http.Server) !void { + const max_header_size = 8192; + const res = try server.accept(.{ .dynamic = max_header_size }); + defer res.deinit(); + defer res.reset(); + try res.wait(); + + // TODO: deal with this + var buf: [1024]u8 = undefined; + const n = try res.readAll(&buf); + _ = n; + + // TODO: we need to also have a defer statement to deinit whatever happens + // with the executor library. This will also add a race condition where + // we could have a memory leak if the executor reloads in the middle of a + // request. We may want to add a new spinlock on the reload thread to + // avoid reloading in the middle of a request, which would be generally + // bad anyway + const errstr = "Internal Server Error\n"; + var errbuf: [errstr.len]u8 = undefined; + var response_bytes = try std.fmt.bufPrint(&errbuf, errstr, .{}); + + var full_response = serve(allocator, res) catch |e| brk: { + res.status = .internal_server_error; + // TODO: more about this particular request + log.err("Unexpected error from executor processing request: {any}", .{e}); + if (@errorReturnTrace()) |trace| { + std.debug.dumpStackTrace(trace.*); + } + break :brk null; + }; + defer { + if (full_response) |f| { + if (f.executor.requestDeinitFn) |d| d(); + f.executor.in_request_lock = false; + } + } + if (full_response) |f| + response_bytes = f.response; + res.transfer_encoding = .{ .content_length = response_bytes.len }; + try res.headers.append("content-type", "text/plain"); + try res.headers.append("connection", "close"); + try res.do(); + _ = try res.writer().writeAll(response_bytes); + try res.finish(); +} + test { // To run nested container tests, either, call `refAllDecls` which will // reference all declarations located in the given argument.