Compare commits


2 Commits

SHA1        Message                  Date
b187fa80eb  update CI to zig 0.12.0  2024-05-02 14:43:18 -07:00
            Some checks failed: Build / build (push) failing after 1m5s;
            Build / sign (push) and Build / deploy (push) skipped
c06bda918b  upgrade to zig 0.12.0    2024-05-02 14:38:04 -07:00
7 changed files with 227 additions and 212 deletions

View File

@ -4,7 +4,7 @@ on: [push]
env: env:
ACTIONS_RUNTIME_TOKEN: ${{ secrets.GITHUB_TOKEN }} ACTIONS_RUNTIME_TOKEN: ${{ secrets.GITHUB_TOKEN }}
ACTIONS_RUNTIME_URL: https://git.lerch.org/api/actions_pipeline/ ACTIONS_RUNTIME_URL: https://git.lerch.org/api/actions_pipeline/
ZIG_URL: https://ziglang.org/download/0.11.0/zig-linux-x86_64-0.11.0.tar.xz ZIG_URL: https://ziglang.org/download/0.12.0/zig-linux-x86_64-0.12.0.tar.xz
BUILD_TARGET: x86_64-linux-gnu # Needs to be gnu since we're using dlopen BUILD_TARGET: x86_64-linux-gnu # Needs to be gnu since we're using dlopen
BUILD_OPTIMIZATION: ReleaseSafe # Safety is usually a good thing BUILD_OPTIMIZATION: ReleaseSafe # Safety is usually a good thing
jobs: jobs:

View File

@ -27,8 +27,7 @@ pub fn build(b: *std.Build) void {
exe.linkLibC(); exe.linkLibC();
_ = b.addModule("flexilib-interface", .{ _ = b.addModule("flexilib-interface", .{
.source_file = .{ .path = "src/interface.zig" }, .root_source_file = b.path("src/interface.zig"),
.dependencies = &[_]std.build.ModuleDependency{},
}); });
const lib = b.addSharedLibrary(.{ const lib = b.addSharedLibrary(.{
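The build.zig hunk above is the Zig 0.12 module-API change: `.source_file` became `.root_source_file`, paths are wrapped with `b.path()`, and the `.dependencies` slice is gone. A minimal 0.12-shaped sketch, assuming the same src/interface.zig layout:

```zig
const std = @import("std");

pub fn build(b: *std.Build) void {
    // 0.11: .source_file = .{ .path = "..." } plus a .dependencies slice.
    // 0.12: a LazyPath from b.path(); module imports, if any, are declared
    // separately rather than through a dependencies field.
    _ = b.addModule("flexilib-interface", .{
        .root_source_file = b.path("src/interface.zig"),
    });
}
```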

View File

@ -12,13 +12,13 @@ const Wd = struct {
}; };
fileChanged: *const fn (usize) void, fileChanged: *const fn (usize) void,
inotify_fd: ?std.os.fd_t = null, inotify_fd: ?std.posix.fd_t = null,
nfds_t: usize = 0, nfds_t: usize = 0,
wds: [MAX_FDS]Wd = [_]Wd{.{}} ** MAX_FDS, wds: [MAX_FDS]Wd = [_]Wd{.{}} ** MAX_FDS,
dir_nfds_t: usize = 0, dir_nfds_t: usize = 0,
dir_wds: [MAX_FDS]Wd = [_]Wd{.{}} ** MAX_FDS, dir_wds: [MAX_FDS]Wd = [_]Wd{.{}} ** MAX_FDS,
control_socket: ?std.os.socket_t = null, control_socket: ?std.posix.socket_t = null,
watch_started: bool = false, watch_started: bool = false,
sock_name: [:0]const u8, sock_name: [:0]const u8,
@ -35,21 +35,14 @@ pub fn deinit(self: *Self) void {
if (self.control_socket) |s| { if (self.control_socket) |s| {
// Sockets...where Unix still pretends everything is a file, but it's not... // Sockets...where Unix still pretends everything is a file, but it's not...
log.debug("closing control socket", .{}); log.debug("closing control socket", .{});
std.os.closeSocket(s); std.posix.close(s);
} }
if (self.inotify_fd) |fd| { if (self.inotify_fd) |fd| {
for (0..self.nfds_t + self.dir_nfds_t) |inx| { for (0..self.nfds_t + self.dir_nfds_t) |inx| {
const wd = if (inx < self.nfds_t) self.wds[inx].wd else self.dir_wds[inx - self.nfds_t].wd; const wd = if (inx < self.nfds_t) self.wds[inx].wd else self.dir_wds[inx - self.nfds_t].wd;
switch (std.os.errno(std.os.linux.inotify_rm_watch(fd, wd))) { std.posix.inotify_rm_watch(fd, wd);
.SUCCESS => {},
.BADF => unreachable,
// NOTE: Getting EINVAL, but the call looks valid to me?
// ...and wait...not all the time?
.INVAL => log.err("error removing watch (EINVAL). OS claims fd ({d}) or wd ({d}) is invalid", .{ self.inotify_fd.?, wd }),
else => unreachable,
}
} }
std.os.close(fd); std.posix.close(fd);
} }
const cwd = std.fs.cwd(); const cwd = std.fs.cwd();
cwd.deleteFileZ(self.sock_name) catch |e| cwd.deleteFileZ(self.sock_name) catch |e|
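Two 0.12 changes drive the deinit rewrite above: most `std.os` symbols moved to `std.posix` (`std.os.closeSocket` is gone; `std.posix.close` now covers sockets too), and `std.posix.inotify_rm_watch` checks errno internally and returns void, so the hand-rolled errno switch could be deleted. A minimal sketch, assuming Linux and "/tmp" as a stand-in path to watch:

```zig
const std = @import("std");

pub fn main() !void {
    // 0.12: inotify wrappers live under std.posix; the flag constants
    // stay under std.os.linux.
    const fd = try std.posix.inotify_init1(std.os.linux.IN.NONBLOCK);
    defer std.posix.close(fd);
    const wd = try std.posix.inotify_add_watchZ(fd, "/tmp", std.os.linux.IN.CLOSE_WRITE);
    // 0.11 code switched on std.os.errno(...); the 0.12 wrapper has no
    // return value and treats invalid descriptors as unreachable.
    std.posix.inotify_rm_watch(fd, wd);
}
```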
@ -77,12 +70,12 @@ pub fn startWatch(self: *Self) void {
while (true) { while (true) {
self.watch_started = true; self.watch_started = true;
var fds = if (self.inotify_fd == null) const fds = if (self.inotify_fd == null)
@constCast(&[_]std.os.pollfd{.{ .fd = self.control_socket.?, .events = std.os.POLL.IN, .revents = undefined }}) @constCast(&[_]std.posix.pollfd{.{ .fd = self.control_socket.?, .events = std.posix.POLL.IN, .revents = undefined }})
else else
@constCast(&[_]std.os.pollfd{ @constCast(&[_]std.posix.pollfd{
.{ .fd = self.control_socket.?, .events = std.os.POLL.IN, .revents = undefined }, .{ .fd = self.control_socket.?, .events = std.posix.POLL.IN, .revents = undefined },
.{ .fd = self.inotify_fd.?, .events = std.os.POLL.IN, .revents = undefined }, .{ .fd = self.inotify_fd.?, .events = std.posix.POLL.IN, .revents = undefined },
}); });
const control_fd_inx = 0; const control_fd_inx = 0;
@ -97,11 +90,11 @@ pub fn startWatch(self: *Self) void {
// std.fs.watch looks really good...but it requires event based I/O, // std.fs.watch looks really good...but it requires event based I/O,
// which is not yet ready to be (re)added. // which is not yet ready to be (re)added.
log.debug("tid={d} start poll with {d} fds", .{ std.Thread.getCurrentId(), fds.len }); log.debug("tid={d} start poll with {d} fds", .{ std.Thread.getCurrentId(), fds.len });
if ((std.os.poll( if ((std.posix.poll(
fds, fds,
-1, // Infinite timeout -1, // Infinite timeout
) catch @panic("poll error")) > 0) { ) catch @panic("poll error")) > 0) {
if (fds[control_fd_inx].revents & std.os.POLL.IN == std.os.POLL.IN) { // POLLIN means "there is data to read" if (fds[control_fd_inx].revents & std.posix.POLL.IN == std.posix.POLL.IN) { // POLLIN means "there is data to read"
log.debug("tid={d} control event", .{std.Thread.getCurrentId()}); log.debug("tid={d} control event", .{std.Thread.getCurrentId()});
// we only need one byte for what we're doing // we only need one byte for what we're doing
var control_buf: [1]u8 = undefined; var control_buf: [1]u8 = undefined;
@ -109,9 +102,9 @@ pub fn startWatch(self: *Self) void {
// self.control_socket_accepted_fd = self.control_socket_accepted_fd orelse acceptSocket(self.control_socket.?); // self.control_socket_accepted_fd = self.control_socket_accepted_fd orelse acceptSocket(self.control_socket.?);
// const fd = self.control_socket_accepted_fd.?; // let's save some typing // const fd = self.control_socket_accepted_fd.?; // let's save some typing
const fd = acceptSocket(self.sock_name, self.control_socket.?); const fd = acceptSocket(self.sock_name, self.control_socket.?);
defer std.os.close(fd); defer std.posix.close(fd);
var readcount = std.os.recv(fd, &control_buf, 0) catch unreachable; const readcount = std.posix.recv(fd, &control_buf, 0) catch unreachable;
// var other_buf: [1]u8 = undefined; // var other_buf: [1]u8 = undefined;
// if (std.os.recv(fd, &other_buf, 0) catch unreachable != 0) // if (std.os.recv(fd, &other_buf, 0) catch unreachable != 0)
// @panic("socket contains more data than expected"); // @panic("socket contains more data than expected");
@ -142,11 +135,11 @@ pub fn startWatch(self: *Self) void {
// fds[1] is inotify, so if we have data in that file descriptor, // fds[1] is inotify, so if we have data in that file descriptor,
// we can force the data into an inotify_event structure and act on it // we can force the data into an inotify_event structure and act on it
if (self.inotify_fd != null and fds[inotify_fd_inx].revents & std.os.POLL.IN == std.os.POLL.IN) { if (self.inotify_fd != null and fds[inotify_fd_inx].revents & std.posix.POLL.IN == std.posix.POLL.IN) {
log.debug("tid={d} inotify event", .{std.Thread.getCurrentId()}); log.debug("tid={d} inotify event", .{std.Thread.getCurrentId()});
var event_buf: [4096]u8 align(@alignOf(std.os.linux.inotify_event)) = undefined; var event_buf: [4096]u8 align(@alignOf(std.os.linux.inotify_event)) = undefined;
// "borrowed" from https://ziglang.org/documentation/master/std/src/std/fs/watch.zig.html#L588 // "borrowed" from https://ziglang.org/documentation/master/std/src/std/fs/watch.zig.html#L588
const bytes_read = std.os.read(self.inotify_fd.?, &event_buf) catch unreachable; const bytes_read = std.posix.read(self.inotify_fd.?, &event_buf) catch unreachable;
var ptr: [*]u8 = &event_buf; var ptr: [*]u8 = &event_buf;
const end_ptr = ptr + bytes_read; const end_ptr = ptr + bytes_read;
@ -162,11 +155,11 @@ pub fn startWatch(self: *Self) void {
} }
} }
fn acceptSocket(name: [:0]const u8, socket: std.os.socket_t) std.os.socket_t { fn acceptSocket(name: [:0]const u8, socket: std.posix.socket_t) std.posix.socket_t {
var sockaddr = std.net.Address.initUnix(name) catch @panic("could not get sockaddr"); var sockaddr = std.net.Address.initUnix(name) catch @panic("could not get sockaddr");
var sockaddr_len: std.os.socklen_t = sockaddr.getOsSockLen(); var sockaddr_len: std.posix.socklen_t = sockaddr.getOsSockLen();
log.debug("tid={d} accepting on socket fd {d}", .{ std.Thread.getCurrentId(), socket }); log.debug("tid={d} accepting on socket fd {d}", .{ std.Thread.getCurrentId(), socket });
return std.os.accept( return std.posix.accept(
socket, socket,
&sockaddr.any, &sockaddr.any,
&sockaddr_len, &sockaddr_len,
@ -266,16 +259,16 @@ test "nameMatch" {
/// adds a file to watch. The return will be a handle that will be returned /// adds a file to watch. The return will be a handle that will be returned
/// in the fileChanged event triffered from startWatch /// in the fileChanged event triffered from startWatch
pub fn addFileWatch(self: *Self, path: *[:0]const u8) !usize { pub fn addFileWatch(self: *Self, path: *[:0]const u8) !usize {
self.inotify_fd = self.inotify_fd orelse try std.os.inotify_init1(std.os.linux.IN.NONBLOCK); self.inotify_fd = self.inotify_fd orelse try std.posix.inotify_init1(std.os.linux.IN.NONBLOCK);
errdefer { errdefer {
std.os.close(self.inotify_fd.?); std.posix.close(self.inotify_fd.?);
self.inotify_fd = null; self.inotify_fd = null;
} }
// zig build modification pattern: open 20, close_nowrite 10, MOVED_TO (on the directory), attrib 4 // zig build modification pattern: open 20, close_nowrite 10, MOVED_TO (on the directory), attrib 4
// unix cp: OPEN, MODIFY, CLOSE_WRITE, ATTRIB // unix cp: OPEN, MODIFY, CLOSE_WRITE, ATTRIB
// unix mv: MOVED_TO (on the directory) // unix mv: MOVED_TO (on the directory)
self.wds[self.nfds_t] = .{ self.wds[self.nfds_t] = .{
.wd = try std.os.inotify_add_watchZ( .wd = try std.posix.inotify_add_watchZ(
self.inotify_fd.?, self.inotify_fd.?,
path.*, path.*,
std.os.linux.IN.CLOSE_WRITE, std.os.linux.IN.CLOSE_WRITE,
@ -302,7 +295,7 @@ fn addDirWatch(self: *Self, path: *[]const u8) !void {
return; // We are already watching this directory return; // We are already watching this directory
// We do not have a directory watch // We do not have a directory watch
self.dir_wds[self.dir_nfds_t] = .{ self.dir_wds[self.dir_nfds_t] = .{
.wd = try std.os.inotify_add_watch(self.inotify_fd.?, dirname, std.os.linux.IN.MOVED_TO), .wd = try std.posix.inotify_add_watch(self.inotify_fd.?, dirname, std.os.linux.IN.MOVED_TO),
.path = path, // we store path rather than directory because doing this without an allocator is...tough .path = path, // we store path rather than directory because doing this without an allocator is...tough
}; };
self.dir_nfds_t += 1; self.dir_nfds_t += 1;
@ -342,10 +335,10 @@ fn addControlSocket(self: *Self, path: [:0]const u8) !void {
// //
// This function theoretically should work without requiring linux...except this inotify call, // This function theoretically should work without requiring linux...except this inotify call,
// which is completely linux specific // which is completely linux specific
self.inotify_fd = self.inotify_fd orelse try std.os.inotify_init1(std.os.linux.IN.NONBLOCK); self.inotify_fd = self.inotify_fd orelse try std.posix.inotify_init1(std.os.linux.IN.NONBLOCK);
log.debug("Established inotify file descriptor {d}", .{self.inotify_fd.?}); log.debug("Established inotify file descriptor {d}", .{self.inotify_fd.?});
errdefer { errdefer {
std.os.close(self.inotify_fd.?); std.posix.close(self.inotify_fd.?);
self.inotify_fd = null; self.inotify_fd = null;
} }
// this should work on all systems theoretically, but I believe would work only // this should work on all systems theoretically, but I believe would work only
@ -369,18 +362,18 @@ fn addControlSocket(self: *Self, path: [:0]const u8) !void {
// 6. std.os.close: close the fd // 6. std.os.close: close the fd
// //
// On end of use, we need to std.os.closeSocket() // On end of use, we need to std.os.closeSocket()
const sock = try std.os.socket( const sock = try std.posix.socket(
std.os.linux.AF.LOCAL, std.os.linux.AF.LOCAL,
std.os.linux.SOCK.STREAM | std.os.SOCK.CLOEXEC, std.os.linux.SOCK.STREAM | std.posix.SOCK.CLOEXEC,
0, 0,
); );
errdefer std.os.closeSocket(sock); errdefer std.posix.close(sock);
const sockaddr = try std.net.Address.initUnix(path); const sockaddr = try std.net.Address.initUnix(path);
log.debug("binding to path: {s}", .{path}); log.debug("binding to path: {s}", .{path});
try std.os.bind(sock, &sockaddr.any, sockaddr.getOsSockLen()); try std.posix.bind(sock, &sockaddr.any, sockaddr.getOsSockLen());
try std.os.listen(sock, 10); try std.posix.listen(sock, 10);
self.control_socket = sock; self.control_socket = sock;
log.debug("added control socket with fd={d}", .{sock}); log.debug("added control socket with fd={d}", .{sock});
} }
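The control-socket setup follows the same `std.os` → `std.posix` move: `socket`, `bind`, and `listen` are now `std.posix` calls, and the former `std.os.closeSocket(sock)` becomes plain `std.posix.close(sock)`. A minimal sketch of the 0.12 sequence, using a hypothetical throwaway socket path:

```zig
const std = @import("std");

pub fn main() !void {
    const path = "/tmp/flexilib-demo.sock"; // hypothetical path for the sketch
    const sock = try std.posix.socket(
        std.os.linux.AF.LOCAL,
        std.os.linux.SOCK.STREAM | std.posix.SOCK.CLOEXEC,
        0,
    );
    // 0.12: sockets close through the same call as any other fd.
    defer std.posix.close(sock);
    const addr = try std.net.Address.initUnix(path);
    try std.posix.bind(sock, &addr.any, addr.getOsSockLen());
    try std.posix.listen(sock, 10);
    // Remove the filesystem entry the bind created.
    defer std.fs.cwd().deleteFile(path) catch {};
}
```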

View File

@ -65,17 +65,17 @@ pub fn parse(self: Self, reader: anytype) !ParsedConfig {
defer self.allocator.free(line); defer self.allocator.free(line);
const nocomments = std.mem.trim(u8, @constCast(&std.mem.split(u8, line, "#")).first(), ws); const nocomments = std.mem.trim(u8, @constCast(&std.mem.split(u8, line, "#")).first(), ws);
var data_iterator = std.mem.split(u8, nocomments, "="); var data_iterator = std.mem.split(u8, nocomments, "=");
var key = std.mem.trim(u8, data_iterator.first(), ws); // first never fails const key = std.mem.trim(u8, data_iterator.first(), ws); // first never fails
if (key.len == 0) continue; if (key.len == 0) continue;
var value = std.mem.trim(u8, data_iterator.next() orelse return error.NoValueForKey, ws); const value = std.mem.trim(u8, data_iterator.next() orelse return error.NoValueForKey, ws);
// keys should be putNoClobber, but values can be put. // keys should be putNoClobber, but values can be put.
// Because we have to dup the memory here though, we want to // Because we have to dup the memory here though, we want to
// manage duplicate values seperately // manage duplicate values seperately
var dup_key = try self.allocator.dupeZ(u8, key); const dup_key = try self.allocator.dupeZ(u8, key);
var dup_value = try self.allocator.dupeZ(u8, value); const dup_value = try self.allocator.dupeZ(u8, value);
try rc.key_value_map.putNoClobber(dup_key, dup_value); try rc.key_value_map.putNoClobber(dup_key, dup_value);
if (!rc.value_key_map.contains(value)) { if (!rc.value_key_map.contains(value)) {
var keys = try self.allocator.create(std.ArrayList([:0]u8)); const keys = try self.allocator.create(std.ArrayList([:0]u8));
keys.* = std.ArrayList([:0]u8).init(self.allocator); keys.* = std.ArrayList([:0]u8).init(self.allocator);
try rc.value_key_map.put(dup_value, keys); try rc.value_key_map.put(dup_value, keys);
} }
@ -85,7 +85,7 @@ pub fn parse(self: Self, reader: anytype) !ParsedConfig {
} }
test "gets config from a stream" { test "gets config from a stream" {
var allocator = std.testing.allocator; const allocator = std.testing.allocator;
var stream = std.io.fixedBufferStream( var stream = std.io.fixedBufferStream(
\\# This is a simple "path prefix" = dynamic library path mapping \\# This is a simple "path prefix" = dynamic library path mapping
\\ # no reordering will be done, so you must do things most -> least specific \\ # no reordering will be done, so you must do things most -> least specific
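Most of the config.zig hunks are not about APIs at all: Zig 0.12 makes a never-mutated local a compile error unless it is declared `const`, so `var key`, `var value`, and friends all flip to `const`. A minimal sketch of the rule:

```zig
const std = @import("std");

test "0.12 rejects unmutated var" {
    const allocator = std.testing.allocator;
    // `const`: the slice itself is never reassigned (0.11 allowed `var`).
    const dup_key = try allocator.dupeZ(u8, "key");
    defer allocator.free(dup_key);
    // `var` is still right when the value actually changes.
    var count: usize = 0;
    count += 1;
    try std.testing.expectEqual(@as(usize, 1), count);
}
```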

View File

@ -44,7 +44,7 @@ pub const ZigRequest = struct {
target: []const u8, target: []const u8,
method: [:0]u8, method: [:0]u8,
content: []u8, content: []u8,
headers: std.http.Headers, headers: []std.http.Header,
}; };
pub const ZigHeader = struct { pub const ZigHeader = struct {
@ -56,7 +56,7 @@ pub const ZigResponse = struct {
status: std.http.Status = .ok, status: std.http.Status = .ok,
reason: ?[]const u8 = null, reason: ?[]const u8 = null,
body: *std.ArrayList(u8), body: *std.ArrayList(u8),
headers: std.http.Headers, headers: []std.http.Header,
request: ZigRequest, request: ZigRequest,
prepend: std.ArrayList(u8), prepend: std.ArrayList(u8),
@ -94,9 +94,9 @@ pub fn zigInit(parent_allocator: *anyopaque) callconv(.C) void {
/// Converts a StringHashMap to the structure necessary for passing through the /// Converts a StringHashMap to the structure necessary for passing through the
/// C boundary. This will be called automatically for you via the handleRequest function /// C boundary. This will be called automatically for you via the handleRequest function
/// and is also used by the main processing loop to coerce request headers /// and is also used by the main processing loop to coerce request headers
fn toHeaders(alloc: std.mem.Allocator, headers: std.http.Headers) ![*]Header { fn toHeaders(alloc: std.mem.Allocator, headers: []const std.http.Header) ![*]Header {
var header_array = try std.ArrayList(Header).initCapacity(alloc, headers.list.items.len); var header_array = try std.ArrayList(Header).initCapacity(alloc, headers.len);
for (headers.list.items) |*field| { for (headers) |*field| {
header_array.appendAssumeCapacity(.{ header_array.appendAssumeCapacity(.{
.name_ptr = @constCast(field.name.ptr), .name_ptr = @constCast(field.name.ptr),
.name_len = field.name.len, .name_len = field.name.len,
@ -114,7 +114,7 @@ fn toHeaders(alloc: std.mem.Allocator, headers: std.http.Headers) ![*]Header {
pub fn handleRequest(request: *Request, zigRequestHandler: ZigRequestHandler) ?*Response { pub fn handleRequest(request: *Request, zigRequestHandler: ZigRequestHandler) ?*Response {
// TODO: implement another library in C or Rust or something to show // TODO: implement another library in C or Rust or something to show
// that anything using a C ABI can be successful // that anything using a C ABI can be successful
var alloc = if (allocator) |a| a.* else { const alloc = if (allocator) |a| a.* else {
log.err("zigInit not called prior to handle_request. This is a coding error", .{}); log.err("zigInit not called prior to handle_request. This is a coding error", .{});
return null; return null;
}; };
@ -123,12 +123,12 @@ pub fn handleRequest(request: *Request, zigRequestHandler: ZigRequestHandler) ?*
var response = std.ArrayList(u8).init(alloc); var response = std.ArrayList(u8).init(alloc);
// setup headers // setup headers
var request_headers = std.http.Headers.init(alloc); var request_headers = std.ArrayList(std.http.Header).init(alloc);
for (0..request.headers_len) |i| for (0..request.headers_len) |i|
request_headers.append( request_headers.append(.{
request.headers[i].name_ptr[0..request.headers[i].name_len], .name = request.headers[i].name_ptr[0..request.headers[i].name_len],
request.headers[i].value_ptr[0..request.headers[i].value_len], .value = request.headers[i].value_ptr[0..request.headers[i].value_len],
) catch |e| { }) catch |e| {
log.err("Unexpected error processing request: {any}", .{e}); log.err("Unexpected error processing request: {any}", .{e});
if (@errorReturnTrace()) |trace| { if (@errorReturnTrace()) |trace| {
std.debug.dumpStackTrace(trace.*); std.debug.dumpStackTrace(trace.*);
@ -136,16 +136,22 @@ pub fn handleRequest(request: *Request, zigRequestHandler: ZigRequestHandler) ?*
return null; return null;
}; };
var prepend = std.ArrayList(u8).init(alloc); const prepend = std.ArrayList(u8).init(alloc);
var zig_response = ZigResponse{ var zig_response = ZigResponse{
.headers = .{ .allocator = alloc }, .headers = &.{},
.body = &response, .body = &response,
.prepend = prepend, .prepend = prepend,
.request = .{ .request = .{
.content = request.content[0..request.content_len], .content = request.content[0..request.content_len],
.target = request.target[0..request.target_len], .target = request.target[0..request.target_len],
.method = request.method[0..request.method_len :0], .method = request.method[0..request.method_len :0],
.headers = request_headers, .headers = request_headers.toOwnedSlice() catch |e| {
log.err("Unexpected error processing request: {any}", .{e});
if (@errorReturnTrace()) |trace| {
std.debug.dumpStackTrace(trace.*);
}
return null;
},
}, },
}; };
zigRequestHandler( zigRequestHandler(
@ -184,7 +190,7 @@ fn buildResponse(alloc: std.mem.Allocator, zig_response: *ZigResponse) ?*Respons
} }
return null; return null;
}; };
rc.headers_len = zig_response.headers.list.items.len; rc.headers_len = zig_response.headers.len;
rc.status = if (zig_response.status == .ok) 0 else @intFromEnum(zig_response.status); rc.status = if (zig_response.status == .ok) 0 else @intFromEnum(zig_response.status);
rc.reason_len = 0; rc.reason_len = 0;
if (zig_response.reason) |*r| { if (zig_response.reason) |*r| {
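The driver for this file's diff is that `std.http.Headers` no longer exists in 0.12: headers cross the interface as a plain `[]std.http.Header` slice, and convenience lookups like `getFirstValue` go away with it. A minimal sketch of the replacement pattern, a case-insensitive scan (the helper name is hypothetical):

```zig
const std = @import("std");

// Hypothetical helper standing in for the removed Headers.getFirstValue.
fn getFirstValue(headers: []const std.http.Header, name: []const u8) ?[]const u8 {
    for (headers) |h|
        if (std.ascii.eqlIgnoreCase(h.name, name)) return h.value;
    return null;
}

test "header lookup by linear scan" {
    const headers = [_]std.http.Header{
        .{ .name = "Host", .value = "iam.example.com" }, // example value
    };
    try std.testing.expectEqualStrings(
        "iam.example.com",
        getFirstValue(&headers, "host").?,
    );
}
```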

View File

@ -24,7 +24,7 @@ const log = std.log.scoped(.@"main-lib");
// } // }
// //
comptime { comptime {
@export(interface.zigInit, .{ .name = "zigInit", .linkage = .Strong }); @export(interface.zigInit, .{ .name = "zigInit", .linkage = .strong });
} }
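The one-character change in the comptime block above is the 0.12 rename of `std.builtin.GlobalLinkage` tags to lowercase (`.Strong` → `.strong`). A minimal sketch:

```zig
const std = @import("std");

fn answer() callconv(.C) i32 {
    return 42;
}

comptime {
    // 0.11: .linkage = .Strong; 0.12: lowercase enum tag.
    @export(answer, .{ .name = "answer", .linkage = .strong });
}

test "exported symbol is still callable from Zig" {
    try std.testing.expectEqual(@as(i32, 42), answer());
}
```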
/// handle_request will be called on a single request, but due to the preservation /// handle_request will be called on a single request, but due to the preservation
@ -58,32 +58,46 @@ export fn handle_request(request: *interface.Request) callconv(.C) ?*interface.R
// handleRequest function here is the last line of boilerplate and the // handleRequest function here is the last line of boilerplate and the
// entry to a request // entry to a request
fn handleRequest(allocator: std.mem.Allocator, response: *interface.ZigResponse) !void { fn handleRequest(allocator: std.mem.Allocator, response: *interface.ZigResponse) !void {
_ = allocator;
// setup // setup
var response_writer = response.body.writer(); var response_writer = response.body.writer();
// real work // real work
if (response.request.headers.getFirstValue("host")) |host| { for (response.request.headers) |h| {
if (std.mem.startsWith(u8, host, "iam")) { if (std.ascii.eqlIgnoreCase(h.name, "host")) {
try response_writer.print("iam response", .{}); if (std.mem.startsWith(u8, h.value, "iam")) {
try response_writer.print("iam response", .{});
return;
}
break;
}
}
for (response.request.headers) |h| {
if (std.ascii.eqlIgnoreCase(h.name, "x-slow")) {
std.time.sleep(std.time.ns_per_ms * (std.fmt.parseInt(usize, h.value, 10) catch 1000));
try response_writer.print("i am slow\n\n", .{});
return; return;
} }
} }
if (response.request.headers.getFirstValue("x-slow")) |ms| { for (response.request.headers) |h| {
std.time.sleep(std.time.ns_per_ms * (std.fmt.parseInt(usize, ms, 10) catch 1000)); if (std.ascii.eqlIgnoreCase(h.name, "x-log-this")) {
try response_writer.print("i am slow\n\n", .{}); try response.writeAll(h.value);
return; break;
}
} }
if (response.request.headers.getFirstValue("x-log-this")) |l| { for (response.request.headers) |h| {
try response.writeAll(l); if (std.ascii.eqlIgnoreCase(h.name, "x-status")) {
response.status = @enumFromInt(std.fmt.parseInt(u10, h.value, 10) catch 500);
break;
}
} }
if (response.request.headers.getFirstValue("x-status")) |s| { for (response.request.headers) |h| {
response.status = @enumFromInt(std.fmt.parseInt(u10, s, 10) catch 500); if (std.ascii.eqlIgnoreCase(h.name, "x-throw"))
return error.Thrown;
} }
if (response.request.headers.getFirstValue("x-throw")) |_| { try response_writer.print(" {d}", .{response.request.headers.len});
return error.Thrown; var headers = std.ArrayList(std.http.Header).init(allocator);
} try headers.appendSlice(response.headers);
try response_writer.print(" {d}", .{response.request.headers.list.items.len}); try headers.append(.{ .name = "X-custom-foo", .value = "bar" });
try response.headers.append("X-custom-foo", "bar"); response.headers = try headers.toOwnedSlice();
} }
test "handle_request" { test "handle_request" {
@ -91,7 +105,7 @@ test "handle_request" {
defer arena.deinit(); defer arena.deinit();
var aa = arena.allocator(); var aa = arena.allocator();
interface.zigInit(&aa); interface.zigInit(&aa);
var headers: []interface.Header = @constCast(&[_]interface.Header{.{ const headers: []interface.Header = @constCast(&[_]interface.Header{.{
.name_ptr = @ptrCast(@constCast("GET".ptr)), .name_ptr = @ptrCast(@constCast("GET".ptr)),
.name_len = 3, .name_len = 3,
.value_ptr = @ptrCast(@constCast("GET".ptr)), .value_ptr = @ptrCast(@constCast("GET".ptr)),
@ -119,7 +133,7 @@ test "lib can write data directly" {
defer arena.deinit(); defer arena.deinit();
var aa = arena.allocator(); var aa = arena.allocator();
interface.zigInit(&aa); interface.zigInit(&aa);
var headers: []interface.Header = @constCast(&[_]interface.Header{.{ const headers: []interface.Header = @constCast(&[_]interface.Header{.{
.name_ptr = @ptrCast(@constCast("x-log-this".ptr)), .name_ptr = @ptrCast(@constCast("x-log-this".ptr)),
.name_len = "x-log-this".len, .name_len = "x-log-this".len,
.value_ptr = @ptrCast(@constCast("I am a teapot".ptr)), .value_ptr = @ptrCast(@constCast("I am a teapot".ptr)),
@ -146,7 +160,7 @@ test "lib can write data directly and still throw" {
defer arena.deinit(); defer arena.deinit();
var aa = arena.allocator(); var aa = arena.allocator();
interface.zigInit(&aa); interface.zigInit(&aa);
var headers: []interface.Header = @constCast(&[_]interface.Header{ .{ const headers: []interface.Header = @constCast(&[_]interface.Header{ .{
.name_ptr = @ptrCast(@constCast("x-log-this".ptr)), .name_ptr = @ptrCast(@constCast("x-log-this".ptr)),
.name_len = "x-log-this".len, .name_len = "x-log-this".len,
.value_ptr = @ptrCast(@constCast("I am a teapot".ptr)), .value_ptr = @ptrCast(@constCast("I am a teapot".ptr)),
@ -176,7 +190,7 @@ test "lib can set status, update data directly and still throw" {
defer arena.deinit(); defer arena.deinit();
var aa = arena.allocator(); var aa = arena.allocator();
interface.zigInit(&aa); interface.zigInit(&aa);
var headers: []interface.Header = @constCast(&[_]interface.Header{ .{ const headers: []interface.Header = @constCast(&[_]interface.Header{ .{
.name_ptr = @ptrCast(@constCast("x-log-this".ptr)), .name_ptr = @ptrCast(@constCast("x-log-this".ptr)),
.name_len = "x-log-this".len, .name_len = "x-log-this".len,
.value_ptr = @ptrCast(@constCast("I am a teapot".ptr)), .value_ptr = @ptrCast(@constCast("I am a teapot".ptr)),

View File

@ -7,16 +7,17 @@ const config = @import("config.zig");
const log = std.log.scoped(.main); const log = std.log.scoped(.main);
// logging options // logging options
pub const std_options = struct { pub const std_options = .{
pub const log_level = switch (builtin.mode) { .log_level = if (!builtin.is_test)
.Debug => .debug, switch (builtin.mode) {
.ReleaseSafe => .info, .Debug => .debug,
.ReleaseFast, .ReleaseSmall => .err, .ReleaseSafe => .info,
}; .ReleaseFast, .ReleaseSmall => .err,
},
pub const log_scope_levels = &[_]std.log.ScopeLevel{ .log_scope_levels = &[_]std.log.ScopeLevel{
.{ .scope = .watch, .level = .info }, .{ .scope = .watch, .level = .info },
}; },
}; };
const serveFn = *const fn (*interface.Request) ?*interface.Response; const serveFn = *const fn (*interface.Request) ?*interface.Response;
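In 0.12, `std_options` is a value (a struct literal coerced to `std.Options`) rather than a struct type with public declarations, which is what the reshuffle above does; the upgrade also takes the chance to gate the level under test. A minimal sketch of the new shape:

```zig
const std = @import("std");
const builtin = @import("builtin");

// 0.11: pub const std_options = struct { pub const log_level = ...; };
// 0.12: a plain struct literal, coerced to std.Options by the std root.
pub const std_options = .{
    .log_level = switch (builtin.mode) {
        .Debug => .debug,
        .ReleaseSafe => .info,
        .ReleaseFast, .ReleaseSmall => .err,
    },
};

pub fn main() void {
    std.log.info("log_level follows the optimize mode", .{});
}
```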
@ -32,8 +33,9 @@ var watcher_thread: ?std.Thread = null;
var timer: ?std.time.Timer = null; var timer: ?std.time.Timer = null;
const FullReturn = struct { const FullReturn = struct {
response: []u8, content: []const u8,
executor: *Executor, executor: *Executor = undefined,
respond_options: std.http.Server.Request.RespondOptions,
}; };
// Executor structure, including functions that were found in the library // Executor structure, including functions that were found in the library
@ -52,9 +54,9 @@ const Executor = struct {
// fields used for internal accounting // fields used for internal accounting
watch: ?usize = null, watch: ?usize = null,
drain_in_progress: std.atomic.Atomic(bool) = std.atomic.Atomic(bool).init(false), drain_in_progress: std.atomic.Value(bool) = std.atomic.Value(bool).init(false),
load_in_progress: std.atomic.Atomic(bool) = std.atomic.Atomic(bool).init(false), load_in_progress: std.atomic.Value(bool) = std.atomic.Value(bool).init(false),
requests_in_flight: std.atomic.Atomic(usize) = std.atomic.Atomic(usize).init(0), requests_in_flight: std.atomic.Value(usize) = std.atomic.Value(usize).init(0),
}; };
const SERVE_FN_NAME = "handle_request"; const SERVE_FN_NAME = "handle_request";
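`std.atomic.Atomic(T)` was renamed to `std.atomic.Value(T)` in 0.12, with memory-order tags lowercased to match (`.Acquire` → `.acquire`, and so on); the executor accounting fields above change accordingly. A minimal sketch:

```zig
const std = @import("std");

// 0.11: std.atomic.Atomic(usize); 0.12: std.atomic.Value(usize),
// with lowercase orderings on every operation.
var requests_in_flight = std.atomic.Value(usize).init(0);

test "atomic counter round trip" {
    _ = requests_in_flight.fetchAdd(1, .acquire);
    defer _ = requests_in_flight.fetchSub(1, .release);
    try std.testing.expectEqual(@as(usize, 1), requests_in_flight.load(.acquire));
}
```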
@ -67,68 +69,68 @@ var parsed_config: config.ParsedConfig = undefined;
/// Serves a single request. Finds executor, marshalls request data for the C /// Serves a single request. Finds executor, marshalls request data for the C
/// interface, calls the executor and marshalls data back /// interface, calls the executor and marshalls data back
fn serve(allocator: *std.mem.Allocator, response: *std.http.Server.Response) !*FullReturn { fn serve(allocator: *std.mem.Allocator, request: *std.http.Server.Request) !*FullReturn {
const executor = try getExecutor(response.request.target, response.request.headers); var head_it = request.iterateHeaders();
errdefer _ = executor.requests_in_flight.fetchSub(1, .Monotonic); const headers = try toHeaders(allocator.*, &head_it);
const executor = try getExecutor(request.head.target, headers);
errdefer _ = executor.requests_in_flight.fetchSub(1, .monotonic);
if (executor.zigInitFn) |f| if (executor.zigInitFn) |f|
f(allocator); f(allocator);
// Call external library // Call external library
const method_tag = @tagName(response.request.method); const method_tag = @tagName(request.head.method);
const headers = try toHeaders(allocator.*, response.request.headers); const request_content: []u8 = try (try request.reader()).readAllAlloc(allocator.*, std.math.maxInt(usize));
var request_content: []u8 = &[_]u8{};
if (response.request.content_length) |l| {
request_content = try response.reader().readAllAlloc(allocator.*, @as(usize, l));
}
log.debug("{d} bytes read from request", .{request_content.len}); log.debug("{d} bytes read from request", .{request_content.len});
var request = interface.Request{ var i_request = interface.Request{
.target = response.request.target.ptr, .target = request.head.target.ptr,
.target_len = response.request.target.len, .target_len = request.head.target.len,
.method = @constCast(method_tag[0..].ptr), .method = @constCast(method_tag[0..].ptr),
.method_len = method_tag.len, .method_len = method_tag.len,
.headers = headers, .headers = headers.ptr,
.headers_len = response.request.headers.list.items.len, .headers_len = headers.len,
.content = request_content.ptr, .content = request_content.ptr,
.content_len = request_content.len, .content_len = request_content.len,
}; };
var serve_result = executor.serveFn.?(&request).?; // ok for this pointer deref to fail var serve_result = executor.serveFn.?(&i_request).?; // ok for this pointer deref to fail
log.debug("target: {s}", .{response.request.target}); log.debug("target: {s}", .{request.head.target});
log.debug("response ptr: {*}", .{serve_result.ptr}); log.debug("response ptr: {*}", .{serve_result.ptr});
var slice: []u8 = serve_result.ptr[0..serve_result.len]; var response = try allocator.create(FullReturn); // allocator is arena here
log.debug("response body: {s}", .{slice}); response.content = serve_result.ptr[0..serve_result.len];
response.executor = executor;
response.respond_options = .{};
log.debug("response body: {s}", .{response.content});
if (serve_result.status != 0) { if (serve_result.status != 0) {
response.status = @enumFromInt(serve_result.status); response.respond_options.status = @enumFromInt(serve_result.status);
if (serve_result.reason_len > 0) { if (serve_result.reason_len > 0) {
response.reason = serve_result.reason_ptr[0..serve_result.reason_len]; response.respond_options.reason = serve_result.reason_ptr[0..serve_result.reason_len];
} }
} }
// Deal with results // Deal with response headers
var response_headers = try std.ArrayList(std.http.Header).initCapacity(allocator.*, serve_result.headers_len + 1);
defer response_headers.deinit();
var content_type_added = false; var content_type_added = false;
for (0..serve_result.headers_len) |inx| { for (0..serve_result.headers_len) |inx| {
const head = serve_result.headers[inx]; const head = serve_result.headers[inx];
try response.headers.append( response_headers.appendAssumeCapacity(.{
head.name_ptr[0..head.name_len], .name = head.name_ptr[0..head.name_len],
head.value_ptr[0..head.value_len], .value = head.value_ptr[0..head.value_len],
); });
// headers are case insensitive // headers are case insensitive
content_type_added = std.ascii.eqlIgnoreCase(head.name_ptr[0..head.name_len], "content-type"); content_type_added = std.ascii.eqlIgnoreCase(head.name_ptr[0..head.name_len], "content-type");
} }
if (!content_type_added) if (!content_type_added)
try response.headers.append("content-type", "text/plain"); response_headers.appendAssumeCapacity(.{ .name = "content-type", .value = "text/plain" });
// target is path response.respond_options.extra_headers = try response_headers.toOwnedSlice();
var rc = try allocator.create(FullReturn); return response;
rc.executor = executor;
rc.response = slice;
return rc;
} }
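Because the 0.12 server no longer hands out a mutable response object, serve() above packages the body together with a `RespondOptions` so status, reason, and headers can travel to a single respond() call later. A minimal sketch of that packaging, assuming the field defaults shown in the diff:

```zig
const std = @import("std");

// Status, reason, and extra headers ride along with the content until
// the one-shot respond() call in the request loop.
const FullReturn = struct {
    content: []const u8,
    respond_options: std.http.Server.Request.RespondOptions,
};

test "respond options carry status" {
    const r = FullReturn{
        .content = "Internal Server Error\n",
        .respond_options = .{ .status = .internal_server_error },
    };
    try std.testing.expectEqual(std.http.Status.internal_server_error, r.respond_options.status);
}
```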
/// Gets and executor based on request data /// Gets and executor based on request data
fn getExecutor(requested_path: []const u8, headers: std.http.Headers) !*Executor { fn getExecutor(requested_path: []const u8, headers: []interface.Header) !*Executor {
var executor = blk: { var executor = blk: {
for (executors) |*exec| { for (executors) |*exec| {
if (executorIsMatch(exec.match_data, requested_path, headers)) { if (executorIsMatch(exec.match_data, requested_path, headers)) {
@ -138,15 +140,15 @@ fn getExecutor(requested_path: []const u8, headers: std.http.Headers) !*Executor
log.err("Could not find executor for target path '{s}'", .{requested_path}); log.err("Could not find executor for target path '{s}'", .{requested_path});
return error.NoApplicableExecutor; return error.NoApplicableExecutor;
}; };
while (executor.drain_in_progress.load(.Acquire)) { while (executor.drain_in_progress.load(.acquire)) {
// we need to stand down and stand by // we need to stand down and stand by
std.atomic.spinLoopHint(); // Let CPU know we're just hanging out std.atomic.spinLoopHint(); // Let CPU know we're just hanging out
std.time.sleep(1 * std.time.ns_per_ms / 2); std.time.sleep(1 * std.time.ns_per_ms / 2);
} }
// Tell everyone we're about to use this bad boy // Tell everyone we're about to use this bad boy
// While requests_in_flight is >= 0, nobody should unload the library // While requests_in_flight is >= 0, nobody should unload the library
_ = executor.requests_in_flight.fetchAdd(1, .Acquire); _ = executor.requests_in_flight.fetchAdd(1, .acquire);
errdefer _ = executor.requests_in_flight.fetchSub(1, .Release); errdefer _ = executor.requests_in_flight.fetchSub(1, .release);
if (executor.serveFn != null) return executor; if (executor.serveFn != null) return executor;
@ -157,13 +159,13 @@ fn getExecutor(requested_path: []const u8, headers: std.http.Headers) !*Executor
// If the library is being reloaded and a bunch of requests come in, // If the library is being reloaded and a bunch of requests come in,
// we could have multiple threads racing to load // we could have multiple threads racing to load
// NOTE: I am not confident of the memory ordering here on tryCompareAndSwap // NOTE: I am not confident of the memory ordering here on tryCompareAndSwap
while (executor.load_in_progress.tryCompareAndSwap(false, true, .Acquire, .Acquire)) |_| { while (@cmpxchgWeak(bool, &executor.load_in_progress.raw, false, true, .acquire, .acquire)) |_| {
// we need to stand down and stand by // we need to stand down and stand by
std.atomic.spinLoopHint(); // Let CPU know we're just hanging out std.atomic.spinLoopHint(); // Let CPU know we're just hanging out
std.time.sleep(1 * std.time.ns_per_ms / 2); std.time.sleep(1 * std.time.ns_per_ms / 2);
} }
// we have the conch...lock others out // we have the conch...lock others out
defer executor.load_in_progress.store(false, .Release); defer executor.load_in_progress.store(false, .release);
if (executor.library) |l| if (executor.library) |l|
break :blk l; // someone beat us to the race..our defer above will take care of unlocking break :blk l; // someone beat us to the race..our defer above will take care of unlocking
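`tryCompareAndSwap` is gone from the 0.12 atomic wrapper, so the load lock above drops down to the `@cmpxchgWeak` builtin against the `Value`'s `raw` field. A minimal sketch of the same spin pattern:

```zig
const std = @import("std");

var load_in_progress = std.atomic.Value(bool).init(false);

fn acquireLoadLock() void {
    // 0.11: load_in_progress.tryCompareAndSwap(false, true, .Acquire, .Acquire)
    // 0.12: the builtin on the raw field; null return means we won the swap.
    while (@cmpxchgWeak(bool, &load_in_progress.raw, false, true, .acquire, .acquire)) |_| {
        std.atomic.spinLoopHint();
    }
}

fn releaseLoadLock() void {
    load_in_progress.store(false, .release);
}

test "spin lock round trip" {
    acquireLoadLock();
    defer releaseLoadLock();
    try std.testing.expect(load_in_progress.load(.acquire));
}
```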
@ -185,7 +187,7 @@ fn getExecutor(requested_path: []const u8, headers: std.http.Headers) !*Executor
return executor; return executor;
} }
fn executorIsMatch(match_data: []const u8, requested_path: []const u8, headers: std.http.Headers) bool { fn executorIsMatch(match_data: []const u8, requested_path: []const u8, headers: []interface.Header) bool {
if (!std.mem.containsAtLeast(u8, match_data, 1, ":")) { if (!std.mem.containsAtLeast(u8, match_data, 1, ":")) {
// match_data does not have a ':'. This means this is a straight path, without // match_data does not have a ':'. This means this is a straight path, without
// any header requirement. We can simply return a match prefix on the // any header requirement. We can simply return a match prefix on the
@ -196,7 +198,13 @@ fn executorIsMatch(match_data: []const u8, requested_path: []const u8, headers:
} }
const colon = std.mem.indexOf(u8, match_data, ":").?; const colon = std.mem.indexOf(u8, match_data, ":").?;
const header_needle = match_data[0..colon]; const header_needle = match_data[0..colon];
const header_inx = headers.firstIndexOf(header_needle) orelse return false; const header_inx = blk: {
for (headers, 0..) |h, i| {
if (std.ascii.eqlIgnoreCase(h.name_ptr[0..h.name_len], header_needle))
break :blk i;
}
return false;
};
// Apparently std.mem.split will return an empty first when the haystack starts // Apparently std.mem.split will return an empty first when the haystack starts
// with the delimiter // with the delimiter
var split = std.mem.split(u8, std.mem.trim(u8, match_data[colon + 1 ..], "\t "), " "); var split = std.mem.split(u8, std.mem.trim(u8, match_data[colon + 1 ..], "\t "), " ");
@ -211,7 +219,7 @@ fn executorIsMatch(match_data: []const u8, requested_path: []const u8, headers:
// match_data includes some sort of header match as well. We assume the // match_data includes some sort of header match as well. We assume the
// header match is a full match on the key (handled above) // header match is a full match on the key (handled above)
// but a prefix match on the value // but a prefix match on the value
const request_header_value = headers.list.items[header_inx].value; const request_header_value = headers[header_inx].value_ptr[0..headers[header_inx].value_len];
// (shoud this be case insensitive?) // (shoud this be case insensitive?)
if (!std.mem.startsWith(u8, request_header_value, header_value_needle)) return false; if (!std.mem.startsWith(u8, request_header_value, header_value_needle)) return false;
// header value matches...return the path prefix match // header value matches...return the path prefix match
@ -243,9 +251,9 @@ fn executorChanged(watch: usize) void {
if (executor.watch) |w| { if (executor.watch) |w| {
if (w == watch) { if (w == watch) {
if (executor.library) |l| { if (executor.library) |l| {
executor.drain_in_progress.store(true, .Release); executor.drain_in_progress.store(true, .release);
defer executor.drain_in_progress.store(false, .Release); defer executor.drain_in_progress.store(false, .release);
while (executor.requests_in_flight.load(.Acquire) > 0) { while (executor.requests_in_flight.load(.acquire) > 0) {
std.atomic.spinLoopHint(); std.atomic.spinLoopHint();
std.time.sleep(1 * std.time.ns_per_ms / 2); std.time.sleep(1 * std.time.ns_per_ms / 2);
} }
@ -257,7 +265,7 @@ fn executorChanged(watch: usize) void {
log.warn("could not reload! error opening library", .{}); log.warn("could not reload! error opening library", .{});
return; return;
}; };
var symbol = std.c.dlsym(executor.library.?, SERVE_FN_NAME); const symbol = std.c.dlsym(executor.library.?, SERVE_FN_NAME);
if (symbol == null) { if (symbol == null) {
log.warn("could not reload! error finding symbol", .{}); log.warn("could not reload! error finding symbol", .{});
if (std.c.dlclose(executor.library.?) != 0) if (std.c.dlclose(executor.library.?) != 0)
@ -284,11 +292,11 @@ fn dlopen(path: [:0]const u8) !*anyopaque {
/// exit from the application as the main function has an infinite loop /// exit from the application as the main function has an infinite loop
fn exitApplication( fn exitApplication(
_: i32, _: i32,
_: *const std.os.siginfo_t, _: *const std.posix.siginfo_t,
_: ?*const anyopaque, _: ?*const anyopaque,
) callconv(.C) noreturn { ) callconv(.C) noreturn {
exitApp(0); exitApp(0);
std.os.exit(0); std.posix.exit(0);
} }
/// exitApp handles deinitialization for the application and any reporting /// exitApp handles deinitialization for the application and any reporting
@ -302,7 +310,7 @@ fn exitApp(exitcode: u8) void {
watcher.stopWatch() catch @panic("could not stop watcher"); watcher.stopWatch() catch @panic("could not stop watcher");
std.io.getStdOut().writer().print("exiting application\n", .{}) catch {}; std.io.getStdOut().writer().print("exiting application\n", .{}) catch {};
watcher.deinit(); watcher.deinit();
std.os.exit(exitcode); std.posix.exit(exitcode);
// joining threads will hang...we're ultimately in a signal handler. // joining threads will hang...we're ultimately in a signal handler.
// But everything is shut down cleanly now, so I don't think it hurts to // But everything is shut down cleanly now, so I don't think it hurts to
// just kill it all (NOTE: from a practical perspective, we do not seem // just kill it all (NOTE: from a practical perspective, we do not seem
@ -316,7 +324,7 @@ fn exitApp(exitcode: u8) void {
/// has been implemented. Operates off the main thread /// has been implemented. Operates off the main thread
fn reloadConfig( fn reloadConfig(
_: i32, _: i32,
_: *const std.os.siginfo_t, _: *const std.posix.siginfo_t,
_: ?*const anyopaque, _: ?*const anyopaque,
) callconv(.C) void { ) callconv(.C) void {
// TODO: Gracefully drain in flight requests and hold a lock here while // TODO: Gracefully drain in flight requests and hold a lock here while
@ -334,36 +342,36 @@ fn reloadConfig(
/// Installs all signal handlers for shutdown and configuration reload /// Installs all signal handlers for shutdown and configuration reload
fn installSignalHandler() !void { fn installSignalHandler() !void {
var act = std.os.Sigaction{ var act = std.posix.Sigaction{
.handler = .{ .sigaction = exitApplication }, .handler = .{ .sigaction = exitApplication },
.mask = std.os.empty_sigset, .mask = std.posix.empty_sigset,
.flags = (std.os.SA.SIGINFO | std.os.SA.RESTART | std.os.SA.RESETHAND), .flags = (std.posix.SA.SIGINFO | std.posix.SA.RESTART | std.posix.SA.RESETHAND),
}; };
try std.os.sigaction(std.os.SIG.INT, &act, null); try std.posix.sigaction(std.posix.SIG.INT, &act, null);
try std.os.sigaction(std.os.SIG.TERM, &act, null); try std.posix.sigaction(std.posix.SIG.TERM, &act, null);
var hup_act = std.os.Sigaction{ var hup_act = std.posix.Sigaction{
.handler = .{ .sigaction = reloadConfig }, .handler = .{ .sigaction = reloadConfig },
.mask = std.os.empty_sigset, .mask = std.posix.empty_sigset,
.flags = (std.os.SA.SIGINFO | std.os.SA.RESTART | std.os.SA.RESETHAND), .flags = (std.posix.SA.SIGINFO | std.posix.SA.RESTART | std.posix.SA.RESETHAND),
}; };
try std.os.sigaction(std.os.SIG.HUP, &hup_act, null); try std.posix.sigaction(std.posix.SIG.HUP, &hup_act, null);
} }
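Signal installation moves wholesale too: `Sigaction`, `empty_sigset`, the `SA` flags, the `SIG` constants, and `sigaction()` itself are all `std.posix` in 0.12. A minimal sketch with a no-op handler:

```zig
const std = @import("std");

fn onHup(_: i32, _: *const std.posix.siginfo_t, _: ?*const anyopaque) callconv(.C) void {
    // a real handler would reload configuration here
}

pub fn main() !void {
    var act = std.posix.Sigaction{
        .handler = .{ .sigaction = onHup },
        .mask = std.posix.empty_sigset,
        .flags = std.posix.SA.SIGINFO | std.posix.SA.RESTART,
    };
    try std.posix.sigaction(std.posix.SIG.HUP, &act, null);
}
```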
pub fn main() !void { pub fn main() !void {
// We are linked to libc, and primarily using the arena allocator // We are linked to libc, and primarily using the arena allocator
// raw allocator recommended for use in arenas // raw allocator recommended for use in arenas
var raw_allocator = std.heap.raw_c_allocator; const raw_allocator = std.heap.raw_c_allocator;
// Our child process will also need an allocator, and is using the // Our child process will also need an allocator, and is using the
// same pattern, so we will hand the child a raw allocator as well // same pattern, so we will hand the child a raw allocator as well
var child_allocator = std.heap.raw_c_allocator; const child_allocator = std.heap.raw_c_allocator;
// Lastly, we need some of our own operations // Lastly, we need some of our own operations
var arena = std.heap.ArenaAllocator.init(raw_allocator); var arena = std.heap.ArenaAllocator.init(raw_allocator);
defer arena.deinit(); defer arena.deinit();
var parent_allocator = arena.allocator(); const parent_allocator = arena.allocator();
var al = std.ArrayList([]const u8).init(parent_allocator); var al = std.ArrayList([]const u8).init(parent_allocator);
defer al.deinit(); defer al.deinit();
@ -408,25 +416,22 @@ fn childMain(allocator: std.mem.Allocator) !void {
watcher = Watch.init(executorChanged); watcher = Watch.init(executorChanged);
watcher_thread = try std.Thread.spawn(.{}, Watch.startWatch, .{&watcher}); watcher_thread = try std.Thread.spawn(.{}, Watch.startWatch, .{&watcher});
var server = std.http.Server.init(allocator, .{ .reuse_address = true });
defer server.deinit();
const address = try std.net.Address.parseIp( const address = try std.net.Address.parseIp(
"0.0.0.0", "0.0.0.0",
if (std.os.getenv("PORT")) |p| try std.fmt.parseInt(u16, p, 10) else PORT, if (std.posix.getenv("PORT")) |p| try std.fmt.parseInt(u16, p, 10) else PORT,
); );
try server.listen(address); var server = try address.listen(.{ .reuse_address = true });
const server_port = server.socket.listen_address.in.getPort(); const server_port = server.listen_address.in.getPort();
log.info("listening on port: {d}", .{server_port}); log.info("listening on port: {d}", .{server_port});
if (builtin.os.tag == .linux) if (builtin.os.tag == .linux)
log.info("pid: {d}", .{std.os.linux.getpid()}); log.info("pid: {d}", .{std.os.linux.getpid()});
try installSignalHandler(); try installSignalHandler();
response_preallocation_kb = if (std.os.getenv("RESPONSE_PREALLOCATION_KB")) |kb| response_preallocation_kb = if (std.posix.getenv("RESPONSE_PREALLOCATION_KB")) |kb|
try std.fmt.parseInt(usize, kb, 10) try std.fmt.parseInt(usize, kb, 10)
else else
response_preallocation_kb; response_preallocation_kb;
var server_thread_count = if (std.os.getenv("SERVER_THREAD_COUNT")) |count| const server_thread_count = if (std.posix.getenv("SERVER_THREAD_COUNT")) |count|
try std.fmt.parseInt(usize, count, 10) try std.fmt.parseInt(usize, count, 10)
else switch (builtin.mode) { else switch (builtin.mode) {
.Debug => @min(4, try std.Thread.getCpuCount()), .Debug => @min(4, try std.Thread.getCpuCount()),
@ -451,7 +456,7 @@ fn childMain(allocator: std.mem.Allocator) !void {
for (server_threads.items) |thread| thread.join(); for (server_threads.items) |thread| thread.join();
} }
fn threadMain(allocator: std.mem.Allocator, server: *std.http.Server, thread_number: usize) !void { fn threadMain(allocator: std.mem.Allocator, server: *std.net.Server, thread_number: usize) !void {
// TODO: If we're in a thread pool we need to be careful with this... // TODO: If we're in a thread pool we need to be careful with this...
const stdout_file = std.io.getStdOut().writer(); const stdout_file = std.io.getStdOut().writer();
var bw = std.io.bufferedWriter(stdout_file); var bw = std.io.bufferedWriter(stdout_file);
@ -505,37 +510,42 @@ fn loadConfig(allocator: std.mem.Allocator) ![]Executor {
/// main loop calls processRequest, which is responsible for the interface /// main loop calls processRequest, which is responsible for the interface
/// with logs, connection accounting, etc. The work dealing with the request /// with logs, connection accounting, etc. The work dealing with the request
/// itself is delegated to the serve function to work with the executor /// itself is delegated to the serve function to work with the executor
fn processRequest(allocator: *std.mem.Allocator, server: *std.http.Server, writer: anytype) !void { fn processRequest(allocator: *std.mem.Allocator, server: *std.net.Server, writer: anytype) !void {
if (timer == null) timer = try std.time.Timer.start(); if (timer == null) timer = try std.time.Timer.start();
var tm = timer.?; var tm = timer.?;
var res = try server.accept(.{ .allocator = allocator.* }); var connection = try server.accept();
defer res.deinit(); defer connection.stream.close();
defer _ = res.reset(); var read_buffer: [1024 * 16]u8 = undefined; // TODO: fix this
try res.wait(); // wait for client to send a complete request head var server_connection = std.http.Server.init(connection, &read_buffer);
var req = try server_connection.receiveHead();
// TODO: should we check for server_connection.state == .ready?
// I believe it's fair to start our timer after this is done // I believe it's fair to start our timer after this is done
tm.reset(); tm.reset();
const err_return = FullReturn{
.content = "Internal Server Error\n",
.respond_options = .{ .status = .internal_server_error },
};
// This is an nginx log: // This is an nginx log:
// git.lerch.org 50.39.111.175 - - [16/May/2023:02:56:31 +0000] "POST /api/actions/runner.v1.RunnerService/FetchTask HTTP/2.0" 200 0 "-" "connect-go/1.2.0-dev (go1.20.1)" "172.20.0.5:3000" // git.lerch.org 50.39.111.175 - - [16/May/2023:02:56:31 +0000] "POST /api/actions/runner.v1.RunnerService/FetchTask HTTP/2.0" 200 0 "-" "connect-go/1.2.0-dev (go1.20.1)" "172.20.0.5:3000"
// TODO: replicate this // TODO: replicate this
try writer.print("{} - - \"{s} {s} {s}\"", .{ try writer.print("{} - - \"{s} {s} {s}\"", .{
res.address, server_connection.connection.address,
@tagName(res.request.method), @tagName(req.head.method),
res.request.target, req.head.target,
@tagName(res.request.version), @tagName(req.head.version),
}); });
const errstr = "Internal Server Error\n"; var caught_err = false;
var errbuf: [errstr.len]u8 = undefined; const full_response = serve(allocator, &req) catch |e| brk: {
var response_bytes = try std.fmt.bufPrint(&errbuf, errstr, .{});
var full_response = serve(allocator, &res) catch |e| brk: {
res.status = .internal_server_error;
// TODO: more about this particular request
log.err("Unexpected error from executor processing request: {any}", .{e}); log.err("Unexpected error from executor processing request: {any}", .{e});
// TODO: more about this particular request
if (@errorReturnTrace()) |trace| { if (@errorReturnTrace()) |trace| {
std.debug.dumpStackTrace(trace.*); std.debug.dumpStackTrace(trace.*);
} }
break :brk null;
caught_err = true;
break :brk &err_return;
}; };
defer { defer {
// The call above converts uncaught errors to a null // The call above converts uncaught errors to a null
@ -545,30 +555,29 @@ fn processRequest(allocator: *std.mem.Allocator, server: *std.http.Server, write
// This leaves this defer block as the only place we can/should decrement // This leaves this defer block as the only place we can/should decrement
// under normal conditions // under normal conditions
if (full_response) |f| { if (!caught_err) {
if (f.executor.requestDeinitFn) |d| d(); if (full_response.executor.requestDeinitFn) |d| d();
_ = f.executor.requests_in_flight.fetchSub(1, .Release); _ = full_response.executor.requests_in_flight.fetchSub(1, .release);
} }
} }
if (full_response) |f| try writer.print(" {d} ttfb {d:.3}ms", .{
response_bytes = f.response; @intFromEnum(full_response.respond_options.status),
res.transfer_encoding = .{ .content_length = response_bytes.len }; @as(f64, @floatFromInt(tm.read())) / std.time.ns_per_ms,
try res.headers.append("connection", "close"); });
try writer.print(" {d} ttfb {d:.3}ms", .{ @intFromEnum(res.status), @as(f64, @floatFromInt(tm.read())) / std.time.ns_per_ms }); if (builtin.is_test) writeToTestBuffers(full_response.content);
if (builtin.is_test) writeToTestBuffers(response_bytes, &res); req.respond(full_response.content, full_response.respond_options) catch |e| {
try res.do(); log.err("Unexpected error responding to client: {any}", .{e});
_ = try res.writer().writeAll(response_bytes); };
try res.finish();
try writer.print(" {d} ttlb {d:.3}ms", .{ try writer.print(" {d} ttlb {d:.3}ms", .{
response_bytes.len, full_response.content.len,
@as(f64, @floatFromInt(tm.read())) / std.time.ns_per_ms, @as(f64, @floatFromInt(tm.read())) / std.time.ns_per_ms,
}); });
} }
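The processRequest rewrite tracks the biggest 0.12 break: `std.http.Server` is no longer a long-lived listener you call `accept(.{...})` on. Instead you accept a `std.net.Server` connection, wrap it with a caller-owned read buffer, parse the head, and answer with one `respond()` call. A minimal end-to-end sketch of that flow (single connection, fixed buffer size as above):

```zig
const std = @import("std");

pub fn main() !void {
    // 0.12: listening is a std.net concern (address.listen),
    // not std.http.Server.init + server.listen.
    const address = try std.net.Address.parseIp("127.0.0.1", 0);
    var server = try address.listen(.{ .reuse_address = true });
    defer server.deinit();
    std.debug.print("listening on {d}\n", .{server.listen_address.in.getPort()});

    const connection = try server.accept();
    defer connection.stream.close();

    // The read buffer is caller-owned in 0.12; sizing it is on us.
    var read_buffer: [16 * 1024]u8 = undefined;
    var http = std.http.Server.init(connection, &read_buffer);
    var request = try http.receiveHead();

    // One call sets status, headers, and body together.
    try request.respond("hello\n", .{
        .status = .ok,
        .extra_headers = &.{.{ .name = "content-type", .value = "text/plain" }},
    });
}
```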
fn toHeaders(allocator: std.mem.Allocator, headers: std.http.Headers) ![*]interface.Header { fn toHeaders(allocator: std.mem.Allocator, headers: *std.http.HeaderIterator) ![]interface.Header {
var header_array = try std.ArrayList(interface.Header).initCapacity(allocator, headers.list.items.len); var header_array = std.ArrayList(interface.Header).init(allocator);
for (headers.list.items) |kv| { while (headers.next()) |kv| {
header_array.appendAssumeCapacity(.{ try header_array.append(.{
.name_ptr = @constCast(kv.name).ptr, .name_ptr = @constCast(kv.name).ptr,
.name_len = kv.name.len, .name_len = kv.name.len,
@ -576,7 +585,7 @@ fn toHeaders(allocator: std.mem.Allocator, headers: std.http.Headers) ![*]interf
.value_len = kv.value.len, .value_len = kv.value.len,
}); });
} }
return header_array.items.ptr; return try header_array.toOwnedSlice();
} }
/// Allocates at least preallocated_kb kilobytes of ram for usage. Some overhead /// Allocates at least preallocated_kb kilobytes of ram for usage. Some overhead
@ -592,12 +601,11 @@ fn preWarmArena(aa: std.mem.Allocator, arena: *std.heap.ArenaAllocator, prealloc
); );
if (!arena.reset(.{ .retain_with_limit = (arena.queryCapacity() + @as(usize, 1023)) / @as(usize, 1024) * 1024 })) if (!arena.reset(.{ .retain_with_limit = (arena.queryCapacity() + @as(usize, 1023)) / @as(usize, 1024) * 1024 }))
log.warn("arena reset failed, arena degraded", .{}); log.warn("arena reset failed, arena degraded", .{});
var bytes_allocated = arena.queryCapacity(); const bytes_allocated = arena.queryCapacity();
log.debug("preallocated {d} bytes", .{bytes_allocated}); log.debug("preallocated {d} bytes", .{bytes_allocated});
return bytes_allocated; return bytes_allocated;
} }
fn writeToTestBuffers(response: []const u8, res: *std.http.Server.Response) void { fn writeToTestBuffers(response: []const u8) void {
_ = res;
log.debug("writing to test buffers", .{}); log.debug("writing to test buffers", .{});
// This performs core dump...because we're in a separate thread? // This performs core dump...because we're in a separate thread?
// @memset(test_resp_buf, 0); // @memset(test_resp_buf, 0);
@ -615,16 +623,13 @@ fn testRequest(request_bytes: []const u8) !void {
var arena = std.heap.ArenaAllocator.init(allocator); var arena = std.heap.ArenaAllocator.init(allocator);
defer arena.deinit(); defer arena.deinit();
var server = std.http.Server.init(allocator, .{ .reuse_address = true });
defer server.deinit();
const address = try std.net.Address.parseIp("127.0.0.1", 0); const address = try std.net.Address.parseIp("127.0.0.1", 0);
try server.listen(address); var http_server = try address.listen(.{ .reuse_address = true });
const server_port = server.socket.listen_address.in.getPort(); const server_port = http_server.listen_address.in.getPort();
var al = std.ArrayList(u8).init(allocator); var al = std.ArrayList(u8).init(allocator);
defer al.deinit(); defer al.deinit();
var writer = al.writer(); const writer = al.writer();
var aa = arena.allocator(); var aa = arena.allocator();
var bytes_allocated: usize = 0; var bytes_allocated: usize = 0;
// pre-warm // pre-warm
@ -632,7 +637,7 @@ fn testRequest(request_bytes: []const u8) !void {
const server_thread = try std.Thread.spawn( const server_thread = try std.Thread.spawn(
.{}, .{},
processRequest, processRequest,
.{ &aa, &server, writer }, .{ &aa, &http_server, writer },
); );
const stream = try std.net.tcpConnectToHost(allocator, "127.0.0.1", server_port); const stream = try std.net.tcpConnectToHost(allocator, "127.0.0.1", server_port);
@ -670,14 +675,12 @@ test {
var test_resp_buf: [1024]u8 = undefined; var test_resp_buf: [1024]u8 = undefined;
var test_resp_buf_len: usize = undefined; var test_resp_buf_len: usize = undefined;
test "root path get" { test "root path get" {
std.testing.log_level = .debug;
log.debug("", .{}); log.debug("", .{});
try testGet("/"); try testGet("/");
try std.testing.expectEqual(@as(usize, 2), test_resp_buf_len); try std.testing.expectEqual(@as(usize, 2), test_resp_buf_len);
try std.testing.expectEqualStrings(" 1", test_resp_buf[0..test_resp_buf_len]); try std.testing.expectEqualStrings(" 1", test_resp_buf[0..test_resp_buf_len]);
} }
test "root path, alternative host get" { test "root path, alternative host get" {
std.testing.log_level = .debug;
log.debug("", .{}); log.debug("", .{});
try testHostGet("iam.aws.lerch.org", "/"); try testHostGet("iam.aws.lerch.org", "/");
try std.testing.expectEqualStrings("iam response", test_resp_buf[0..test_resp_buf_len]); try std.testing.expectEqualStrings("iam response", test_resp_buf[0..test_resp_buf_len]);