diff --git a/build.zig b/build.zig index bc52910..4297c9a 100644 --- a/build.zig +++ b/build.zig @@ -27,8 +27,7 @@ pub fn build(b: *std.Build) void { exe.linkLibC(); _ = b.addModule("flexilib-interface", .{ - .source_file = .{ .path = "src/interface.zig" }, - .dependencies = &[_]std.build.ModuleDependency{}, + .root_source_file = b.path("src/interface.zig"), }); const lib = b.addSharedLibrary(.{ diff --git a/src/Watch.zig b/src/Watch.zig index 8bf19e3..b970e7a 100644 --- a/src/Watch.zig +++ b/src/Watch.zig @@ -12,13 +12,13 @@ const Wd = struct { }; fileChanged: *const fn (usize) void, -inotify_fd: ?std.os.fd_t = null, +inotify_fd: ?std.posix.fd_t = null, nfds_t: usize = 0, wds: [MAX_FDS]Wd = [_]Wd{.{}} ** MAX_FDS, dir_nfds_t: usize = 0, dir_wds: [MAX_FDS]Wd = [_]Wd{.{}} ** MAX_FDS, -control_socket: ?std.os.socket_t = null, +control_socket: ?std.posix.socket_t = null, watch_started: bool = false, sock_name: [:0]const u8, @@ -35,21 +35,14 @@ pub fn deinit(self: *Self) void { if (self.control_socket) |s| { // Sockets...where Unix still pretends everything is a file, but it's not... log.debug("closing control socket", .{}); - std.os.closeSocket(s); + std.posix.close(s); } if (self.inotify_fd) |fd| { for (0..self.nfds_t + self.dir_nfds_t) |inx| { const wd = if (inx < self.nfds_t) self.wds[inx].wd else self.dir_wds[inx - self.nfds_t].wd; - switch (std.os.errno(std.os.linux.inotify_rm_watch(fd, wd))) { - .SUCCESS => {}, - .BADF => unreachable, - // NOTE: Getting EINVAL, but the call looks valid to me? - // ...and wait...not all the time? - .INVAL => log.err("error removing watch (EINVAL). OS claims fd ({d}) or wd ({d}) is invalid", .{ self.inotify_fd.?, wd }), - else => unreachable, - } + std.posix.inotify_rm_watch(fd, wd); } - std.os.close(fd); + std.posix.close(fd); } const cwd = std.fs.cwd(); cwd.deleteFileZ(self.sock_name) catch |e| @@ -77,12 +70,12 @@ pub fn startWatch(self: *Self) void { while (true) { self.watch_started = true; - var fds = if (self.inotify_fd == null) - @constCast(&[_]std.os.pollfd{.{ .fd = self.control_socket.?, .events = std.os.POLL.IN, .revents = undefined }}) + const fds = if (self.inotify_fd == null) + @constCast(&[_]std.posix.pollfd{.{ .fd = self.control_socket.?, .events = std.posix.POLL.IN, .revents = undefined }}) else - @constCast(&[_]std.os.pollfd{ - .{ .fd = self.control_socket.?, .events = std.os.POLL.IN, .revents = undefined }, - .{ .fd = self.inotify_fd.?, .events = std.os.POLL.IN, .revents = undefined }, + @constCast(&[_]std.posix.pollfd{ + .{ .fd = self.control_socket.?, .events = std.posix.POLL.IN, .revents = undefined }, + .{ .fd = self.inotify_fd.?, .events = std.posix.POLL.IN, .revents = undefined }, }); const control_fd_inx = 0; @@ -97,11 +90,11 @@ pub fn startWatch(self: *Self) void { // std.fs.watch looks really good...but it requires event based I/O, // which is not yet ready to be (re)added. log.debug("tid={d} start poll with {d} fds", .{ std.Thread.getCurrentId(), fds.len }); - if ((std.os.poll( + if ((std.posix.poll( fds, -1, // Infinite timeout ) catch @panic("poll error")) > 0) { - if (fds[control_fd_inx].revents & std.os.POLL.IN == std.os.POLL.IN) { // POLLIN means "there is data to read" + if (fds[control_fd_inx].revents & std.posix.POLL.IN == std.posix.POLL.IN) { // POLLIN means "there is data to read" log.debug("tid={d} control event", .{std.Thread.getCurrentId()}); // we only need one byte for what we're doing var control_buf: [1]u8 = undefined; @@ -109,9 +102,9 @@ pub fn startWatch(self: *Self) void { // self.control_socket_accepted_fd = self.control_socket_accepted_fd orelse acceptSocket(self.control_socket.?); // const fd = self.control_socket_accepted_fd.?; // let's save some typing const fd = acceptSocket(self.sock_name, self.control_socket.?); - defer std.os.close(fd); + defer std.posix.close(fd); - var readcount = std.os.recv(fd, &control_buf, 0) catch unreachable; + const readcount = std.posix.recv(fd, &control_buf, 0) catch unreachable; // var other_buf: [1]u8 = undefined; // if (std.os.recv(fd, &other_buf, 0) catch unreachable != 0) // @panic("socket contains more data than expected"); @@ -142,11 +135,11 @@ pub fn startWatch(self: *Self) void { // fds[1] is inotify, so if we have data in that file descriptor, // we can force the data into an inotify_event structure and act on it - if (self.inotify_fd != null and fds[inotify_fd_inx].revents & std.os.POLL.IN == std.os.POLL.IN) { + if (self.inotify_fd != null and fds[inotify_fd_inx].revents & std.posix.POLL.IN == std.posix.POLL.IN) { log.debug("tid={d} inotify event", .{std.Thread.getCurrentId()}); var event_buf: [4096]u8 align(@alignOf(std.os.linux.inotify_event)) = undefined; // "borrowed" from https://ziglang.org/documentation/master/std/src/std/fs/watch.zig.html#L588 - const bytes_read = std.os.read(self.inotify_fd.?, &event_buf) catch unreachable; + const bytes_read = std.posix.read(self.inotify_fd.?, &event_buf) catch unreachable; var ptr: [*]u8 = &event_buf; const end_ptr = ptr + bytes_read; @@ -162,11 +155,11 @@ pub fn startWatch(self: *Self) void { } } -fn acceptSocket(name: [:0]const u8, socket: std.os.socket_t) std.os.socket_t { +fn acceptSocket(name: [:0]const u8, socket: std.posix.socket_t) std.posix.socket_t { var sockaddr = std.net.Address.initUnix(name) catch @panic("could not get sockaddr"); - var sockaddr_len: std.os.socklen_t = sockaddr.getOsSockLen(); + var sockaddr_len: std.posix.socklen_t = sockaddr.getOsSockLen(); log.debug("tid={d} accepting on socket fd {d}", .{ std.Thread.getCurrentId(), socket }); - return std.os.accept( + return std.posix.accept( socket, &sockaddr.any, &sockaddr_len, @@ -266,16 +259,16 @@ test "nameMatch" { /// adds a file to watch. The return will be a handle that will be returned /// in the fileChanged event triffered from startWatch pub fn addFileWatch(self: *Self, path: *[:0]const u8) !usize { - self.inotify_fd = self.inotify_fd orelse try std.os.inotify_init1(std.os.linux.IN.NONBLOCK); + self.inotify_fd = self.inotify_fd orelse try std.posix.inotify_init1(std.os.linux.IN.NONBLOCK); errdefer { - std.os.close(self.inotify_fd.?); + std.posix.close(self.inotify_fd.?); self.inotify_fd = null; } // zig build modification pattern: open 20, close_nowrite 10, MOVED_TO (on the directory), attrib 4 // unix cp: OPEN, MODIFY, CLOSE_WRITE, ATTRIB // unix mv: MOVED_TO (on the directory) self.wds[self.nfds_t] = .{ - .wd = try std.os.inotify_add_watchZ( + .wd = try std.posix.inotify_add_watchZ( self.inotify_fd.?, path.*, std.os.linux.IN.CLOSE_WRITE, @@ -302,7 +295,7 @@ fn addDirWatch(self: *Self, path: *[]const u8) !void { return; // We are already watching this directory // We do not have a directory watch self.dir_wds[self.dir_nfds_t] = .{ - .wd = try std.os.inotify_add_watch(self.inotify_fd.?, dirname, std.os.linux.IN.MOVED_TO), + .wd = try std.posix.inotify_add_watch(self.inotify_fd.?, dirname, std.os.linux.IN.MOVED_TO), .path = path, // we store path rather than directory because doing this without an allocator is...tough }; self.dir_nfds_t += 1; @@ -342,10 +335,10 @@ fn addControlSocket(self: *Self, path: [:0]const u8) !void { // // This function theoretically should work without requiring linux...except this inotify call, // which is completely linux specific - self.inotify_fd = self.inotify_fd orelse try std.os.inotify_init1(std.os.linux.IN.NONBLOCK); + self.inotify_fd = self.inotify_fd orelse try std.posix.inotify_init1(std.os.linux.IN.NONBLOCK); log.debug("Established inotify file descriptor {d}", .{self.inotify_fd.?}); errdefer { - std.os.close(self.inotify_fd.?); + std.posix.close(self.inotify_fd.?); self.inotify_fd = null; } // this should work on all systems theoretically, but I believe would work only @@ -369,18 +362,18 @@ fn addControlSocket(self: *Self, path: [:0]const u8) !void { // 6. std.os.close: close the fd // // On end of use, we need to std.os.closeSocket() - const sock = try std.os.socket( + const sock = try std.posix.socket( std.os.linux.AF.LOCAL, - std.os.linux.SOCK.STREAM | std.os.SOCK.CLOEXEC, + std.os.linux.SOCK.STREAM | std.posix.SOCK.CLOEXEC, 0, ); - errdefer std.os.closeSocket(sock); + errdefer std.posix.close(sock); const sockaddr = try std.net.Address.initUnix(path); log.debug("binding to path: {s}", .{path}); - try std.os.bind(sock, &sockaddr.any, sockaddr.getOsSockLen()); - try std.os.listen(sock, 10); + try std.posix.bind(sock, &sockaddr.any, sockaddr.getOsSockLen()); + try std.posix.listen(sock, 10); self.control_socket = sock; log.debug("added control socket with fd={d}", .{sock}); } diff --git a/src/config.zig b/src/config.zig index a36082c..490db8f 100644 --- a/src/config.zig +++ b/src/config.zig @@ -65,17 +65,17 @@ pub fn parse(self: Self, reader: anytype) !ParsedConfig { defer self.allocator.free(line); const nocomments = std.mem.trim(u8, @constCast(&std.mem.split(u8, line, "#")).first(), ws); var data_iterator = std.mem.split(u8, nocomments, "="); - var key = std.mem.trim(u8, data_iterator.first(), ws); // first never fails + const key = std.mem.trim(u8, data_iterator.first(), ws); // first never fails if (key.len == 0) continue; - var value = std.mem.trim(u8, data_iterator.next() orelse return error.NoValueForKey, ws); + const value = std.mem.trim(u8, data_iterator.next() orelse return error.NoValueForKey, ws); // keys should be putNoClobber, but values can be put. // Because we have to dup the memory here though, we want to // manage duplicate values seperately - var dup_key = try self.allocator.dupeZ(u8, key); - var dup_value = try self.allocator.dupeZ(u8, value); + const dup_key = try self.allocator.dupeZ(u8, key); + const dup_value = try self.allocator.dupeZ(u8, value); try rc.key_value_map.putNoClobber(dup_key, dup_value); if (!rc.value_key_map.contains(value)) { - var keys = try self.allocator.create(std.ArrayList([:0]u8)); + const keys = try self.allocator.create(std.ArrayList([:0]u8)); keys.* = std.ArrayList([:0]u8).init(self.allocator); try rc.value_key_map.put(dup_value, keys); } @@ -85,7 +85,7 @@ pub fn parse(self: Self, reader: anytype) !ParsedConfig { } test "gets config from a stream" { - var allocator = std.testing.allocator; + const allocator = std.testing.allocator; var stream = std.io.fixedBufferStream( \\# This is a simple "path prefix" = dynamic library path mapping \\ # no reordering will be done, so you must do things most -> least specific diff --git a/src/interface.zig b/src/interface.zig index 8969404..6f00503 100644 --- a/src/interface.zig +++ b/src/interface.zig @@ -44,7 +44,7 @@ pub const ZigRequest = struct { target: []const u8, method: [:0]u8, content: []u8, - headers: std.http.Headers, + headers: []std.http.Header, }; pub const ZigHeader = struct { @@ -56,7 +56,7 @@ pub const ZigResponse = struct { status: std.http.Status = .ok, reason: ?[]const u8 = null, body: *std.ArrayList(u8), - headers: std.http.Headers, + headers: []std.http.Header, request: ZigRequest, prepend: std.ArrayList(u8), @@ -94,9 +94,9 @@ pub fn zigInit(parent_allocator: *anyopaque) callconv(.C) void { /// Converts a StringHashMap to the structure necessary for passing through the /// C boundary. This will be called automatically for you via the handleRequest function /// and is also used by the main processing loop to coerce request headers -fn toHeaders(alloc: std.mem.Allocator, headers: std.http.Headers) ![*]Header { - var header_array = try std.ArrayList(Header).initCapacity(alloc, headers.list.items.len); - for (headers.list.items) |*field| { +fn toHeaders(alloc: std.mem.Allocator, headers: []const std.http.Header) ![*]Header { + var header_array = try std.ArrayList(Header).initCapacity(alloc, headers.len); + for (headers) |*field| { header_array.appendAssumeCapacity(.{ .name_ptr = @constCast(field.name.ptr), .name_len = field.name.len, @@ -114,7 +114,7 @@ fn toHeaders(alloc: std.mem.Allocator, headers: std.http.Headers) ![*]Header { pub fn handleRequest(request: *Request, zigRequestHandler: ZigRequestHandler) ?*Response { // TODO: implement another library in C or Rust or something to show // that anything using a C ABI can be successful - var alloc = if (allocator) |a| a.* else { + const alloc = if (allocator) |a| a.* else { log.err("zigInit not called prior to handle_request. This is a coding error", .{}); return null; }; @@ -123,12 +123,12 @@ pub fn handleRequest(request: *Request, zigRequestHandler: ZigRequestHandler) ?* var response = std.ArrayList(u8).init(alloc); // setup headers - var request_headers = std.http.Headers.init(alloc); + var request_headers = std.ArrayList(std.http.Header).init(alloc); for (0..request.headers_len) |i| - request_headers.append( - request.headers[i].name_ptr[0..request.headers[i].name_len], - request.headers[i].value_ptr[0..request.headers[i].value_len], - ) catch |e| { + request_headers.append(.{ + .name = request.headers[i].name_ptr[0..request.headers[i].name_len], + .value = request.headers[i].value_ptr[0..request.headers[i].value_len], + }) catch |e| { log.err("Unexpected error processing request: {any}", .{e}); if (@errorReturnTrace()) |trace| { std.debug.dumpStackTrace(trace.*); @@ -136,16 +136,22 @@ pub fn handleRequest(request: *Request, zigRequestHandler: ZigRequestHandler) ?* return null; }; - var prepend = std.ArrayList(u8).init(alloc); + const prepend = std.ArrayList(u8).init(alloc); var zig_response = ZigResponse{ - .headers = .{ .allocator = alloc }, + .headers = &.{}, .body = &response, .prepend = prepend, .request = .{ .content = request.content[0..request.content_len], .target = request.target[0..request.target_len], .method = request.method[0..request.method_len :0], - .headers = request_headers, + .headers = request_headers.toOwnedSlice() catch |e| { + log.err("Unexpected error processing request: {any}", .{e}); + if (@errorReturnTrace()) |trace| { + std.debug.dumpStackTrace(trace.*); + } + return null; + }, }, }; zigRequestHandler( @@ -184,7 +190,7 @@ fn buildResponse(alloc: std.mem.Allocator, zig_response: *ZigResponse) ?*Respons } return null; }; - rc.headers_len = zig_response.headers.list.items.len; + rc.headers_len = zig_response.headers.len; rc.status = if (zig_response.status == .ok) 0 else @intFromEnum(zig_response.status); rc.reason_len = 0; if (zig_response.reason) |*r| { diff --git a/src/main-lib.zig b/src/main-lib.zig index 2053b4d..48f6b11 100644 --- a/src/main-lib.zig +++ b/src/main-lib.zig @@ -24,7 +24,7 @@ const log = std.log.scoped(.@"main-lib"); // } // comptime { - @export(interface.zigInit, .{ .name = "zigInit", .linkage = .Strong }); + @export(interface.zigInit, .{ .name = "zigInit", .linkage = .strong }); } /// handle_request will be called on a single request, but due to the preservation @@ -58,32 +58,46 @@ export fn handle_request(request: *interface.Request) callconv(.C) ?*interface.R // handleRequest function here is the last line of boilerplate and the // entry to a request fn handleRequest(allocator: std.mem.Allocator, response: *interface.ZigResponse) !void { - _ = allocator; // setup var response_writer = response.body.writer(); // real work - if (response.request.headers.getFirstValue("host")) |host| { - if (std.mem.startsWith(u8, host, "iam")) { - try response_writer.print("iam response", .{}); + for (response.request.headers) |h| { + if (std.ascii.eqlIgnoreCase(h.name, "host")) { + if (std.mem.startsWith(u8, h.value, "iam")) { + try response_writer.print("iam response", .{}); + return; + } + break; + } + } + for (response.request.headers) |h| { + if (std.ascii.eqlIgnoreCase(h.name, "x-slow")) { + std.time.sleep(std.time.ns_per_ms * (std.fmt.parseInt(usize, h.value, 10) catch 1000)); + try response_writer.print("i am slow\n\n", .{}); return; } } - if (response.request.headers.getFirstValue("x-slow")) |ms| { - std.time.sleep(std.time.ns_per_ms * (std.fmt.parseInt(usize, ms, 10) catch 1000)); - try response_writer.print("i am slow\n\n", .{}); - return; + for (response.request.headers) |h| { + if (std.ascii.eqlIgnoreCase(h.name, "x-log-this")) { + try response.writeAll(h.value); + break; + } } - if (response.request.headers.getFirstValue("x-log-this")) |l| { - try response.writeAll(l); + for (response.request.headers) |h| { + if (std.ascii.eqlIgnoreCase(h.name, "x-status")) { + response.status = @enumFromInt(std.fmt.parseInt(u10, h.value, 10) catch 500); + break; + } } - if (response.request.headers.getFirstValue("x-status")) |s| { - response.status = @enumFromInt(std.fmt.parseInt(u10, s, 10) catch 500); + for (response.request.headers) |h| { + if (std.ascii.eqlIgnoreCase(h.name, "x-throw")) + return error.Thrown; } - if (response.request.headers.getFirstValue("x-throw")) |_| { - return error.Thrown; - } - try response_writer.print(" {d}", .{response.request.headers.list.items.len}); - try response.headers.append("X-custom-foo", "bar"); + try response_writer.print(" {d}", .{response.request.headers.len}); + var headers = std.ArrayList(std.http.Header).init(allocator); + try headers.appendSlice(response.headers); + try headers.append(.{ .name = "X-custom-foo", .value = "bar" }); + response.headers = try headers.toOwnedSlice(); } test "handle_request" { @@ -91,7 +105,7 @@ test "handle_request" { defer arena.deinit(); var aa = arena.allocator(); interface.zigInit(&aa); - var headers: []interface.Header = @constCast(&[_]interface.Header{.{ + const headers: []interface.Header = @constCast(&[_]interface.Header{.{ .name_ptr = @ptrCast(@constCast("GET".ptr)), .name_len = 3, .value_ptr = @ptrCast(@constCast("GET".ptr)), @@ -119,7 +133,7 @@ test "lib can write data directly" { defer arena.deinit(); var aa = arena.allocator(); interface.zigInit(&aa); - var headers: []interface.Header = @constCast(&[_]interface.Header{.{ + const headers: []interface.Header = @constCast(&[_]interface.Header{.{ .name_ptr = @ptrCast(@constCast("x-log-this".ptr)), .name_len = "x-log-this".len, .value_ptr = @ptrCast(@constCast("I am a teapot".ptr)), @@ -146,7 +160,7 @@ test "lib can write data directly and still throw" { defer arena.deinit(); var aa = arena.allocator(); interface.zigInit(&aa); - var headers: []interface.Header = @constCast(&[_]interface.Header{ .{ + const headers: []interface.Header = @constCast(&[_]interface.Header{ .{ .name_ptr = @ptrCast(@constCast("x-log-this".ptr)), .name_len = "x-log-this".len, .value_ptr = @ptrCast(@constCast("I am a teapot".ptr)), @@ -176,7 +190,7 @@ test "lib can set status, update data directly and still throw" { defer arena.deinit(); var aa = arena.allocator(); interface.zigInit(&aa); - var headers: []interface.Header = @constCast(&[_]interface.Header{ .{ + const headers: []interface.Header = @constCast(&[_]interface.Header{ .{ .name_ptr = @ptrCast(@constCast("x-log-this".ptr)), .name_len = "x-log-this".len, .value_ptr = @ptrCast(@constCast("I am a teapot".ptr)), diff --git a/src/main.zig b/src/main.zig index d933f7e..52f818b 100644 --- a/src/main.zig +++ b/src/main.zig @@ -7,16 +7,17 @@ const config = @import("config.zig"); const log = std.log.scoped(.main); // logging options -pub const std_options = struct { - pub const log_level = switch (builtin.mode) { - .Debug => .debug, - .ReleaseSafe => .info, - .ReleaseFast, .ReleaseSmall => .err, - }; +pub const std_options = .{ + .log_level = if (!builtin.is_test) + switch (builtin.mode) { + .Debug => .debug, + .ReleaseSafe => .info, + .ReleaseFast, .ReleaseSmall => .err, + }, - pub const log_scope_levels = &[_]std.log.ScopeLevel{ + .log_scope_levels = &[_]std.log.ScopeLevel{ .{ .scope = .watch, .level = .info }, - }; + }, }; const serveFn = *const fn (*interface.Request) ?*interface.Response; @@ -32,8 +33,9 @@ var watcher_thread: ?std.Thread = null; var timer: ?std.time.Timer = null; const FullReturn = struct { - response: []u8, - executor: *Executor, + content: []const u8, + executor: *Executor = undefined, + respond_options: std.http.Server.Request.RespondOptions, }; // Executor structure, including functions that were found in the library @@ -52,9 +54,9 @@ const Executor = struct { // fields used for internal accounting watch: ?usize = null, - drain_in_progress: std.atomic.Atomic(bool) = std.atomic.Atomic(bool).init(false), - load_in_progress: std.atomic.Atomic(bool) = std.atomic.Atomic(bool).init(false), - requests_in_flight: std.atomic.Atomic(usize) = std.atomic.Atomic(usize).init(0), + drain_in_progress: std.atomic.Value(bool) = std.atomic.Value(bool).init(false), + load_in_progress: std.atomic.Value(bool) = std.atomic.Value(bool).init(false), + requests_in_flight: std.atomic.Value(usize) = std.atomic.Value(usize).init(0), }; const SERVE_FN_NAME = "handle_request"; @@ -67,68 +69,68 @@ var parsed_config: config.ParsedConfig = undefined; /// Serves a single request. Finds executor, marshalls request data for the C /// interface, calls the executor and marshalls data back -fn serve(allocator: *std.mem.Allocator, response: *std.http.Server.Response) !*FullReturn { - const executor = try getExecutor(response.request.target, response.request.headers); - errdefer _ = executor.requests_in_flight.fetchSub(1, .Monotonic); +fn serve(allocator: *std.mem.Allocator, request: *std.http.Server.Request) !*FullReturn { + var head_it = request.iterateHeaders(); + const headers = try toHeaders(allocator.*, &head_it); + const executor = try getExecutor(request.head.target, headers); + errdefer _ = executor.requests_in_flight.fetchSub(1, .monotonic); if (executor.zigInitFn) |f| f(allocator); // Call external library - const method_tag = @tagName(response.request.method); - const headers = try toHeaders(allocator.*, response.request.headers); - var request_content: []u8 = &[_]u8{}; - if (response.request.content_length) |l| { - request_content = try response.reader().readAllAlloc(allocator.*, @as(usize, l)); - } + const method_tag = @tagName(request.head.method); + const request_content: []u8 = try (try request.reader()).readAllAlloc(allocator.*, std.math.maxInt(usize)); log.debug("{d} bytes read from request", .{request_content.len}); - var request = interface.Request{ - .target = response.request.target.ptr, - .target_len = response.request.target.len, + var i_request = interface.Request{ + .target = request.head.target.ptr, + .target_len = request.head.target.len, .method = @constCast(method_tag[0..].ptr), .method_len = method_tag.len, - .headers = headers, - .headers_len = response.request.headers.list.items.len, + .headers = headers.ptr, + .headers_len = headers.len, .content = request_content.ptr, .content_len = request_content.len, }; - var serve_result = executor.serveFn.?(&request).?; // ok for this pointer deref to fail - log.debug("target: {s}", .{response.request.target}); + var serve_result = executor.serveFn.?(&i_request).?; // ok for this pointer deref to fail + log.debug("target: {s}", .{request.head.target}); log.debug("response ptr: {*}", .{serve_result.ptr}); - var slice: []u8 = serve_result.ptr[0..serve_result.len]; - log.debug("response body: {s}", .{slice}); + var response = try allocator.create(FullReturn); // allocator is arena here + response.content = serve_result.ptr[0..serve_result.len]; + response.executor = executor; + response.respond_options = .{}; + log.debug("response body: {s}", .{response.content}); if (serve_result.status != 0) { - response.status = @enumFromInt(serve_result.status); + response.respond_options.status = @enumFromInt(serve_result.status); if (serve_result.reason_len > 0) { - response.reason = serve_result.reason_ptr[0..serve_result.reason_len]; + response.respond_options.reason = serve_result.reason_ptr[0..serve_result.reason_len]; } } - // Deal with results + // Deal with response headers + var response_headers = try std.ArrayList(std.http.Header).initCapacity(allocator.*, serve_result.headers_len + 1); + defer response_headers.deinit(); var content_type_added = false; for (0..serve_result.headers_len) |inx| { const head = serve_result.headers[inx]; - try response.headers.append( - head.name_ptr[0..head.name_len], - head.value_ptr[0..head.value_len], - ); + response_headers.appendAssumeCapacity(.{ + .name = head.name_ptr[0..head.name_len], + .value = head.value_ptr[0..head.value_len], + }); // headers are case insensitive content_type_added = std.ascii.eqlIgnoreCase(head.name_ptr[0..head.name_len], "content-type"); } if (!content_type_added) - try response.headers.append("content-type", "text/plain"); - // target is path - var rc = try allocator.create(FullReturn); - rc.executor = executor; - rc.response = slice; - return rc; + response_headers.appendAssumeCapacity(.{ .name = "content-type", .value = "text/plain" }); + response.respond_options.extra_headers = try response_headers.toOwnedSlice(); + return response; } /// Gets and executor based on request data -fn getExecutor(requested_path: []const u8, headers: std.http.Headers) !*Executor { +fn getExecutor(requested_path: []const u8, headers: []interface.Header) !*Executor { var executor = blk: { for (executors) |*exec| { if (executorIsMatch(exec.match_data, requested_path, headers)) { @@ -138,15 +140,15 @@ fn getExecutor(requested_path: []const u8, headers: std.http.Headers) !*Executor log.err("Could not find executor for target path '{s}'", .{requested_path}); return error.NoApplicableExecutor; }; - while (executor.drain_in_progress.load(.Acquire)) { + while (executor.drain_in_progress.load(.acquire)) { // we need to stand down and stand by std.atomic.spinLoopHint(); // Let CPU know we're just hanging out std.time.sleep(1 * std.time.ns_per_ms / 2); } // Tell everyone we're about to use this bad boy // While requests_in_flight is >= 0, nobody should unload the library - _ = executor.requests_in_flight.fetchAdd(1, .Acquire); - errdefer _ = executor.requests_in_flight.fetchSub(1, .Release); + _ = executor.requests_in_flight.fetchAdd(1, .acquire); + errdefer _ = executor.requests_in_flight.fetchSub(1, .release); if (executor.serveFn != null) return executor; @@ -157,13 +159,13 @@ fn getExecutor(requested_path: []const u8, headers: std.http.Headers) !*Executor // If the library is being reloaded and a bunch of requests come in, // we could have multiple threads racing to load // NOTE: I am not confident of the memory ordering here on tryCompareAndSwap - while (executor.load_in_progress.tryCompareAndSwap(false, true, .Acquire, .Acquire)) |_| { + while (@cmpxchgWeak(bool, &executor.load_in_progress.raw, false, true, .acquire, .acquire)) |_| { // we need to stand down and stand by std.atomic.spinLoopHint(); // Let CPU know we're just hanging out std.time.sleep(1 * std.time.ns_per_ms / 2); } // we have the conch...lock others out - defer executor.load_in_progress.store(false, .Release); + defer executor.load_in_progress.store(false, .release); if (executor.library) |l| break :blk l; // someone beat us to the race..our defer above will take care of unlocking @@ -185,7 +187,7 @@ fn getExecutor(requested_path: []const u8, headers: std.http.Headers) !*Executor return executor; } -fn executorIsMatch(match_data: []const u8, requested_path: []const u8, headers: std.http.Headers) bool { +fn executorIsMatch(match_data: []const u8, requested_path: []const u8, headers: []interface.Header) bool { if (!std.mem.containsAtLeast(u8, match_data, 1, ":")) { // match_data does not have a ':'. This means this is a straight path, without // any header requirement. We can simply return a match prefix on the @@ -196,7 +198,13 @@ fn executorIsMatch(match_data: []const u8, requested_path: []const u8, headers: } const colon = std.mem.indexOf(u8, match_data, ":").?; const header_needle = match_data[0..colon]; - const header_inx = headers.firstIndexOf(header_needle) orelse return false; + const header_inx = blk: { + for (headers, 0..) |h, i| { + if (std.ascii.eqlIgnoreCase(h.name_ptr[0..h.name_len], header_needle)) + break :blk i; + } + return false; + }; // Apparently std.mem.split will return an empty first when the haystack starts // with the delimiter var split = std.mem.split(u8, std.mem.trim(u8, match_data[colon + 1 ..], "\t "), " "); @@ -211,7 +219,7 @@ fn executorIsMatch(match_data: []const u8, requested_path: []const u8, headers: // match_data includes some sort of header match as well. We assume the // header match is a full match on the key (handled above) // but a prefix match on the value - const request_header_value = headers.list.items[header_inx].value; + const request_header_value = headers[header_inx].value_ptr[0..headers[header_inx].value_len]; // (shoud this be case insensitive?) if (!std.mem.startsWith(u8, request_header_value, header_value_needle)) return false; // header value matches...return the path prefix match @@ -243,9 +251,9 @@ fn executorChanged(watch: usize) void { if (executor.watch) |w| { if (w == watch) { if (executor.library) |l| { - executor.drain_in_progress.store(true, .Release); - defer executor.drain_in_progress.store(false, .Release); - while (executor.requests_in_flight.load(.Acquire) > 0) { + executor.drain_in_progress.store(true, .release); + defer executor.drain_in_progress.store(false, .release); + while (executor.requests_in_flight.load(.acquire) > 0) { std.atomic.spinLoopHint(); std.time.sleep(1 * std.time.ns_per_ms / 2); } @@ -257,7 +265,7 @@ fn executorChanged(watch: usize) void { log.warn("could not reload! error opening library", .{}); return; }; - var symbol = std.c.dlsym(executor.library.?, SERVE_FN_NAME); + const symbol = std.c.dlsym(executor.library.?, SERVE_FN_NAME); if (symbol == null) { log.warn("could not reload! error finding symbol", .{}); if (std.c.dlclose(executor.library.?) != 0) @@ -284,11 +292,11 @@ fn dlopen(path: [:0]const u8) !*anyopaque { /// exit from the application as the main function has an infinite loop fn exitApplication( _: i32, - _: *const std.os.siginfo_t, + _: *const std.posix.siginfo_t, _: ?*const anyopaque, ) callconv(.C) noreturn { exitApp(0); - std.os.exit(0); + std.posix.exit(0); } /// exitApp handles deinitialization for the application and any reporting @@ -302,7 +310,7 @@ fn exitApp(exitcode: u8) void { watcher.stopWatch() catch @panic("could not stop watcher"); std.io.getStdOut().writer().print("exiting application\n", .{}) catch {}; watcher.deinit(); - std.os.exit(exitcode); + std.posix.exit(exitcode); // joining threads will hang...we're ultimately in a signal handler. // But everything is shut down cleanly now, so I don't think it hurts to // just kill it all (NOTE: from a practical perspective, we do not seem @@ -316,7 +324,7 @@ fn exitApp(exitcode: u8) void { /// has been implemented. Operates off the main thread fn reloadConfig( _: i32, - _: *const std.os.siginfo_t, + _: *const std.posix.siginfo_t, _: ?*const anyopaque, ) callconv(.C) void { // TODO: Gracefully drain in flight requests and hold a lock here while @@ -334,36 +342,36 @@ fn reloadConfig( /// Installs all signal handlers for shutdown and configuration reload fn installSignalHandler() !void { - var act = std.os.Sigaction{ + var act = std.posix.Sigaction{ .handler = .{ .sigaction = exitApplication }, - .mask = std.os.empty_sigset, - .flags = (std.os.SA.SIGINFO | std.os.SA.RESTART | std.os.SA.RESETHAND), + .mask = std.posix.empty_sigset, + .flags = (std.posix.SA.SIGINFO | std.posix.SA.RESTART | std.posix.SA.RESETHAND), }; - try std.os.sigaction(std.os.SIG.INT, &act, null); - try std.os.sigaction(std.os.SIG.TERM, &act, null); + try std.posix.sigaction(std.posix.SIG.INT, &act, null); + try std.posix.sigaction(std.posix.SIG.TERM, &act, null); - var hup_act = std.os.Sigaction{ + var hup_act = std.posix.Sigaction{ .handler = .{ .sigaction = reloadConfig }, - .mask = std.os.empty_sigset, - .flags = (std.os.SA.SIGINFO | std.os.SA.RESTART | std.os.SA.RESETHAND), + .mask = std.posix.empty_sigset, + .flags = (std.posix.SA.SIGINFO | std.posix.SA.RESTART | std.posix.SA.RESETHAND), }; - try std.os.sigaction(std.os.SIG.HUP, &hup_act, null); + try std.posix.sigaction(std.posix.SIG.HUP, &hup_act, null); } pub fn main() !void { // We are linked to libc, and primarily using the arena allocator // raw allocator recommended for use in arenas - var raw_allocator = std.heap.raw_c_allocator; + const raw_allocator = std.heap.raw_c_allocator; // Our child process will also need an allocator, and is using the // same pattern, so we will hand the child a raw allocator as well - var child_allocator = std.heap.raw_c_allocator; + const child_allocator = std.heap.raw_c_allocator; // Lastly, we need some of our own operations var arena = std.heap.ArenaAllocator.init(raw_allocator); defer arena.deinit(); - var parent_allocator = arena.allocator(); + const parent_allocator = arena.allocator(); var al = std.ArrayList([]const u8).init(parent_allocator); defer al.deinit(); @@ -408,25 +416,22 @@ fn childMain(allocator: std.mem.Allocator) !void { watcher = Watch.init(executorChanged); watcher_thread = try std.Thread.spawn(.{}, Watch.startWatch, .{&watcher}); - var server = std.http.Server.init(allocator, .{ .reuse_address = true }); - defer server.deinit(); - const address = try std.net.Address.parseIp( "0.0.0.0", - if (std.os.getenv("PORT")) |p| try std.fmt.parseInt(u16, p, 10) else PORT, + if (std.posix.getenv("PORT")) |p| try std.fmt.parseInt(u16, p, 10) else PORT, ); - try server.listen(address); - const server_port = server.socket.listen_address.in.getPort(); + var server = try address.listen(.{ .reuse_address = true }); + const server_port = server.listen_address.in.getPort(); log.info("listening on port: {d}", .{server_port}); if (builtin.os.tag == .linux) log.info("pid: {d}", .{std.os.linux.getpid()}); try installSignalHandler(); - response_preallocation_kb = if (std.os.getenv("RESPONSE_PREALLOCATION_KB")) |kb| + response_preallocation_kb = if (std.posix.getenv("RESPONSE_PREALLOCATION_KB")) |kb| try std.fmt.parseInt(usize, kb, 10) else response_preallocation_kb; - var server_thread_count = if (std.os.getenv("SERVER_THREAD_COUNT")) |count| + const server_thread_count = if (std.posix.getenv("SERVER_THREAD_COUNT")) |count| try std.fmt.parseInt(usize, count, 10) else switch (builtin.mode) { .Debug => @min(4, try std.Thread.getCpuCount()), @@ -451,7 +456,7 @@ fn childMain(allocator: std.mem.Allocator) !void { for (server_threads.items) |thread| thread.join(); } -fn threadMain(allocator: std.mem.Allocator, server: *std.http.Server, thread_number: usize) !void { +fn threadMain(allocator: std.mem.Allocator, server: *std.net.Server, thread_number: usize) !void { // TODO: If we're in a thread pool we need to be careful with this... const stdout_file = std.io.getStdOut().writer(); var bw = std.io.bufferedWriter(stdout_file); @@ -505,37 +510,42 @@ fn loadConfig(allocator: std.mem.Allocator) ![]Executor { /// main loop calls processRequest, which is responsible for the interface /// with logs, connection accounting, etc. The work dealing with the request /// itself is delegated to the serve function to work with the executor -fn processRequest(allocator: *std.mem.Allocator, server: *std.http.Server, writer: anytype) !void { +fn processRequest(allocator: *std.mem.Allocator, server: *std.net.Server, writer: anytype) !void { if (timer == null) timer = try std.time.Timer.start(); var tm = timer.?; - var res = try server.accept(.{ .allocator = allocator.* }); - defer res.deinit(); - defer _ = res.reset(); - try res.wait(); // wait for client to send a complete request head + var connection = try server.accept(); + defer connection.stream.close(); + var read_buffer: [1024 * 16]u8 = undefined; // TODO: fix this + var server_connection = std.http.Server.init(connection, &read_buffer); + var req = try server_connection.receiveHead(); + + // TODO: should we check for server_connection.state == .ready? // I believe it's fair to start our timer after this is done tm.reset(); + const err_return = FullReturn{ + .content = "Internal Server Error\n", + .respond_options = .{ .status = .internal_server_error }, + }; // This is an nginx log: // git.lerch.org 50.39.111.175 - - [16/May/2023:02:56:31 +0000] "POST /api/actions/runner.v1.RunnerService/FetchTask HTTP/2.0" 200 0 "-" "connect-go/1.2.0-dev (go1.20.1)" "172.20.0.5:3000" // TODO: replicate this try writer.print("{} - - \"{s} {s} {s}\"", .{ - res.address, - @tagName(res.request.method), - res.request.target, - @tagName(res.request.version), + server_connection.connection.address, + @tagName(req.head.method), + req.head.target, + @tagName(req.head.version), }); - const errstr = "Internal Server Error\n"; - var errbuf: [errstr.len]u8 = undefined; - var response_bytes = try std.fmt.bufPrint(&errbuf, errstr, .{}); - - var full_response = serve(allocator, &res) catch |e| brk: { - res.status = .internal_server_error; - // TODO: more about this particular request + var caught_err = false; + const full_response = serve(allocator, &req) catch |e| brk: { log.err("Unexpected error from executor processing request: {any}", .{e}); + // TODO: more about this particular request if (@errorReturnTrace()) |trace| { std.debug.dumpStackTrace(trace.*); } - break :brk null; + + caught_err = true; + break :brk &err_return; }; defer { // The call above converts uncaught errors to a null @@ -545,30 +555,29 @@ fn processRequest(allocator: *std.mem.Allocator, server: *std.http.Server, write // This leaves this defer block as the only place we can/should decrement // under normal conditions - if (full_response) |f| { - if (f.executor.requestDeinitFn) |d| d(); - _ = f.executor.requests_in_flight.fetchSub(1, .Release); + if (!caught_err) { + if (full_response.executor.requestDeinitFn) |d| d(); + _ = full_response.executor.requests_in_flight.fetchSub(1, .release); } } - if (full_response) |f| - response_bytes = f.response; - res.transfer_encoding = .{ .content_length = response_bytes.len }; - try res.headers.append("connection", "close"); - try writer.print(" {d} ttfb {d:.3}ms", .{ @intFromEnum(res.status), @as(f64, @floatFromInt(tm.read())) / std.time.ns_per_ms }); - if (builtin.is_test) writeToTestBuffers(response_bytes, &res); - try res.do(); - _ = try res.writer().writeAll(response_bytes); - try res.finish(); + try writer.print(" {d} ttfb {d:.3}ms", .{ + @intFromEnum(full_response.respond_options.status), + @as(f64, @floatFromInt(tm.read())) / std.time.ns_per_ms, + }); + if (builtin.is_test) writeToTestBuffers(full_response.content); + req.respond(full_response.content, full_response.respond_options) catch |e| { + log.err("Unexpected error responding to client: {any}", .{e}); + }; try writer.print(" {d} ttlb {d:.3}ms", .{ - response_bytes.len, + full_response.content.len, @as(f64, @floatFromInt(tm.read())) / std.time.ns_per_ms, }); } -fn toHeaders(allocator: std.mem.Allocator, headers: std.http.Headers) ![*]interface.Header { - var header_array = try std.ArrayList(interface.Header).initCapacity(allocator, headers.list.items.len); - for (headers.list.items) |kv| { - header_array.appendAssumeCapacity(.{ +fn toHeaders(allocator: std.mem.Allocator, headers: *std.http.HeaderIterator) ![]interface.Header { + var header_array = std.ArrayList(interface.Header).init(allocator); + while (headers.next()) |kv| { + try header_array.append(.{ .name_ptr = @constCast(kv.name).ptr, .name_len = kv.name.len, @@ -576,7 +585,7 @@ fn toHeaders(allocator: std.mem.Allocator, headers: std.http.Headers) ![*]interf .value_len = kv.value.len, }); } - return header_array.items.ptr; + return try header_array.toOwnedSlice(); } /// Allocates at least preallocated_kb kilobytes of ram for usage. Some overhead @@ -592,12 +601,11 @@ fn preWarmArena(aa: std.mem.Allocator, arena: *std.heap.ArenaAllocator, prealloc ); if (!arena.reset(.{ .retain_with_limit = (arena.queryCapacity() + @as(usize, 1023)) / @as(usize, 1024) * 1024 })) log.warn("arena reset failed, arena degraded", .{}); - var bytes_allocated = arena.queryCapacity(); + const bytes_allocated = arena.queryCapacity(); log.debug("preallocated {d} bytes", .{bytes_allocated}); return bytes_allocated; } -fn writeToTestBuffers(response: []const u8, res: *std.http.Server.Response) void { - _ = res; +fn writeToTestBuffers(response: []const u8) void { log.debug("writing to test buffers", .{}); // This performs core dump...because we're in a separate thread? // @memset(test_resp_buf, 0); @@ -615,16 +623,13 @@ fn testRequest(request_bytes: []const u8) !void { var arena = std.heap.ArenaAllocator.init(allocator); defer arena.deinit(); - var server = std.http.Server.init(allocator, .{ .reuse_address = true }); - defer server.deinit(); - const address = try std.net.Address.parseIp("127.0.0.1", 0); - try server.listen(address); - const server_port = server.socket.listen_address.in.getPort(); + var http_server = try address.listen(.{ .reuse_address = true }); + const server_port = http_server.listen_address.in.getPort(); var al = std.ArrayList(u8).init(allocator); defer al.deinit(); - var writer = al.writer(); + const writer = al.writer(); var aa = arena.allocator(); var bytes_allocated: usize = 0; // pre-warm @@ -632,7 +637,7 @@ fn testRequest(request_bytes: []const u8) !void { const server_thread = try std.Thread.spawn( .{}, processRequest, - .{ &aa, &server, writer }, + .{ &aa, &http_server, writer }, ); const stream = try std.net.tcpConnectToHost(allocator, "127.0.0.1", server_port); @@ -670,14 +675,12 @@ test { var test_resp_buf: [1024]u8 = undefined; var test_resp_buf_len: usize = undefined; test "root path get" { - std.testing.log_level = .debug; log.debug("", .{}); try testGet("/"); try std.testing.expectEqual(@as(usize, 2), test_resp_buf_len); try std.testing.expectEqualStrings(" 1", test_resp_buf[0..test_resp_buf_len]); } test "root path, alternative host get" { - std.testing.log_level = .debug; log.debug("", .{}); try testHostGet("iam.aws.lerch.org", "/"); try std.testing.expectEqualStrings("iam response", test_resp_buf[0..test_resp_buf_len]);