upgrade to zig 0.12.0

This commit is contained in:
Emil Lerch 2024-05-02 14:38:04 -07:00
parent bf09470abd
commit c06bda918b
Signed by: lobo
GPG Key ID: A7B62D657EF764F8
6 changed files with 226 additions and 211 deletions

View File

@ -27,8 +27,7 @@ pub fn build(b: *std.Build) void {
exe.linkLibC();
_ = b.addModule("flexilib-interface", .{
.source_file = .{ .path = "src/interface.zig" },
.dependencies = &[_]std.build.ModuleDependency{},
.root_source_file = b.path("src/interface.zig"),
});
const lib = b.addSharedLibrary(.{

View File

@ -12,13 +12,13 @@ const Wd = struct {
};
fileChanged: *const fn (usize) void,
inotify_fd: ?std.os.fd_t = null,
inotify_fd: ?std.posix.fd_t = null,
nfds_t: usize = 0,
wds: [MAX_FDS]Wd = [_]Wd{.{}} ** MAX_FDS,
dir_nfds_t: usize = 0,
dir_wds: [MAX_FDS]Wd = [_]Wd{.{}} ** MAX_FDS,
control_socket: ?std.os.socket_t = null,
control_socket: ?std.posix.socket_t = null,
watch_started: bool = false,
sock_name: [:0]const u8,
@ -35,21 +35,14 @@ pub fn deinit(self: *Self) void {
if (self.control_socket) |s| {
// Sockets...where Unix still pretends everything is a file, but it's not...
log.debug("closing control socket", .{});
std.os.closeSocket(s);
std.posix.close(s);
}
if (self.inotify_fd) |fd| {
for (0..self.nfds_t + self.dir_nfds_t) |inx| {
const wd = if (inx < self.nfds_t) self.wds[inx].wd else self.dir_wds[inx - self.nfds_t].wd;
switch (std.os.errno(std.os.linux.inotify_rm_watch(fd, wd))) {
.SUCCESS => {},
.BADF => unreachable,
// NOTE: Getting EINVAL, but the call looks valid to me?
// ...and wait...not all the time?
.INVAL => log.err("error removing watch (EINVAL). OS claims fd ({d}) or wd ({d}) is invalid", .{ self.inotify_fd.?, wd }),
else => unreachable,
}
std.posix.inotify_rm_watch(fd, wd);
}
std.os.close(fd);
std.posix.close(fd);
}
const cwd = std.fs.cwd();
cwd.deleteFileZ(self.sock_name) catch |e|
@ -77,12 +70,12 @@ pub fn startWatch(self: *Self) void {
while (true) {
self.watch_started = true;
var fds = if (self.inotify_fd == null)
@constCast(&[_]std.os.pollfd{.{ .fd = self.control_socket.?, .events = std.os.POLL.IN, .revents = undefined }})
const fds = if (self.inotify_fd == null)
@constCast(&[_]std.posix.pollfd{.{ .fd = self.control_socket.?, .events = std.posix.POLL.IN, .revents = undefined }})
else
@constCast(&[_]std.os.pollfd{
.{ .fd = self.control_socket.?, .events = std.os.POLL.IN, .revents = undefined },
.{ .fd = self.inotify_fd.?, .events = std.os.POLL.IN, .revents = undefined },
@constCast(&[_]std.posix.pollfd{
.{ .fd = self.control_socket.?, .events = std.posix.POLL.IN, .revents = undefined },
.{ .fd = self.inotify_fd.?, .events = std.posix.POLL.IN, .revents = undefined },
});
const control_fd_inx = 0;
@ -97,11 +90,11 @@ pub fn startWatch(self: *Self) void {
// std.fs.watch looks really good...but it requires event based I/O,
// which is not yet ready to be (re)added.
log.debug("tid={d} start poll with {d} fds", .{ std.Thread.getCurrentId(), fds.len });
if ((std.os.poll(
if ((std.posix.poll(
fds,
-1, // Infinite timeout
) catch @panic("poll error")) > 0) {
if (fds[control_fd_inx].revents & std.os.POLL.IN == std.os.POLL.IN) { // POLLIN means "there is data to read"
if (fds[control_fd_inx].revents & std.posix.POLL.IN == std.posix.POLL.IN) { // POLLIN means "there is data to read"
log.debug("tid={d} control event", .{std.Thread.getCurrentId()});
// we only need one byte for what we're doing
var control_buf: [1]u8 = undefined;
@ -109,9 +102,9 @@ pub fn startWatch(self: *Self) void {
// self.control_socket_accepted_fd = self.control_socket_accepted_fd orelse acceptSocket(self.control_socket.?);
// const fd = self.control_socket_accepted_fd.?; // let's save some typing
const fd = acceptSocket(self.sock_name, self.control_socket.?);
defer std.os.close(fd);
defer std.posix.close(fd);
var readcount = std.os.recv(fd, &control_buf, 0) catch unreachable;
const readcount = std.posix.recv(fd, &control_buf, 0) catch unreachable;
// var other_buf: [1]u8 = undefined;
// if (std.os.recv(fd, &other_buf, 0) catch unreachable != 0)
// @panic("socket contains more data than expected");
@ -142,11 +135,11 @@ pub fn startWatch(self: *Self) void {
// fds[1] is inotify, so if we have data in that file descriptor,
// we can force the data into an inotify_event structure and act on it
if (self.inotify_fd != null and fds[inotify_fd_inx].revents & std.os.POLL.IN == std.os.POLL.IN) {
if (self.inotify_fd != null and fds[inotify_fd_inx].revents & std.posix.POLL.IN == std.posix.POLL.IN) {
log.debug("tid={d} inotify event", .{std.Thread.getCurrentId()});
var event_buf: [4096]u8 align(@alignOf(std.os.linux.inotify_event)) = undefined;
// "borrowed" from https://ziglang.org/documentation/master/std/src/std/fs/watch.zig.html#L588
const bytes_read = std.os.read(self.inotify_fd.?, &event_buf) catch unreachable;
const bytes_read = std.posix.read(self.inotify_fd.?, &event_buf) catch unreachable;
var ptr: [*]u8 = &event_buf;
const end_ptr = ptr + bytes_read;
@ -162,11 +155,11 @@ pub fn startWatch(self: *Self) void {
}
}
fn acceptSocket(name: [:0]const u8, socket: std.os.socket_t) std.os.socket_t {
fn acceptSocket(name: [:0]const u8, socket: std.posix.socket_t) std.posix.socket_t {
var sockaddr = std.net.Address.initUnix(name) catch @panic("could not get sockaddr");
var sockaddr_len: std.os.socklen_t = sockaddr.getOsSockLen();
var sockaddr_len: std.posix.socklen_t = sockaddr.getOsSockLen();
log.debug("tid={d} accepting on socket fd {d}", .{ std.Thread.getCurrentId(), socket });
return std.os.accept(
return std.posix.accept(
socket,
&sockaddr.any,
&sockaddr_len,
@ -266,16 +259,16 @@ test "nameMatch" {
/// adds a file to watch. The return will be a handle that will be returned
/// in the fileChanged event triffered from startWatch
pub fn addFileWatch(self: *Self, path: *[:0]const u8) !usize {
self.inotify_fd = self.inotify_fd orelse try std.os.inotify_init1(std.os.linux.IN.NONBLOCK);
self.inotify_fd = self.inotify_fd orelse try std.posix.inotify_init1(std.os.linux.IN.NONBLOCK);
errdefer {
std.os.close(self.inotify_fd.?);
std.posix.close(self.inotify_fd.?);
self.inotify_fd = null;
}
// zig build modification pattern: open 20, close_nowrite 10, MOVED_TO (on the directory), attrib 4
// unix cp: OPEN, MODIFY, CLOSE_WRITE, ATTRIB
// unix mv: MOVED_TO (on the directory)
self.wds[self.nfds_t] = .{
.wd = try std.os.inotify_add_watchZ(
.wd = try std.posix.inotify_add_watchZ(
self.inotify_fd.?,
path.*,
std.os.linux.IN.CLOSE_WRITE,
@ -302,7 +295,7 @@ fn addDirWatch(self: *Self, path: *[]const u8) !void {
return; // We are already watching this directory
// We do not have a directory watch
self.dir_wds[self.dir_nfds_t] = .{
.wd = try std.os.inotify_add_watch(self.inotify_fd.?, dirname, std.os.linux.IN.MOVED_TO),
.wd = try std.posix.inotify_add_watch(self.inotify_fd.?, dirname, std.os.linux.IN.MOVED_TO),
.path = path, // we store path rather than directory because doing this without an allocator is...tough
};
self.dir_nfds_t += 1;
@ -342,10 +335,10 @@ fn addControlSocket(self: *Self, path: [:0]const u8) !void {
//
// This function theoretically should work without requiring linux...except this inotify call,
// which is completely linux specific
self.inotify_fd = self.inotify_fd orelse try std.os.inotify_init1(std.os.linux.IN.NONBLOCK);
self.inotify_fd = self.inotify_fd orelse try std.posix.inotify_init1(std.os.linux.IN.NONBLOCK);
log.debug("Established inotify file descriptor {d}", .{self.inotify_fd.?});
errdefer {
std.os.close(self.inotify_fd.?);
std.posix.close(self.inotify_fd.?);
self.inotify_fd = null;
}
// this should work on all systems theoretically, but I believe would work only
@ -369,18 +362,18 @@ fn addControlSocket(self: *Self, path: [:0]const u8) !void {
// 6. std.os.close: close the fd
//
// On end of use, we need to std.os.closeSocket()
const sock = try std.os.socket(
const sock = try std.posix.socket(
std.os.linux.AF.LOCAL,
std.os.linux.SOCK.STREAM | std.os.SOCK.CLOEXEC,
std.os.linux.SOCK.STREAM | std.posix.SOCK.CLOEXEC,
0,
);
errdefer std.os.closeSocket(sock);
errdefer std.posix.close(sock);
const sockaddr = try std.net.Address.initUnix(path);
log.debug("binding to path: {s}", .{path});
try std.os.bind(sock, &sockaddr.any, sockaddr.getOsSockLen());
try std.os.listen(sock, 10);
try std.posix.bind(sock, &sockaddr.any, sockaddr.getOsSockLen());
try std.posix.listen(sock, 10);
self.control_socket = sock;
log.debug("added control socket with fd={d}", .{sock});
}

View File

@ -65,17 +65,17 @@ pub fn parse(self: Self, reader: anytype) !ParsedConfig {
defer self.allocator.free(line);
const nocomments = std.mem.trim(u8, @constCast(&std.mem.split(u8, line, "#")).first(), ws);
var data_iterator = std.mem.split(u8, nocomments, "=");
var key = std.mem.trim(u8, data_iterator.first(), ws); // first never fails
const key = std.mem.trim(u8, data_iterator.first(), ws); // first never fails
if (key.len == 0) continue;
var value = std.mem.trim(u8, data_iterator.next() orelse return error.NoValueForKey, ws);
const value = std.mem.trim(u8, data_iterator.next() orelse return error.NoValueForKey, ws);
// keys should be putNoClobber, but values can be put.
// Because we have to dup the memory here though, we want to
// manage duplicate values seperately
var dup_key = try self.allocator.dupeZ(u8, key);
var dup_value = try self.allocator.dupeZ(u8, value);
const dup_key = try self.allocator.dupeZ(u8, key);
const dup_value = try self.allocator.dupeZ(u8, value);
try rc.key_value_map.putNoClobber(dup_key, dup_value);
if (!rc.value_key_map.contains(value)) {
var keys = try self.allocator.create(std.ArrayList([:0]u8));
const keys = try self.allocator.create(std.ArrayList([:0]u8));
keys.* = std.ArrayList([:0]u8).init(self.allocator);
try rc.value_key_map.put(dup_value, keys);
}
@ -85,7 +85,7 @@ pub fn parse(self: Self, reader: anytype) !ParsedConfig {
}
test "gets config from a stream" {
var allocator = std.testing.allocator;
const allocator = std.testing.allocator;
var stream = std.io.fixedBufferStream(
\\# This is a simple "path prefix" = dynamic library path mapping
\\ # no reordering will be done, so you must do things most -> least specific

View File

@ -44,7 +44,7 @@ pub const ZigRequest = struct {
target: []const u8,
method: [:0]u8,
content: []u8,
headers: std.http.Headers,
headers: []std.http.Header,
};
pub const ZigHeader = struct {
@ -56,7 +56,7 @@ pub const ZigResponse = struct {
status: std.http.Status = .ok,
reason: ?[]const u8 = null,
body: *std.ArrayList(u8),
headers: std.http.Headers,
headers: []std.http.Header,
request: ZigRequest,
prepend: std.ArrayList(u8),
@ -94,9 +94,9 @@ pub fn zigInit(parent_allocator: *anyopaque) callconv(.C) void {
/// Converts a StringHashMap to the structure necessary for passing through the
/// C boundary. This will be called automatically for you via the handleRequest function
/// and is also used by the main processing loop to coerce request headers
fn toHeaders(alloc: std.mem.Allocator, headers: std.http.Headers) ![*]Header {
var header_array = try std.ArrayList(Header).initCapacity(alloc, headers.list.items.len);
for (headers.list.items) |*field| {
fn toHeaders(alloc: std.mem.Allocator, headers: []const std.http.Header) ![*]Header {
var header_array = try std.ArrayList(Header).initCapacity(alloc, headers.len);
for (headers) |*field| {
header_array.appendAssumeCapacity(.{
.name_ptr = @constCast(field.name.ptr),
.name_len = field.name.len,
@ -114,7 +114,7 @@ fn toHeaders(alloc: std.mem.Allocator, headers: std.http.Headers) ![*]Header {
pub fn handleRequest(request: *Request, zigRequestHandler: ZigRequestHandler) ?*Response {
// TODO: implement another library in C or Rust or something to show
// that anything using a C ABI can be successful
var alloc = if (allocator) |a| a.* else {
const alloc = if (allocator) |a| a.* else {
log.err("zigInit not called prior to handle_request. This is a coding error", .{});
return null;
};
@ -123,12 +123,12 @@ pub fn handleRequest(request: *Request, zigRequestHandler: ZigRequestHandler) ?*
var response = std.ArrayList(u8).init(alloc);
// setup headers
var request_headers = std.http.Headers.init(alloc);
var request_headers = std.ArrayList(std.http.Header).init(alloc);
for (0..request.headers_len) |i|
request_headers.append(
request.headers[i].name_ptr[0..request.headers[i].name_len],
request.headers[i].value_ptr[0..request.headers[i].value_len],
) catch |e| {
request_headers.append(.{
.name = request.headers[i].name_ptr[0..request.headers[i].name_len],
.value = request.headers[i].value_ptr[0..request.headers[i].value_len],
}) catch |e| {
log.err("Unexpected error processing request: {any}", .{e});
if (@errorReturnTrace()) |trace| {
std.debug.dumpStackTrace(trace.*);
@ -136,16 +136,22 @@ pub fn handleRequest(request: *Request, zigRequestHandler: ZigRequestHandler) ?*
return null;
};
var prepend = std.ArrayList(u8).init(alloc);
const prepend = std.ArrayList(u8).init(alloc);
var zig_response = ZigResponse{
.headers = .{ .allocator = alloc },
.headers = &.{},
.body = &response,
.prepend = prepend,
.request = .{
.content = request.content[0..request.content_len],
.target = request.target[0..request.target_len],
.method = request.method[0..request.method_len :0],
.headers = request_headers,
.headers = request_headers.toOwnedSlice() catch |e| {
log.err("Unexpected error processing request: {any}", .{e});
if (@errorReturnTrace()) |trace| {
std.debug.dumpStackTrace(trace.*);
}
return null;
},
},
};
zigRequestHandler(
@ -184,7 +190,7 @@ fn buildResponse(alloc: std.mem.Allocator, zig_response: *ZigResponse) ?*Respons
}
return null;
};
rc.headers_len = zig_response.headers.list.items.len;
rc.headers_len = zig_response.headers.len;
rc.status = if (zig_response.status == .ok) 0 else @intFromEnum(zig_response.status);
rc.reason_len = 0;
if (zig_response.reason) |*r| {

View File

@ -24,7 +24,7 @@ const log = std.log.scoped(.@"main-lib");
// }
//
comptime {
@export(interface.zigInit, .{ .name = "zigInit", .linkage = .Strong });
@export(interface.zigInit, .{ .name = "zigInit", .linkage = .strong });
}
/// handle_request will be called on a single request, but due to the preservation
@ -58,32 +58,46 @@ export fn handle_request(request: *interface.Request) callconv(.C) ?*interface.R
// handleRequest function here is the last line of boilerplate and the
// entry to a request
fn handleRequest(allocator: std.mem.Allocator, response: *interface.ZigResponse) !void {
_ = allocator;
// setup
var response_writer = response.body.writer();
// real work
if (response.request.headers.getFirstValue("host")) |host| {
if (std.mem.startsWith(u8, host, "iam")) {
try response_writer.print("iam response", .{});
for (response.request.headers) |h| {
if (std.ascii.eqlIgnoreCase(h.name, "host")) {
if (std.mem.startsWith(u8, h.value, "iam")) {
try response_writer.print("iam response", .{});
return;
}
break;
}
}
for (response.request.headers) |h| {
if (std.ascii.eqlIgnoreCase(h.name, "x-slow")) {
std.time.sleep(std.time.ns_per_ms * (std.fmt.parseInt(usize, h.value, 10) catch 1000));
try response_writer.print("i am slow\n\n", .{});
return;
}
}
if (response.request.headers.getFirstValue("x-slow")) |ms| {
std.time.sleep(std.time.ns_per_ms * (std.fmt.parseInt(usize, ms, 10) catch 1000));
try response_writer.print("i am slow\n\n", .{});
return;
for (response.request.headers) |h| {
if (std.ascii.eqlIgnoreCase(h.name, "x-log-this")) {
try response.writeAll(h.value);
break;
}
}
if (response.request.headers.getFirstValue("x-log-this")) |l| {
try response.writeAll(l);
for (response.request.headers) |h| {
if (std.ascii.eqlIgnoreCase(h.name, "x-status")) {
response.status = @enumFromInt(std.fmt.parseInt(u10, h.value, 10) catch 500);
break;
}
}
if (response.request.headers.getFirstValue("x-status")) |s| {
response.status = @enumFromInt(std.fmt.parseInt(u10, s, 10) catch 500);
for (response.request.headers) |h| {
if (std.ascii.eqlIgnoreCase(h.name, "x-throw"))
return error.Thrown;
}
if (response.request.headers.getFirstValue("x-throw")) |_| {
return error.Thrown;
}
try response_writer.print(" {d}", .{response.request.headers.list.items.len});
try response.headers.append("X-custom-foo", "bar");
try response_writer.print(" {d}", .{response.request.headers.len});
var headers = std.ArrayList(std.http.Header).init(allocator);
try headers.appendSlice(response.headers);
try headers.append(.{ .name = "X-custom-foo", .value = "bar" });
response.headers = try headers.toOwnedSlice();
}
test "handle_request" {
@ -91,7 +105,7 @@ test "handle_request" {
defer arena.deinit();
var aa = arena.allocator();
interface.zigInit(&aa);
var headers: []interface.Header = @constCast(&[_]interface.Header{.{
const headers: []interface.Header = @constCast(&[_]interface.Header{.{
.name_ptr = @ptrCast(@constCast("GET".ptr)),
.name_len = 3,
.value_ptr = @ptrCast(@constCast("GET".ptr)),
@ -119,7 +133,7 @@ test "lib can write data directly" {
defer arena.deinit();
var aa = arena.allocator();
interface.zigInit(&aa);
var headers: []interface.Header = @constCast(&[_]interface.Header{.{
const headers: []interface.Header = @constCast(&[_]interface.Header{.{
.name_ptr = @ptrCast(@constCast("x-log-this".ptr)),
.name_len = "x-log-this".len,
.value_ptr = @ptrCast(@constCast("I am a teapot".ptr)),
@ -146,7 +160,7 @@ test "lib can write data directly and still throw" {
defer arena.deinit();
var aa = arena.allocator();
interface.zigInit(&aa);
var headers: []interface.Header = @constCast(&[_]interface.Header{ .{
const headers: []interface.Header = @constCast(&[_]interface.Header{ .{
.name_ptr = @ptrCast(@constCast("x-log-this".ptr)),
.name_len = "x-log-this".len,
.value_ptr = @ptrCast(@constCast("I am a teapot".ptr)),
@ -176,7 +190,7 @@ test "lib can set status, update data directly and still throw" {
defer arena.deinit();
var aa = arena.allocator();
interface.zigInit(&aa);
var headers: []interface.Header = @constCast(&[_]interface.Header{ .{
const headers: []interface.Header = @constCast(&[_]interface.Header{ .{
.name_ptr = @ptrCast(@constCast("x-log-this".ptr)),
.name_len = "x-log-this".len,
.value_ptr = @ptrCast(@constCast("I am a teapot".ptr)),

View File

@ -7,16 +7,17 @@ const config = @import("config.zig");
const log = std.log.scoped(.main);
// logging options
pub const std_options = struct {
pub const log_level = switch (builtin.mode) {
.Debug => .debug,
.ReleaseSafe => .info,
.ReleaseFast, .ReleaseSmall => .err,
};
pub const std_options = .{
.log_level = if (!builtin.is_test)
switch (builtin.mode) {
.Debug => .debug,
.ReleaseSafe => .info,
.ReleaseFast, .ReleaseSmall => .err,
},
pub const log_scope_levels = &[_]std.log.ScopeLevel{
.log_scope_levels = &[_]std.log.ScopeLevel{
.{ .scope = .watch, .level = .info },
};
},
};
const serveFn = *const fn (*interface.Request) ?*interface.Response;
@ -32,8 +33,9 @@ var watcher_thread: ?std.Thread = null;
var timer: ?std.time.Timer = null;
const FullReturn = struct {
response: []u8,
executor: *Executor,
content: []const u8,
executor: *Executor = undefined,
respond_options: std.http.Server.Request.RespondOptions,
};
// Executor structure, including functions that were found in the library
@ -52,9 +54,9 @@ const Executor = struct {
// fields used for internal accounting
watch: ?usize = null,
drain_in_progress: std.atomic.Atomic(bool) = std.atomic.Atomic(bool).init(false),
load_in_progress: std.atomic.Atomic(bool) = std.atomic.Atomic(bool).init(false),
requests_in_flight: std.atomic.Atomic(usize) = std.atomic.Atomic(usize).init(0),
drain_in_progress: std.atomic.Value(bool) = std.atomic.Value(bool).init(false),
load_in_progress: std.atomic.Value(bool) = std.atomic.Value(bool).init(false),
requests_in_flight: std.atomic.Value(usize) = std.atomic.Value(usize).init(0),
};
const SERVE_FN_NAME = "handle_request";
@ -67,68 +69,68 @@ var parsed_config: config.ParsedConfig = undefined;
/// Serves a single request. Finds executor, marshalls request data for the C
/// interface, calls the executor and marshalls data back
fn serve(allocator: *std.mem.Allocator, response: *std.http.Server.Response) !*FullReturn {
const executor = try getExecutor(response.request.target, response.request.headers);
errdefer _ = executor.requests_in_flight.fetchSub(1, .Monotonic);
fn serve(allocator: *std.mem.Allocator, request: *std.http.Server.Request) !*FullReturn {
var head_it = request.iterateHeaders();
const headers = try toHeaders(allocator.*, &head_it);
const executor = try getExecutor(request.head.target, headers);
errdefer _ = executor.requests_in_flight.fetchSub(1, .monotonic);
if (executor.zigInitFn) |f|
f(allocator);
// Call external library
const method_tag = @tagName(response.request.method);
const headers = try toHeaders(allocator.*, response.request.headers);
var request_content: []u8 = &[_]u8{};
if (response.request.content_length) |l| {
request_content = try response.reader().readAllAlloc(allocator.*, @as(usize, l));
}
const method_tag = @tagName(request.head.method);
const request_content: []u8 = try (try request.reader()).readAllAlloc(allocator.*, std.math.maxInt(usize));
log.debug("{d} bytes read from request", .{request_content.len});
var request = interface.Request{
.target = response.request.target.ptr,
.target_len = response.request.target.len,
var i_request = interface.Request{
.target = request.head.target.ptr,
.target_len = request.head.target.len,
.method = @constCast(method_tag[0..].ptr),
.method_len = method_tag.len,
.headers = headers,
.headers_len = response.request.headers.list.items.len,
.headers = headers.ptr,
.headers_len = headers.len,
.content = request_content.ptr,
.content_len = request_content.len,
};
var serve_result = executor.serveFn.?(&request).?; // ok for this pointer deref to fail
log.debug("target: {s}", .{response.request.target});
var serve_result = executor.serveFn.?(&i_request).?; // ok for this pointer deref to fail
log.debug("target: {s}", .{request.head.target});
log.debug("response ptr: {*}", .{serve_result.ptr});
var slice: []u8 = serve_result.ptr[0..serve_result.len];
log.debug("response body: {s}", .{slice});
var response = try allocator.create(FullReturn); // allocator is arena here
response.content = serve_result.ptr[0..serve_result.len];
response.executor = executor;
response.respond_options = .{};
log.debug("response body: {s}", .{response.content});
if (serve_result.status != 0) {
response.status = @enumFromInt(serve_result.status);
response.respond_options.status = @enumFromInt(serve_result.status);
if (serve_result.reason_len > 0) {
response.reason = serve_result.reason_ptr[0..serve_result.reason_len];
response.respond_options.reason = serve_result.reason_ptr[0..serve_result.reason_len];
}
}
// Deal with results
// Deal with response headers
var response_headers = try std.ArrayList(std.http.Header).initCapacity(allocator.*, serve_result.headers_len + 1);
defer response_headers.deinit();
var content_type_added = false;
for (0..serve_result.headers_len) |inx| {
const head = serve_result.headers[inx];
try response.headers.append(
head.name_ptr[0..head.name_len],
head.value_ptr[0..head.value_len],
);
response_headers.appendAssumeCapacity(.{
.name = head.name_ptr[0..head.name_len],
.value = head.value_ptr[0..head.value_len],
});
// headers are case insensitive
content_type_added = std.ascii.eqlIgnoreCase(head.name_ptr[0..head.name_len], "content-type");
}
if (!content_type_added)
try response.headers.append("content-type", "text/plain");
// target is path
var rc = try allocator.create(FullReturn);
rc.executor = executor;
rc.response = slice;
return rc;
response_headers.appendAssumeCapacity(.{ .name = "content-type", .value = "text/plain" });
response.respond_options.extra_headers = try response_headers.toOwnedSlice();
return response;
}
/// Gets and executor based on request data
fn getExecutor(requested_path: []const u8, headers: std.http.Headers) !*Executor {
fn getExecutor(requested_path: []const u8, headers: []interface.Header) !*Executor {
var executor = blk: {
for (executors) |*exec| {
if (executorIsMatch(exec.match_data, requested_path, headers)) {
@ -138,15 +140,15 @@ fn getExecutor(requested_path: []const u8, headers: std.http.Headers) !*Executor
log.err("Could not find executor for target path '{s}'", .{requested_path});
return error.NoApplicableExecutor;
};
while (executor.drain_in_progress.load(.Acquire)) {
while (executor.drain_in_progress.load(.acquire)) {
// we need to stand down and stand by
std.atomic.spinLoopHint(); // Let CPU know we're just hanging out
std.time.sleep(1 * std.time.ns_per_ms / 2);
}
// Tell everyone we're about to use this bad boy
// While requests_in_flight is >= 0, nobody should unload the library
_ = executor.requests_in_flight.fetchAdd(1, .Acquire);
errdefer _ = executor.requests_in_flight.fetchSub(1, .Release);
_ = executor.requests_in_flight.fetchAdd(1, .acquire);
errdefer _ = executor.requests_in_flight.fetchSub(1, .release);
if (executor.serveFn != null) return executor;
@ -157,13 +159,13 @@ fn getExecutor(requested_path: []const u8, headers: std.http.Headers) !*Executor
// If the library is being reloaded and a bunch of requests come in,
// we could have multiple threads racing to load
// NOTE: I am not confident of the memory ordering here on tryCompareAndSwap
while (executor.load_in_progress.tryCompareAndSwap(false, true, .Acquire, .Acquire)) |_| {
while (@cmpxchgWeak(bool, &executor.load_in_progress.raw, false, true, .acquire, .acquire)) |_| {
// we need to stand down and stand by
std.atomic.spinLoopHint(); // Let CPU know we're just hanging out
std.time.sleep(1 * std.time.ns_per_ms / 2);
}
// we have the conch...lock others out
defer executor.load_in_progress.store(false, .Release);
defer executor.load_in_progress.store(false, .release);
if (executor.library) |l|
break :blk l; // someone beat us to the race..our defer above will take care of unlocking
@ -185,7 +187,7 @@ fn getExecutor(requested_path: []const u8, headers: std.http.Headers) !*Executor
return executor;
}
fn executorIsMatch(match_data: []const u8, requested_path: []const u8, headers: std.http.Headers) bool {
fn executorIsMatch(match_data: []const u8, requested_path: []const u8, headers: []interface.Header) bool {
if (!std.mem.containsAtLeast(u8, match_data, 1, ":")) {
// match_data does not have a ':'. This means this is a straight path, without
// any header requirement. We can simply return a match prefix on the
@ -196,7 +198,13 @@ fn executorIsMatch(match_data: []const u8, requested_path: []const u8, headers:
}
const colon = std.mem.indexOf(u8, match_data, ":").?;
const header_needle = match_data[0..colon];
const header_inx = headers.firstIndexOf(header_needle) orelse return false;
const header_inx = blk: {
for (headers, 0..) |h, i| {
if (std.ascii.eqlIgnoreCase(h.name_ptr[0..h.name_len], header_needle))
break :blk i;
}
return false;
};
// Apparently std.mem.split will return an empty first when the haystack starts
// with the delimiter
var split = std.mem.split(u8, std.mem.trim(u8, match_data[colon + 1 ..], "\t "), " ");
@ -211,7 +219,7 @@ fn executorIsMatch(match_data: []const u8, requested_path: []const u8, headers:
// match_data includes some sort of header match as well. We assume the
// header match is a full match on the key (handled above)
// but a prefix match on the value
const request_header_value = headers.list.items[header_inx].value;
const request_header_value = headers[header_inx].value_ptr[0..headers[header_inx].value_len];
// (shoud this be case insensitive?)
if (!std.mem.startsWith(u8, request_header_value, header_value_needle)) return false;
// header value matches...return the path prefix match
@ -243,9 +251,9 @@ fn executorChanged(watch: usize) void {
if (executor.watch) |w| {
if (w == watch) {
if (executor.library) |l| {
executor.drain_in_progress.store(true, .Release);
defer executor.drain_in_progress.store(false, .Release);
while (executor.requests_in_flight.load(.Acquire) > 0) {
executor.drain_in_progress.store(true, .release);
defer executor.drain_in_progress.store(false, .release);
while (executor.requests_in_flight.load(.acquire) > 0) {
std.atomic.spinLoopHint();
std.time.sleep(1 * std.time.ns_per_ms / 2);
}
@ -257,7 +265,7 @@ fn executorChanged(watch: usize) void {
log.warn("could not reload! error opening library", .{});
return;
};
var symbol = std.c.dlsym(executor.library.?, SERVE_FN_NAME);
const symbol = std.c.dlsym(executor.library.?, SERVE_FN_NAME);
if (symbol == null) {
log.warn("could not reload! error finding symbol", .{});
if (std.c.dlclose(executor.library.?) != 0)
@ -284,11 +292,11 @@ fn dlopen(path: [:0]const u8) !*anyopaque {
/// exit from the application as the main function has an infinite loop
fn exitApplication(
_: i32,
_: *const std.os.siginfo_t,
_: *const std.posix.siginfo_t,
_: ?*const anyopaque,
) callconv(.C) noreturn {
exitApp(0);
std.os.exit(0);
std.posix.exit(0);
}
/// exitApp handles deinitialization for the application and any reporting
@ -302,7 +310,7 @@ fn exitApp(exitcode: u8) void {
watcher.stopWatch() catch @panic("could not stop watcher");
std.io.getStdOut().writer().print("exiting application\n", .{}) catch {};
watcher.deinit();
std.os.exit(exitcode);
std.posix.exit(exitcode);
// joining threads will hang...we're ultimately in a signal handler.
// But everything is shut down cleanly now, so I don't think it hurts to
// just kill it all (NOTE: from a practical perspective, we do not seem
@ -316,7 +324,7 @@ fn exitApp(exitcode: u8) void {
/// has been implemented. Operates off the main thread
fn reloadConfig(
_: i32,
_: *const std.os.siginfo_t,
_: *const std.posix.siginfo_t,
_: ?*const anyopaque,
) callconv(.C) void {
// TODO: Gracefully drain in flight requests and hold a lock here while
@ -334,36 +342,36 @@ fn reloadConfig(
/// Installs all signal handlers for shutdown and configuration reload
fn installSignalHandler() !void {
var act = std.os.Sigaction{
var act = std.posix.Sigaction{
.handler = .{ .sigaction = exitApplication },
.mask = std.os.empty_sigset,
.flags = (std.os.SA.SIGINFO | std.os.SA.RESTART | std.os.SA.RESETHAND),
.mask = std.posix.empty_sigset,
.flags = (std.posix.SA.SIGINFO | std.posix.SA.RESTART | std.posix.SA.RESETHAND),
};
try std.os.sigaction(std.os.SIG.INT, &act, null);
try std.os.sigaction(std.os.SIG.TERM, &act, null);
try std.posix.sigaction(std.posix.SIG.INT, &act, null);
try std.posix.sigaction(std.posix.SIG.TERM, &act, null);
var hup_act = std.os.Sigaction{
var hup_act = std.posix.Sigaction{
.handler = .{ .sigaction = reloadConfig },
.mask = std.os.empty_sigset,
.flags = (std.os.SA.SIGINFO | std.os.SA.RESTART | std.os.SA.RESETHAND),
.mask = std.posix.empty_sigset,
.flags = (std.posix.SA.SIGINFO | std.posix.SA.RESTART | std.posix.SA.RESETHAND),
};
try std.os.sigaction(std.os.SIG.HUP, &hup_act, null);
try std.posix.sigaction(std.posix.SIG.HUP, &hup_act, null);
}
pub fn main() !void {
// We are linked to libc, and primarily using the arena allocator
// raw allocator recommended for use in arenas
var raw_allocator = std.heap.raw_c_allocator;
const raw_allocator = std.heap.raw_c_allocator;
// Our child process will also need an allocator, and is using the
// same pattern, so we will hand the child a raw allocator as well
var child_allocator = std.heap.raw_c_allocator;
const child_allocator = std.heap.raw_c_allocator;
// Lastly, we need some of our own operations
var arena = std.heap.ArenaAllocator.init(raw_allocator);
defer arena.deinit();
var parent_allocator = arena.allocator();
const parent_allocator = arena.allocator();
var al = std.ArrayList([]const u8).init(parent_allocator);
defer al.deinit();
@ -408,25 +416,22 @@ fn childMain(allocator: std.mem.Allocator) !void {
watcher = Watch.init(executorChanged);
watcher_thread = try std.Thread.spawn(.{}, Watch.startWatch, .{&watcher});
var server = std.http.Server.init(allocator, .{ .reuse_address = true });
defer server.deinit();
const address = try std.net.Address.parseIp(
"0.0.0.0",
if (std.os.getenv("PORT")) |p| try std.fmt.parseInt(u16, p, 10) else PORT,
if (std.posix.getenv("PORT")) |p| try std.fmt.parseInt(u16, p, 10) else PORT,
);
try server.listen(address);
const server_port = server.socket.listen_address.in.getPort();
var server = try address.listen(.{ .reuse_address = true });
const server_port = server.listen_address.in.getPort();
log.info("listening on port: {d}", .{server_port});
if (builtin.os.tag == .linux)
log.info("pid: {d}", .{std.os.linux.getpid()});
try installSignalHandler();
response_preallocation_kb = if (std.os.getenv("RESPONSE_PREALLOCATION_KB")) |kb|
response_preallocation_kb = if (std.posix.getenv("RESPONSE_PREALLOCATION_KB")) |kb|
try std.fmt.parseInt(usize, kb, 10)
else
response_preallocation_kb;
var server_thread_count = if (std.os.getenv("SERVER_THREAD_COUNT")) |count|
const server_thread_count = if (std.posix.getenv("SERVER_THREAD_COUNT")) |count|
try std.fmt.parseInt(usize, count, 10)
else switch (builtin.mode) {
.Debug => @min(4, try std.Thread.getCpuCount()),
@ -451,7 +456,7 @@ fn childMain(allocator: std.mem.Allocator) !void {
for (server_threads.items) |thread| thread.join();
}
fn threadMain(allocator: std.mem.Allocator, server: *std.http.Server, thread_number: usize) !void {
fn threadMain(allocator: std.mem.Allocator, server: *std.net.Server, thread_number: usize) !void {
// TODO: If we're in a thread pool we need to be careful with this...
const stdout_file = std.io.getStdOut().writer();
var bw = std.io.bufferedWriter(stdout_file);
@ -505,37 +510,42 @@ fn loadConfig(allocator: std.mem.Allocator) ![]Executor {
/// main loop calls processRequest, which is responsible for the interface
/// with logs, connection accounting, etc. The work dealing with the request
/// itself is delegated to the serve function to work with the executor
fn processRequest(allocator: *std.mem.Allocator, server: *std.http.Server, writer: anytype) !void {
fn processRequest(allocator: *std.mem.Allocator, server: *std.net.Server, writer: anytype) !void {
if (timer == null) timer = try std.time.Timer.start();
var tm = timer.?;
var res = try server.accept(.{ .allocator = allocator.* });
defer res.deinit();
defer _ = res.reset();
try res.wait(); // wait for client to send a complete request head
var connection = try server.accept();
defer connection.stream.close();
var read_buffer: [1024 * 16]u8 = undefined; // TODO: fix this
var server_connection = std.http.Server.init(connection, &read_buffer);
var req = try server_connection.receiveHead();
// TODO: should we check for server_connection.state == .ready?
// I believe it's fair to start our timer after this is done
tm.reset();
const err_return = FullReturn{
.content = "Internal Server Error\n",
.respond_options = .{ .status = .internal_server_error },
};
// This is an nginx log:
// git.lerch.org 50.39.111.175 - - [16/May/2023:02:56:31 +0000] "POST /api/actions/runner.v1.RunnerService/FetchTask HTTP/2.0" 200 0 "-" "connect-go/1.2.0-dev (go1.20.1)" "172.20.0.5:3000"
// TODO: replicate this
try writer.print("{} - - \"{s} {s} {s}\"", .{
res.address,
@tagName(res.request.method),
res.request.target,
@tagName(res.request.version),
server_connection.connection.address,
@tagName(req.head.method),
req.head.target,
@tagName(req.head.version),
});
const errstr = "Internal Server Error\n";
var errbuf: [errstr.len]u8 = undefined;
var response_bytes = try std.fmt.bufPrint(&errbuf, errstr, .{});
var full_response = serve(allocator, &res) catch |e| brk: {
res.status = .internal_server_error;
// TODO: more about this particular request
var caught_err = false;
const full_response = serve(allocator, &req) catch |e| brk: {
log.err("Unexpected error from executor processing request: {any}", .{e});
// TODO: more about this particular request
if (@errorReturnTrace()) |trace| {
std.debug.dumpStackTrace(trace.*);
}
break :brk null;
caught_err = true;
break :brk &err_return;
};
defer {
// The call above converts uncaught errors to a null
@ -545,30 +555,29 @@ fn processRequest(allocator: *std.mem.Allocator, server: *std.http.Server, write
// This leaves this defer block as the only place we can/should decrement
// under normal conditions
if (full_response) |f| {
if (f.executor.requestDeinitFn) |d| d();
_ = f.executor.requests_in_flight.fetchSub(1, .Release);
if (!caught_err) {
if (full_response.executor.requestDeinitFn) |d| d();
_ = full_response.executor.requests_in_flight.fetchSub(1, .release);
}
}
if (full_response) |f|
response_bytes = f.response;
res.transfer_encoding = .{ .content_length = response_bytes.len };
try res.headers.append("connection", "close");
try writer.print(" {d} ttfb {d:.3}ms", .{ @intFromEnum(res.status), @as(f64, @floatFromInt(tm.read())) / std.time.ns_per_ms });
if (builtin.is_test) writeToTestBuffers(response_bytes, &res);
try res.do();
_ = try res.writer().writeAll(response_bytes);
try res.finish();
try writer.print(" {d} ttfb {d:.3}ms", .{
@intFromEnum(full_response.respond_options.status),
@as(f64, @floatFromInt(tm.read())) / std.time.ns_per_ms,
});
if (builtin.is_test) writeToTestBuffers(full_response.content);
req.respond(full_response.content, full_response.respond_options) catch |e| {
log.err("Unexpected error responding to client: {any}", .{e});
};
try writer.print(" {d} ttlb {d:.3}ms", .{
response_bytes.len,
full_response.content.len,
@as(f64, @floatFromInt(tm.read())) / std.time.ns_per_ms,
});
}
fn toHeaders(allocator: std.mem.Allocator, headers: std.http.Headers) ![*]interface.Header {
var header_array = try std.ArrayList(interface.Header).initCapacity(allocator, headers.list.items.len);
for (headers.list.items) |kv| {
header_array.appendAssumeCapacity(.{
fn toHeaders(allocator: std.mem.Allocator, headers: *std.http.HeaderIterator) ![]interface.Header {
var header_array = std.ArrayList(interface.Header).init(allocator);
while (headers.next()) |kv| {
try header_array.append(.{
.name_ptr = @constCast(kv.name).ptr,
.name_len = kv.name.len,
@ -576,7 +585,7 @@ fn toHeaders(allocator: std.mem.Allocator, headers: std.http.Headers) ![*]interf
.value_len = kv.value.len,
});
}
return header_array.items.ptr;
return try header_array.toOwnedSlice();
}
/// Allocates at least preallocated_kb kilobytes of ram for usage. Some overhead
@ -592,12 +601,11 @@ fn preWarmArena(aa: std.mem.Allocator, arena: *std.heap.ArenaAllocator, prealloc
);
if (!arena.reset(.{ .retain_with_limit = (arena.queryCapacity() + @as(usize, 1023)) / @as(usize, 1024) * 1024 }))
log.warn("arena reset failed, arena degraded", .{});
var bytes_allocated = arena.queryCapacity();
const bytes_allocated = arena.queryCapacity();
log.debug("preallocated {d} bytes", .{bytes_allocated});
return bytes_allocated;
}
fn writeToTestBuffers(response: []const u8, res: *std.http.Server.Response) void {
_ = res;
fn writeToTestBuffers(response: []const u8) void {
log.debug("writing to test buffers", .{});
// This performs core dump...because we're in a separate thread?
// @memset(test_resp_buf, 0);
@ -615,16 +623,13 @@ fn testRequest(request_bytes: []const u8) !void {
var arena = std.heap.ArenaAllocator.init(allocator);
defer arena.deinit();
var server = std.http.Server.init(allocator, .{ .reuse_address = true });
defer server.deinit();
const address = try std.net.Address.parseIp("127.0.0.1", 0);
try server.listen(address);
const server_port = server.socket.listen_address.in.getPort();
var http_server = try address.listen(.{ .reuse_address = true });
const server_port = http_server.listen_address.in.getPort();
var al = std.ArrayList(u8).init(allocator);
defer al.deinit();
var writer = al.writer();
const writer = al.writer();
var aa = arena.allocator();
var bytes_allocated: usize = 0;
// pre-warm
@ -632,7 +637,7 @@ fn testRequest(request_bytes: []const u8) !void {
const server_thread = try std.Thread.spawn(
.{},
processRequest,
.{ &aa, &server, writer },
.{ &aa, &http_server, writer },
);
const stream = try std.net.tcpConnectToHost(allocator, "127.0.0.1", server_port);
@ -670,14 +675,12 @@ test {
var test_resp_buf: [1024]u8 = undefined;
var test_resp_buf_len: usize = undefined;
test "root path get" {
std.testing.log_level = .debug;
log.debug("", .{});
try testGet("/");
try std.testing.expectEqual(@as(usize, 2), test_resp_buf_len);
try std.testing.expectEqualStrings(" 1", test_resp_buf[0..test_resp_buf_len]);
}
test "root path, alternative host get" {
std.testing.log_level = .debug;
log.debug("", .{});
try testHostGet("iam.aws.lerch.org", "/");
try std.testing.expectEqualStrings("iam response", test_resp_buf[0..test_resp_buf_len]);