diff --git a/src/interface.zig b/src/interface.zig index 7cb9ded..7e6e6b5 100644 --- a/src/interface.zig +++ b/src/interface.zig @@ -16,7 +16,7 @@ pub const Response = extern struct { }; pub const Request = extern struct { - method: [*]u8, + method: [*:0]u8, method_len: usize, content: [*]u8, diff --git a/src/main-lib.zig b/src/main-lib.zig index 97bfe78..0367431 100644 --- a/src/main-lib.zig +++ b/src/main-lib.zig @@ -3,23 +3,28 @@ const interface = @import("interface.zig"); const testing = std.testing; const log = std.log.scoped(.@"main-lib"); -var child_allocator = std.heap.raw_c_allocator; // raw allocator recommended for use in arenas -var arena: std.heap.ArenaAllocator = undefined; +var allocator: ?*std.mem.Allocator = null; const Response = struct { body: *std.ArrayList(u8), headers: *std.StringHashMap([]const u8), }; +/// This function is optional and can be exported by zig libraries for +/// initialization. If exported, it will be called once in the beginning of +/// a request and will be provided a pointer to std.mem.Allocator, which is +/// useful for reusing the parent allocator +export fn zigInit(parent_allocator: *anyopaque) void { + allocator = @ptrCast(*std.mem.Allocator, @alignCast(@alignOf(*std.mem.Allocator), parent_allocator)); +} export fn handle_request() ?*interface.Response { - arena = std.heap.ArenaAllocator.init(child_allocator); - var allocator = arena.allocator(); + var alloc = if (allocator) |a| a.* else @panic("zigInit not called prior to handle_request. 
This is a coding error"); // setup response body - var response = std.ArrayList(u8).init(allocator); + var response = std.ArrayList(u8).init(alloc); // setup headers - var headers = std.StringHashMap([]const u8).init(allocator); + var headers = std.StringHashMap([]const u8).init(alloc); handleRequest(.{ .body = &response, .headers = &headers, @@ -34,10 +39,10 @@ export fn handle_request() ?*interface.Response { log.debug("response ptr: {*}", .{response.items.ptr}); // Marshall data back for handling by server - var rc = allocator.create(interface.Response) catch @panic("OOM"); + var rc = alloc.create(interface.Response) catch @panic("OOM"); rc.ptr = response.items.ptr; rc.len = response.items.len; - rc.headers = interface.toHeaders(allocator, headers) catch |e| { + rc.headers = interface.toHeaders(alloc, headers) catch |e| { log.err("Unexpected error processing request: {any}", .{e}); if (@errorReturnTrace()) |trace| { std.debug.dumpStackTrace(trace.*); @@ -48,10 +53,10 @@ export fn handle_request() ?*interface.Response { return rc; } -/// having request_deinit allows for a general deinit as well -export fn request_deinit() void { - arena.deinit(); -} +/// request_deinit is an optional export and will be called at the end of the +/// request. 
Useful for deallocating memory +// export fn request_deinit() void { +// } // ************************************************************************ // Boilerplate ^^, Custom code below @@ -69,8 +74,10 @@ fn handleRequest(response: Response) !void { } test "handle_request" { - defer request_deinit(); - child_allocator = std.testing.allocator; + var arena = std.heap.ArenaAllocator.init(std.testing.allocator); + defer arena.deinit(); + var aa = arena.allocator(); + allocator = &aa; const response = handle_request().?; try testing.expectEqualStrings(" 2.", response.ptr[0..response.len]); try testing.expectEqualStrings("X-custom-foo", response.headers[0].name_ptr[0..response.headers[0].name_len]); diff --git a/src/main.zig b/src/main.zig index 19480d1..28a9a98 100644 --- a/src/main.zig +++ b/src/main.zig @@ -2,7 +2,8 @@ const std = @import("std"); const builtin = @import("builtin"); const interface = @import("interface.zig"); const Watch = @import("Watch.zig"); -const serveFn = *const fn () ?*interface.Response; +const serveFn = *const fn (*interface.Request) ?*interface.Response; +const zigInitFn = *const fn (*anyopaque) void; const requestDeinitFn = *const fn () void; const timeout = 250; @@ -19,6 +20,7 @@ const Executor = struct { // fields used at runtime to do real work library: ?*anyopaque = null, serveFn: ?serveFn = null, + zigInitFn: ?zigInitFn = null, requestDeinitFn: ?requestDeinitFn = null, // fields used for internal accounting @@ -36,6 +38,7 @@ var executors = [_]Executor{ var watcher = Watch.init(executorChanged); var watcher_thread: ?std.Thread = null; +var timer: ?std.time.Timer = null; // timer used by processRequest const log = std.log.scoped(.main); pub const std_options = struct { @@ -48,26 +51,33 @@ pub const std_options = struct { const SERVE_FN_NAME = "handle_request"; const PORT = 8069; -fn serve(allocator: std.mem.Allocator, response: *std.http.Server.Response) !*FullReturn { - // pub const Request = extern struct { - // method: [*]u8, - // 
method_len: usize, - // - // content: [*]u8, - // content_len: usize, - // - // headers: [*]Header, - // headers_len: usize, - // }; +fn serve(allocator: *std.mem.Allocator, response: *std.http.Server.Response) !*FullReturn { // if (some path routing thing) { - // TODO: Get request body into executor - // TODO: Get headers back from executor - // TODO: Get request headers into executor const executor = try getExecutor(0); + if (executor.zigInitFn) |f| + f(allocator); + executor.in_request_lock = true; errdefer executor.in_request_lock = false; // Call external library - var serve_result = executor.serveFn.?().?; // ok for this pointer deref to fail + const method_tag = @tagName(response.request.method); + const headers = try toHeaders(allocator.*, response.request.headers); + var request_content: []u8 = &[_]u8{}; + if (response.request.content_length) |l| { + request_content = try response.reader().readAllAlloc(allocator.*, @as(usize, l)); + } + log.debug("{d} bytes read from request", .{request_content.len}); + var request = interface.Request{ + .method = @constCast(method_tag[0..].ptr), + .method_len = method_tag.len, + + .headers = headers, + .headers_len = response.request.headers.list.items.len, + + .content = request_content.ptr, + .content_len = request_content.len, + }; + var serve_result = executor.serveFn.?(&request).?; // ok for this pointer deref to fail log.debug("target: {s}", .{response.request.target}); log.warn("response ptr: {*}", .{serve_result.ptr}); // BUG: This works in tests, but does not when compiled (even debug mode) var slice: []u8 = serve_result.ptr[0..serve_result.len]; @@ -77,14 +87,13 @@ fn serve(allocator: std.mem.Allocator, response: *std.http.Server.Response) !*Fu var content_type_added = false; for (0..serve_result.headers_len) |inx| { const head = serve_result.headers[inx]; - // head.name_ptr[0..head.name_len], try response.headers.append( head.name_ptr[0..head.name_len], head.value_ptr[0..head.value_len], ); - // TODO: are these 
headers case insensitive? - content_type_added = std.mem.eql(u8, head.name_ptr[0..head.name_len], "content-type"); + // headers are case insensitive + content_type_added = std.ascii.eqlIgnoreCase(head.name_ptr[0..head.name_len], "content-type"); } if (!content_type_added) try response.headers.append("content-type", "text/plain"); @@ -129,6 +138,9 @@ fn loadOptionalSymbols(executor: *Executor) void { if (std.c.dlsym(executor.library.?, "request_deinit")) |s| { executor.requestDeinitFn = @ptrCast(requestDeinitFn, s); } + if (std.c.dlsym(executor.library.?, "zigInit")) |s| { + executor.zigInitFn = @ptrCast(zigInitFn, s); + } } fn executorChanged(watch: usize) void { // NOTE: This will be called off the main thread @@ -240,31 +252,53 @@ pub fn main() !void { log.info("pid: {d}", .{std.os.linux.getpid()}); try installSignalHandler(); + var arena = std.heap.ArenaAllocator.init(allocator); + defer arena.deinit(); + var aa = arena.allocator(); + const bytes_preallocated = try preWarmArena(aa, &arena, 1); while (true) { - var arena = std.heap.ArenaAllocator.init(allocator); - defer arena.deinit(); + // TODO: Learn what is typical and change this to .retain_with_limit = + defer { + if (!arena.reset(.{ .retain_capacity = {} })) { + // reallocation failed, arena is degraded + log.warn("Arena reset failed and is degraded. 
Resetting arena", .{}); + arena.deinit(); + arena = std.heap.ArenaAllocator.init(allocator); + aa = arena.allocator(); + } + } - processRequest(arena.allocator(), &server) catch |e| { + processRequest(&aa, &server, stdout) catch |e| { log.err("Unexpected error processing request: {any}", .{e}); if (@errorReturnTrace()) |trace| { std.debug.dumpStackTrace(trace.*); } }; + try stdout.print(" (pre-alloc: {}, alloc: {})\n", .{ bytes_preallocated, arena.queryCapacity() }); + try bw.flush(); } } -fn processRequest(allocator: std.mem.Allocator, server: *std.http.Server) !void { +fn processRequest(allocator: *std.mem.Allocator, server: *std.http.Server, writer: anytype) !void { const max_header_size = 8192; + if (timer == null) timer = try std.time.Timer.start(); + var tm = timer.?; const res = try server.accept(.{ .dynamic = max_header_size }); defer res.deinit(); defer res.reset(); - try res.wait(); - - // TODO: deal with this - var buf: [1024]u8 = undefined; - const n = try res.readAll(&buf); - _ = n; + try res.wait(); // wait for client to send a complete request head + // I believe it's fair to start our timer after this is done + tm.reset(); + // This is an nginx log: + // git.lerch.org 50.39.111.175 - - [16/May/2023:02:56:31 +0000] "POST /api/actions/runner.v1.RunnerService/FetchTask HTTP/2.0" 200 0 "-" "connect-go/1.2.0-dev (go1.20.1)" "172.20.0.5:3000" + // TODO: replicate this + try writer.print("{} - - \"{s} {s} {s}\"", .{ + res.address, + @tagName(res.request.method), + res.request.target, + @tagName(res.request.version), + }); // TODO: we need to also have a defer statement to deinit whatever happens // with the executor library. 
This will also add a race condition where // we could have a memory leak if the executor reloads in the middle of a @@ -294,12 +328,48 @@ fn processRequest(allocator: std.mem.Allocator, server: *std.http.Server) !void response_bytes = f.response; res.transfer_encoding = .{ .content_length = response_bytes.len }; try res.headers.append("connection", "close"); + try writer.print(" {d} ttfb {d:.3}ms", .{ @enumToInt(res.status), @intToFloat(f64, tm.read()) / std.time.ns_per_ms }); if (builtin.is_test) writeToTestBuffers(response_bytes, res); try res.do(); _ = try res.writer().writeAll(response_bytes); try res.finish(); + try writer.print(" {d} ttlb {d:.3}ms", .{ + response_bytes.len, + @intToFloat(f64, tm.read()) / std.time.ns_per_ms, + }); } +fn toHeaders(allocator: std.mem.Allocator, headers: std.http.Headers) ![*]interface.Header { + var header_array = try std.ArrayList(interface.Header).initCapacity(allocator, headers.list.items.len); + for (headers.list.items) |kv| { + header_array.appendAssumeCapacity(.{ + .name_ptr = @constCast(kv.name).ptr, + .name_len = kv.name.len, + + .value_ptr = @constCast(kv.value).ptr, + .value_len = kv.value.len, + }); + } + return header_array.items.ptr; +} + +/// Allocates at least preallocated_kb kilobytes of ram for usage. Some overhead +/// will mean that more than the requested amount is actually reserved +fn preWarmArena(aa: std.mem.Allocator, arena: *std.heap.ArenaAllocator, preallocated_kb: usize) !usize { + if (preallocated_kb == 0) return 0; + // capacity 0 at this point + const warm_array = try aa.alloc(u8, 1024 * preallocated_kb); // after this, we are at 1569 (545 extra) + aa.free(warm_array); + log.debug( + "allocator preallocation. 
Limiting to: {d} bytes", + .{(arena.queryCapacity() + @as(usize, 1023)) / @as(usize, 1024) * 1024}, + ); + if (!arena.reset(.{ .retain_with_limit = (arena.queryCapacity() + @as(usize, 1023)) / @as(usize, 1024) * 1024 })) + log.warn("arena reset failed, arena degraded", .{}); + var bytes_allocated = arena.queryCapacity(); + log.debug("preallocated {d} bytes", .{bytes_allocated}); + return bytes_allocated; +} fn writeToTestBuffers(response: []const u8, res: *std.http.Server.Response) void { _ = res; log.debug("writing to test buffers", .{}); @@ -325,10 +395,17 @@ fn testRequest(request_bytes: []const u8) !void { try server.listen(address); const server_port = server.socket.listen_address.in.getPort(); + var al = std.ArrayList(u8).init(allocator); + defer al.deinit(); + var writer = al.writer(); + var aa = arena.allocator(); + var bytes_allocated: usize = 0; + // pre-warm + bytes_allocated = try preWarmArena(aa, &arena, 1); const server_thread = try std.Thread.spawn( .{}, processRequest, - .{ arena.allocator(), &server }, + .{ &aa, &server, writer }, ); const stream = try std.net.tcpConnectToHost(allocator, "127.0.0.1", server_port); @@ -336,6 +413,8 @@ fn testRequest(request_bytes: []const u8) !void { _ = try stream.writeAll(request_bytes[0..]); server_thread.join(); + log.debug("Bytes allocated during request: {d}", .{arena.queryCapacity() - bytes_allocated}); + log.debug("Stdout: {s}", .{al.items}); } fn testGet(comptime path: []const u8) !void {