Compare commits
2 Commits
bf09470abd
...
b187fa80eb
Author | SHA1 | Date | |
---|---|---|---|
b187fa80eb | |||
c06bda918b |
|
@ -4,7 +4,7 @@ on: [push]
|
|||
env:
|
||||
ACTIONS_RUNTIME_TOKEN: ${{ secrets.GITHUB_TOKEN }}
|
||||
ACTIONS_RUNTIME_URL: https://git.lerch.org/api/actions_pipeline/
|
||||
ZIG_URL: https://ziglang.org/download/0.11.0/zig-linux-x86_64-0.11.0.tar.xz
|
||||
ZIG_URL: https://ziglang.org/download/0.12.0/zig-linux-x86_64-0.12.0.tar.xz
|
||||
BUILD_TARGET: x86_64-linux-gnu # Needs to be gnu since we're using dlopen
|
||||
BUILD_OPTIMIZATION: ReleaseSafe # Safety is usually a good thing
|
||||
jobs:
|
||||
|
|
|
@ -27,8 +27,7 @@ pub fn build(b: *std.Build) void {
|
|||
exe.linkLibC();
|
||||
|
||||
_ = b.addModule("flexilib-interface", .{
|
||||
.source_file = .{ .path = "src/interface.zig" },
|
||||
.dependencies = &[_]std.build.ModuleDependency{},
|
||||
.root_source_file = b.path("src/interface.zig"),
|
||||
});
|
||||
|
||||
const lib = b.addSharedLibrary(.{
|
||||
|
|
|
@ -12,13 +12,13 @@ const Wd = struct {
|
|||
};
|
||||
|
||||
fileChanged: *const fn (usize) void,
|
||||
inotify_fd: ?std.os.fd_t = null,
|
||||
inotify_fd: ?std.posix.fd_t = null,
|
||||
|
||||
nfds_t: usize = 0,
|
||||
wds: [MAX_FDS]Wd = [_]Wd{.{}} ** MAX_FDS,
|
||||
dir_nfds_t: usize = 0,
|
||||
dir_wds: [MAX_FDS]Wd = [_]Wd{.{}} ** MAX_FDS,
|
||||
control_socket: ?std.os.socket_t = null,
|
||||
control_socket: ?std.posix.socket_t = null,
|
||||
watch_started: bool = false,
|
||||
sock_name: [:0]const u8,
|
||||
|
||||
|
@ -35,21 +35,14 @@ pub fn deinit(self: *Self) void {
|
|||
if (self.control_socket) |s| {
|
||||
// Sockets...where Unix still pretends everything is a file, but it's not...
|
||||
log.debug("closing control socket", .{});
|
||||
std.os.closeSocket(s);
|
||||
std.posix.close(s);
|
||||
}
|
||||
if (self.inotify_fd) |fd| {
|
||||
for (0..self.nfds_t + self.dir_nfds_t) |inx| {
|
||||
const wd = if (inx < self.nfds_t) self.wds[inx].wd else self.dir_wds[inx - self.nfds_t].wd;
|
||||
switch (std.os.errno(std.os.linux.inotify_rm_watch(fd, wd))) {
|
||||
.SUCCESS => {},
|
||||
.BADF => unreachable,
|
||||
// NOTE: Getting EINVAL, but the call looks valid to me?
|
||||
// ...and wait...not all the time?
|
||||
.INVAL => log.err("error removing watch (EINVAL). OS claims fd ({d}) or wd ({d}) is invalid", .{ self.inotify_fd.?, wd }),
|
||||
else => unreachable,
|
||||
std.posix.inotify_rm_watch(fd, wd);
|
||||
}
|
||||
}
|
||||
std.os.close(fd);
|
||||
std.posix.close(fd);
|
||||
}
|
||||
const cwd = std.fs.cwd();
|
||||
cwd.deleteFileZ(self.sock_name) catch |e|
|
||||
|
@ -77,12 +70,12 @@ pub fn startWatch(self: *Self) void {
|
|||
while (true) {
|
||||
self.watch_started = true;
|
||||
|
||||
var fds = if (self.inotify_fd == null)
|
||||
@constCast(&[_]std.os.pollfd{.{ .fd = self.control_socket.?, .events = std.os.POLL.IN, .revents = undefined }})
|
||||
const fds = if (self.inotify_fd == null)
|
||||
@constCast(&[_]std.posix.pollfd{.{ .fd = self.control_socket.?, .events = std.posix.POLL.IN, .revents = undefined }})
|
||||
else
|
||||
@constCast(&[_]std.os.pollfd{
|
||||
.{ .fd = self.control_socket.?, .events = std.os.POLL.IN, .revents = undefined },
|
||||
.{ .fd = self.inotify_fd.?, .events = std.os.POLL.IN, .revents = undefined },
|
||||
@constCast(&[_]std.posix.pollfd{
|
||||
.{ .fd = self.control_socket.?, .events = std.posix.POLL.IN, .revents = undefined },
|
||||
.{ .fd = self.inotify_fd.?, .events = std.posix.POLL.IN, .revents = undefined },
|
||||
});
|
||||
|
||||
const control_fd_inx = 0;
|
||||
|
@ -97,11 +90,11 @@ pub fn startWatch(self: *Self) void {
|
|||
// std.fs.watch looks really good...but it requires event based I/O,
|
||||
// which is not yet ready to be (re)added.
|
||||
log.debug("tid={d} start poll with {d} fds", .{ std.Thread.getCurrentId(), fds.len });
|
||||
if ((std.os.poll(
|
||||
if ((std.posix.poll(
|
||||
fds,
|
||||
-1, // Infinite timeout
|
||||
) catch @panic("poll error")) > 0) {
|
||||
if (fds[control_fd_inx].revents & std.os.POLL.IN == std.os.POLL.IN) { // POLLIN means "there is data to read"
|
||||
if (fds[control_fd_inx].revents & std.posix.POLL.IN == std.posix.POLL.IN) { // POLLIN means "there is data to read"
|
||||
log.debug("tid={d} control event", .{std.Thread.getCurrentId()});
|
||||
// we only need one byte for what we're doing
|
||||
var control_buf: [1]u8 = undefined;
|
||||
|
@ -109,9 +102,9 @@ pub fn startWatch(self: *Self) void {
|
|||
// self.control_socket_accepted_fd = self.control_socket_accepted_fd orelse acceptSocket(self.control_socket.?);
|
||||
// const fd = self.control_socket_accepted_fd.?; // let's save some typing
|
||||
const fd = acceptSocket(self.sock_name, self.control_socket.?);
|
||||
defer std.os.close(fd);
|
||||
defer std.posix.close(fd);
|
||||
|
||||
var readcount = std.os.recv(fd, &control_buf, 0) catch unreachable;
|
||||
const readcount = std.posix.recv(fd, &control_buf, 0) catch unreachable;
|
||||
// var other_buf: [1]u8 = undefined;
|
||||
// if (std.os.recv(fd, &other_buf, 0) catch unreachable != 0)
|
||||
// @panic("socket contains more data than expected");
|
||||
|
@ -142,11 +135,11 @@ pub fn startWatch(self: *Self) void {
|
|||
|
||||
// fds[1] is inotify, so if we have data in that file descriptor,
|
||||
// we can force the data into an inotify_event structure and act on it
|
||||
if (self.inotify_fd != null and fds[inotify_fd_inx].revents & std.os.POLL.IN == std.os.POLL.IN) {
|
||||
if (self.inotify_fd != null and fds[inotify_fd_inx].revents & std.posix.POLL.IN == std.posix.POLL.IN) {
|
||||
log.debug("tid={d} inotify event", .{std.Thread.getCurrentId()});
|
||||
var event_buf: [4096]u8 align(@alignOf(std.os.linux.inotify_event)) = undefined;
|
||||
// "borrowed" from https://ziglang.org/documentation/master/std/src/std/fs/watch.zig.html#L588
|
||||
const bytes_read = std.os.read(self.inotify_fd.?, &event_buf) catch unreachable;
|
||||
const bytes_read = std.posix.read(self.inotify_fd.?, &event_buf) catch unreachable;
|
||||
|
||||
var ptr: [*]u8 = &event_buf;
|
||||
const end_ptr = ptr + bytes_read;
|
||||
|
@ -162,11 +155,11 @@ pub fn startWatch(self: *Self) void {
|
|||
}
|
||||
}
|
||||
|
||||
fn acceptSocket(name: [:0]const u8, socket: std.os.socket_t) std.os.socket_t {
|
||||
fn acceptSocket(name: [:0]const u8, socket: std.posix.socket_t) std.posix.socket_t {
|
||||
var sockaddr = std.net.Address.initUnix(name) catch @panic("could not get sockaddr");
|
||||
var sockaddr_len: std.os.socklen_t = sockaddr.getOsSockLen();
|
||||
var sockaddr_len: std.posix.socklen_t = sockaddr.getOsSockLen();
|
||||
log.debug("tid={d} accepting on socket fd {d}", .{ std.Thread.getCurrentId(), socket });
|
||||
return std.os.accept(
|
||||
return std.posix.accept(
|
||||
socket,
|
||||
&sockaddr.any,
|
||||
&sockaddr_len,
|
||||
|
@ -266,16 +259,16 @@ test "nameMatch" {
|
|||
/// adds a file to watch. The return will be a handle that will be returned
|
||||
/// in the fileChanged event triffered from startWatch
|
||||
pub fn addFileWatch(self: *Self, path: *[:0]const u8) !usize {
|
||||
self.inotify_fd = self.inotify_fd orelse try std.os.inotify_init1(std.os.linux.IN.NONBLOCK);
|
||||
self.inotify_fd = self.inotify_fd orelse try std.posix.inotify_init1(std.os.linux.IN.NONBLOCK);
|
||||
errdefer {
|
||||
std.os.close(self.inotify_fd.?);
|
||||
std.posix.close(self.inotify_fd.?);
|
||||
self.inotify_fd = null;
|
||||
}
|
||||
// zig build modification pattern: open 20, close_nowrite 10, MOVED_TO (on the directory), attrib 4
|
||||
// unix cp: OPEN, MODIFY, CLOSE_WRITE, ATTRIB
|
||||
// unix mv: MOVED_TO (on the directory)
|
||||
self.wds[self.nfds_t] = .{
|
||||
.wd = try std.os.inotify_add_watchZ(
|
||||
.wd = try std.posix.inotify_add_watchZ(
|
||||
self.inotify_fd.?,
|
||||
path.*,
|
||||
std.os.linux.IN.CLOSE_WRITE,
|
||||
|
@ -302,7 +295,7 @@ fn addDirWatch(self: *Self, path: *[]const u8) !void {
|
|||
return; // We are already watching this directory
|
||||
// We do not have a directory watch
|
||||
self.dir_wds[self.dir_nfds_t] = .{
|
||||
.wd = try std.os.inotify_add_watch(self.inotify_fd.?, dirname, std.os.linux.IN.MOVED_TO),
|
||||
.wd = try std.posix.inotify_add_watch(self.inotify_fd.?, dirname, std.os.linux.IN.MOVED_TO),
|
||||
.path = path, // we store path rather than directory because doing this without an allocator is...tough
|
||||
};
|
||||
self.dir_nfds_t += 1;
|
||||
|
@ -342,10 +335,10 @@ fn addControlSocket(self: *Self, path: [:0]const u8) !void {
|
|||
//
|
||||
// This function theoretically should work without requiring linux...except this inotify call,
|
||||
// which is completely linux specific
|
||||
self.inotify_fd = self.inotify_fd orelse try std.os.inotify_init1(std.os.linux.IN.NONBLOCK);
|
||||
self.inotify_fd = self.inotify_fd orelse try std.posix.inotify_init1(std.os.linux.IN.NONBLOCK);
|
||||
log.debug("Established inotify file descriptor {d}", .{self.inotify_fd.?});
|
||||
errdefer {
|
||||
std.os.close(self.inotify_fd.?);
|
||||
std.posix.close(self.inotify_fd.?);
|
||||
self.inotify_fd = null;
|
||||
}
|
||||
// this should work on all systems theoretically, but I believe would work only
|
||||
|
@ -369,18 +362,18 @@ fn addControlSocket(self: *Self, path: [:0]const u8) !void {
|
|||
// 6. std.os.close: close the fd
|
||||
//
|
||||
// On end of use, we need to std.os.closeSocket()
|
||||
const sock = try std.os.socket(
|
||||
const sock = try std.posix.socket(
|
||||
std.os.linux.AF.LOCAL,
|
||||
std.os.linux.SOCK.STREAM | std.os.SOCK.CLOEXEC,
|
||||
std.os.linux.SOCK.STREAM | std.posix.SOCK.CLOEXEC,
|
||||
0,
|
||||
);
|
||||
errdefer std.os.closeSocket(sock);
|
||||
errdefer std.posix.close(sock);
|
||||
|
||||
const sockaddr = try std.net.Address.initUnix(path);
|
||||
|
||||
log.debug("binding to path: {s}", .{path});
|
||||
try std.os.bind(sock, &sockaddr.any, sockaddr.getOsSockLen());
|
||||
try std.os.listen(sock, 10);
|
||||
try std.posix.bind(sock, &sockaddr.any, sockaddr.getOsSockLen());
|
||||
try std.posix.listen(sock, 10);
|
||||
self.control_socket = sock;
|
||||
log.debug("added control socket with fd={d}", .{sock});
|
||||
}
|
||||
|
|
|
@ -65,17 +65,17 @@ pub fn parse(self: Self, reader: anytype) !ParsedConfig {
|
|||
defer self.allocator.free(line);
|
||||
const nocomments = std.mem.trim(u8, @constCast(&std.mem.split(u8, line, "#")).first(), ws);
|
||||
var data_iterator = std.mem.split(u8, nocomments, "=");
|
||||
var key = std.mem.trim(u8, data_iterator.first(), ws); // first never fails
|
||||
const key = std.mem.trim(u8, data_iterator.first(), ws); // first never fails
|
||||
if (key.len == 0) continue;
|
||||
var value = std.mem.trim(u8, data_iterator.next() orelse return error.NoValueForKey, ws);
|
||||
const value = std.mem.trim(u8, data_iterator.next() orelse return error.NoValueForKey, ws);
|
||||
// keys should be putNoClobber, but values can be put.
|
||||
// Because we have to dup the memory here though, we want to
|
||||
// manage duplicate values seperately
|
||||
var dup_key = try self.allocator.dupeZ(u8, key);
|
||||
var dup_value = try self.allocator.dupeZ(u8, value);
|
||||
const dup_key = try self.allocator.dupeZ(u8, key);
|
||||
const dup_value = try self.allocator.dupeZ(u8, value);
|
||||
try rc.key_value_map.putNoClobber(dup_key, dup_value);
|
||||
if (!rc.value_key_map.contains(value)) {
|
||||
var keys = try self.allocator.create(std.ArrayList([:0]u8));
|
||||
const keys = try self.allocator.create(std.ArrayList([:0]u8));
|
||||
keys.* = std.ArrayList([:0]u8).init(self.allocator);
|
||||
try rc.value_key_map.put(dup_value, keys);
|
||||
}
|
||||
|
@ -85,7 +85,7 @@ pub fn parse(self: Self, reader: anytype) !ParsedConfig {
|
|||
}
|
||||
|
||||
test "gets config from a stream" {
|
||||
var allocator = std.testing.allocator;
|
||||
const allocator = std.testing.allocator;
|
||||
var stream = std.io.fixedBufferStream(
|
||||
\\# This is a simple "path prefix" = dynamic library path mapping
|
||||
\\ # no reordering will be done, so you must do things most -> least specific
|
||||
|
|
|
@ -44,7 +44,7 @@ pub const ZigRequest = struct {
|
|||
target: []const u8,
|
||||
method: [:0]u8,
|
||||
content: []u8,
|
||||
headers: std.http.Headers,
|
||||
headers: []std.http.Header,
|
||||
};
|
||||
|
||||
pub const ZigHeader = struct {
|
||||
|
@ -56,7 +56,7 @@ pub const ZigResponse = struct {
|
|||
status: std.http.Status = .ok,
|
||||
reason: ?[]const u8 = null,
|
||||
body: *std.ArrayList(u8),
|
||||
headers: std.http.Headers,
|
||||
headers: []std.http.Header,
|
||||
request: ZigRequest,
|
||||
prepend: std.ArrayList(u8),
|
||||
|
||||
|
@ -94,9 +94,9 @@ pub fn zigInit(parent_allocator: *anyopaque) callconv(.C) void {
|
|||
/// Converts a StringHashMap to the structure necessary for passing through the
|
||||
/// C boundary. This will be called automatically for you via the handleRequest function
|
||||
/// and is also used by the main processing loop to coerce request headers
|
||||
fn toHeaders(alloc: std.mem.Allocator, headers: std.http.Headers) ![*]Header {
|
||||
var header_array = try std.ArrayList(Header).initCapacity(alloc, headers.list.items.len);
|
||||
for (headers.list.items) |*field| {
|
||||
fn toHeaders(alloc: std.mem.Allocator, headers: []const std.http.Header) ![*]Header {
|
||||
var header_array = try std.ArrayList(Header).initCapacity(alloc, headers.len);
|
||||
for (headers) |*field| {
|
||||
header_array.appendAssumeCapacity(.{
|
||||
.name_ptr = @constCast(field.name.ptr),
|
||||
.name_len = field.name.len,
|
||||
|
@ -114,7 +114,7 @@ fn toHeaders(alloc: std.mem.Allocator, headers: std.http.Headers) ![*]Header {
|
|||
pub fn handleRequest(request: *Request, zigRequestHandler: ZigRequestHandler) ?*Response {
|
||||
// TODO: implement another library in C or Rust or something to show
|
||||
// that anything using a C ABI can be successful
|
||||
var alloc = if (allocator) |a| a.* else {
|
||||
const alloc = if (allocator) |a| a.* else {
|
||||
log.err("zigInit not called prior to handle_request. This is a coding error", .{});
|
||||
return null;
|
||||
};
|
||||
|
@ -123,12 +123,12 @@ pub fn handleRequest(request: *Request, zigRequestHandler: ZigRequestHandler) ?*
|
|||
var response = std.ArrayList(u8).init(alloc);
|
||||
|
||||
// setup headers
|
||||
var request_headers = std.http.Headers.init(alloc);
|
||||
var request_headers = std.ArrayList(std.http.Header).init(alloc);
|
||||
for (0..request.headers_len) |i|
|
||||
request_headers.append(
|
||||
request.headers[i].name_ptr[0..request.headers[i].name_len],
|
||||
request.headers[i].value_ptr[0..request.headers[i].value_len],
|
||||
) catch |e| {
|
||||
request_headers.append(.{
|
||||
.name = request.headers[i].name_ptr[0..request.headers[i].name_len],
|
||||
.value = request.headers[i].value_ptr[0..request.headers[i].value_len],
|
||||
}) catch |e| {
|
||||
log.err("Unexpected error processing request: {any}", .{e});
|
||||
if (@errorReturnTrace()) |trace| {
|
||||
std.debug.dumpStackTrace(trace.*);
|
||||
|
@ -136,16 +136,22 @@ pub fn handleRequest(request: *Request, zigRequestHandler: ZigRequestHandler) ?*
|
|||
return null;
|
||||
};
|
||||
|
||||
var prepend = std.ArrayList(u8).init(alloc);
|
||||
const prepend = std.ArrayList(u8).init(alloc);
|
||||
var zig_response = ZigResponse{
|
||||
.headers = .{ .allocator = alloc },
|
||||
.headers = &.{},
|
||||
.body = &response,
|
||||
.prepend = prepend,
|
||||
.request = .{
|
||||
.content = request.content[0..request.content_len],
|
||||
.target = request.target[0..request.target_len],
|
||||
.method = request.method[0..request.method_len :0],
|
||||
.headers = request_headers,
|
||||
.headers = request_headers.toOwnedSlice() catch |e| {
|
||||
log.err("Unexpected error processing request: {any}", .{e});
|
||||
if (@errorReturnTrace()) |trace| {
|
||||
std.debug.dumpStackTrace(trace.*);
|
||||
}
|
||||
return null;
|
||||
},
|
||||
},
|
||||
};
|
||||
zigRequestHandler(
|
||||
|
@ -184,7 +190,7 @@ fn buildResponse(alloc: std.mem.Allocator, zig_response: *ZigResponse) ?*Respons
|
|||
}
|
||||
return null;
|
||||
};
|
||||
rc.headers_len = zig_response.headers.list.items.len;
|
||||
rc.headers_len = zig_response.headers.len;
|
||||
rc.status = if (zig_response.status == .ok) 0 else @intFromEnum(zig_response.status);
|
||||
rc.reason_len = 0;
|
||||
if (zig_response.reason) |*r| {
|
||||
|
|
|
@ -24,7 +24,7 @@ const log = std.log.scoped(.@"main-lib");
|
|||
// }
|
||||
//
|
||||
comptime {
|
||||
@export(interface.zigInit, .{ .name = "zigInit", .linkage = .Strong });
|
||||
@export(interface.zigInit, .{ .name = "zigInit", .linkage = .strong });
|
||||
}
|
||||
|
||||
/// handle_request will be called on a single request, but due to the preservation
|
||||
|
@ -58,32 +58,46 @@ export fn handle_request(request: *interface.Request) callconv(.C) ?*interface.R
|
|||
// handleRequest function here is the last line of boilerplate and the
|
||||
// entry to a request
|
||||
fn handleRequest(allocator: std.mem.Allocator, response: *interface.ZigResponse) !void {
|
||||
_ = allocator;
|
||||
// setup
|
||||
var response_writer = response.body.writer();
|
||||
// real work
|
||||
if (response.request.headers.getFirstValue("host")) |host| {
|
||||
if (std.mem.startsWith(u8, host, "iam")) {
|
||||
for (response.request.headers) |h| {
|
||||
if (std.ascii.eqlIgnoreCase(h.name, "host")) {
|
||||
if (std.mem.startsWith(u8, h.value, "iam")) {
|
||||
try response_writer.print("iam response", .{});
|
||||
return;
|
||||
}
|
||||
break;
|
||||
}
|
||||
if (response.request.headers.getFirstValue("x-slow")) |ms| {
|
||||
std.time.sleep(std.time.ns_per_ms * (std.fmt.parseInt(usize, ms, 10) catch 1000));
|
||||
}
|
||||
for (response.request.headers) |h| {
|
||||
if (std.ascii.eqlIgnoreCase(h.name, "x-slow")) {
|
||||
std.time.sleep(std.time.ns_per_ms * (std.fmt.parseInt(usize, h.value, 10) catch 1000));
|
||||
try response_writer.print("i am slow\n\n", .{});
|
||||
return;
|
||||
}
|
||||
if (response.request.headers.getFirstValue("x-log-this")) |l| {
|
||||
try response.writeAll(l);
|
||||
}
|
||||
if (response.request.headers.getFirstValue("x-status")) |s| {
|
||||
response.status = @enumFromInt(std.fmt.parseInt(u10, s, 10) catch 500);
|
||||
for (response.request.headers) |h| {
|
||||
if (std.ascii.eqlIgnoreCase(h.name, "x-log-this")) {
|
||||
try response.writeAll(h.value);
|
||||
break;
|
||||
}
|
||||
if (response.request.headers.getFirstValue("x-throw")) |_| {
|
||||
}
|
||||
for (response.request.headers) |h| {
|
||||
if (std.ascii.eqlIgnoreCase(h.name, "x-status")) {
|
||||
response.status = @enumFromInt(std.fmt.parseInt(u10, h.value, 10) catch 500);
|
||||
break;
|
||||
}
|
||||
}
|
||||
for (response.request.headers) |h| {
|
||||
if (std.ascii.eqlIgnoreCase(h.name, "x-throw"))
|
||||
return error.Thrown;
|
||||
}
|
||||
try response_writer.print(" {d}", .{response.request.headers.list.items.len});
|
||||
try response.headers.append("X-custom-foo", "bar");
|
||||
try response_writer.print(" {d}", .{response.request.headers.len});
|
||||
var headers = std.ArrayList(std.http.Header).init(allocator);
|
||||
try headers.appendSlice(response.headers);
|
||||
try headers.append(.{ .name = "X-custom-foo", .value = "bar" });
|
||||
response.headers = try headers.toOwnedSlice();
|
||||
}
|
||||
|
||||
test "handle_request" {
|
||||
|
@ -91,7 +105,7 @@ test "handle_request" {
|
|||
defer arena.deinit();
|
||||
var aa = arena.allocator();
|
||||
interface.zigInit(&aa);
|
||||
var headers: []interface.Header = @constCast(&[_]interface.Header{.{
|
||||
const headers: []interface.Header = @constCast(&[_]interface.Header{.{
|
||||
.name_ptr = @ptrCast(@constCast("GET".ptr)),
|
||||
.name_len = 3,
|
||||
.value_ptr = @ptrCast(@constCast("GET".ptr)),
|
||||
|
@ -119,7 +133,7 @@ test "lib can write data directly" {
|
|||
defer arena.deinit();
|
||||
var aa = arena.allocator();
|
||||
interface.zigInit(&aa);
|
||||
var headers: []interface.Header = @constCast(&[_]interface.Header{.{
|
||||
const headers: []interface.Header = @constCast(&[_]interface.Header{.{
|
||||
.name_ptr = @ptrCast(@constCast("x-log-this".ptr)),
|
||||
.name_len = "x-log-this".len,
|
||||
.value_ptr = @ptrCast(@constCast("I am a teapot".ptr)),
|
||||
|
@ -146,7 +160,7 @@ test "lib can write data directly and still throw" {
|
|||
defer arena.deinit();
|
||||
var aa = arena.allocator();
|
||||
interface.zigInit(&aa);
|
||||
var headers: []interface.Header = @constCast(&[_]interface.Header{ .{
|
||||
const headers: []interface.Header = @constCast(&[_]interface.Header{ .{
|
||||
.name_ptr = @ptrCast(@constCast("x-log-this".ptr)),
|
||||
.name_len = "x-log-this".len,
|
||||
.value_ptr = @ptrCast(@constCast("I am a teapot".ptr)),
|
||||
|
@ -176,7 +190,7 @@ test "lib can set status, update data directly and still throw" {
|
|||
defer arena.deinit();
|
||||
var aa = arena.allocator();
|
||||
interface.zigInit(&aa);
|
||||
var headers: []interface.Header = @constCast(&[_]interface.Header{ .{
|
||||
const headers: []interface.Header = @constCast(&[_]interface.Header{ .{
|
||||
.name_ptr = @ptrCast(@constCast("x-log-this".ptr)),
|
||||
.name_len = "x-log-this".len,
|
||||
.value_ptr = @ptrCast(@constCast("I am a teapot".ptr)),
|
||||
|
|
255
src/main.zig
255
src/main.zig
|
@ -7,16 +7,17 @@ const config = @import("config.zig");
|
|||
const log = std.log.scoped(.main);
|
||||
|
||||
// logging options
|
||||
pub const std_options = struct {
|
||||
pub const log_level = switch (builtin.mode) {
|
||||
pub const std_options = .{
|
||||
.log_level = if (!builtin.is_test)
|
||||
switch (builtin.mode) {
|
||||
.Debug => .debug,
|
||||
.ReleaseSafe => .info,
|
||||
.ReleaseFast, .ReleaseSmall => .err,
|
||||
};
|
||||
},
|
||||
|
||||
pub const log_scope_levels = &[_]std.log.ScopeLevel{
|
||||
.log_scope_levels = &[_]std.log.ScopeLevel{
|
||||
.{ .scope = .watch, .level = .info },
|
||||
};
|
||||
},
|
||||
};
|
||||
|
||||
const serveFn = *const fn (*interface.Request) ?*interface.Response;
|
||||
|
@ -32,8 +33,9 @@ var watcher_thread: ?std.Thread = null;
|
|||
var timer: ?std.time.Timer = null;
|
||||
|
||||
const FullReturn = struct {
|
||||
response: []u8,
|
||||
executor: *Executor,
|
||||
content: []const u8,
|
||||
executor: *Executor = undefined,
|
||||
respond_options: std.http.Server.Request.RespondOptions,
|
||||
};
|
||||
|
||||
// Executor structure, including functions that were found in the library
|
||||
|
@ -52,9 +54,9 @@ const Executor = struct {
|
|||
|
||||
// fields used for internal accounting
|
||||
watch: ?usize = null,
|
||||
drain_in_progress: std.atomic.Atomic(bool) = std.atomic.Atomic(bool).init(false),
|
||||
load_in_progress: std.atomic.Atomic(bool) = std.atomic.Atomic(bool).init(false),
|
||||
requests_in_flight: std.atomic.Atomic(usize) = std.atomic.Atomic(usize).init(0),
|
||||
drain_in_progress: std.atomic.Value(bool) = std.atomic.Value(bool).init(false),
|
||||
load_in_progress: std.atomic.Value(bool) = std.atomic.Value(bool).init(false),
|
||||
requests_in_flight: std.atomic.Value(usize) = std.atomic.Value(usize).init(0),
|
||||
};
|
||||
|
||||
const SERVE_FN_NAME = "handle_request";
|
||||
|
@ -67,68 +69,68 @@ var parsed_config: config.ParsedConfig = undefined;
|
|||
|
||||
/// Serves a single request. Finds executor, marshalls request data for the C
|
||||
/// interface, calls the executor and marshalls data back
|
||||
fn serve(allocator: *std.mem.Allocator, response: *std.http.Server.Response) !*FullReturn {
|
||||
const executor = try getExecutor(response.request.target, response.request.headers);
|
||||
errdefer _ = executor.requests_in_flight.fetchSub(1, .Monotonic);
|
||||
fn serve(allocator: *std.mem.Allocator, request: *std.http.Server.Request) !*FullReturn {
|
||||
var head_it = request.iterateHeaders();
|
||||
const headers = try toHeaders(allocator.*, &head_it);
|
||||
const executor = try getExecutor(request.head.target, headers);
|
||||
errdefer _ = executor.requests_in_flight.fetchSub(1, .monotonic);
|
||||
if (executor.zigInitFn) |f|
|
||||
f(allocator);
|
||||
|
||||
// Call external library
|
||||
const method_tag = @tagName(response.request.method);
|
||||
const headers = try toHeaders(allocator.*, response.request.headers);
|
||||
var request_content: []u8 = &[_]u8{};
|
||||
if (response.request.content_length) |l| {
|
||||
request_content = try response.reader().readAllAlloc(allocator.*, @as(usize, l));
|
||||
}
|
||||
const method_tag = @tagName(request.head.method);
|
||||
const request_content: []u8 = try (try request.reader()).readAllAlloc(allocator.*, std.math.maxInt(usize));
|
||||
log.debug("{d} bytes read from request", .{request_content.len});
|
||||
var request = interface.Request{
|
||||
.target = response.request.target.ptr,
|
||||
.target_len = response.request.target.len,
|
||||
var i_request = interface.Request{
|
||||
.target = request.head.target.ptr,
|
||||
.target_len = request.head.target.len,
|
||||
|
||||
.method = @constCast(method_tag[0..].ptr),
|
||||
.method_len = method_tag.len,
|
||||
|
||||
.headers = headers,
|
||||
.headers_len = response.request.headers.list.items.len,
|
||||
.headers = headers.ptr,
|
||||
.headers_len = headers.len,
|
||||
|
||||
.content = request_content.ptr,
|
||||
.content_len = request_content.len,
|
||||
};
|
||||
var serve_result = executor.serveFn.?(&request).?; // ok for this pointer deref to fail
|
||||
log.debug("target: {s}", .{response.request.target});
|
||||
var serve_result = executor.serveFn.?(&i_request).?; // ok for this pointer deref to fail
|
||||
log.debug("target: {s}", .{request.head.target});
|
||||
log.debug("response ptr: {*}", .{serve_result.ptr});
|
||||
var slice: []u8 = serve_result.ptr[0..serve_result.len];
|
||||
log.debug("response body: {s}", .{slice});
|
||||
var response = try allocator.create(FullReturn); // allocator is arena here
|
||||
response.content = serve_result.ptr[0..serve_result.len];
|
||||
response.executor = executor;
|
||||
response.respond_options = .{};
|
||||
log.debug("response body: {s}", .{response.content});
|
||||
|
||||
if (serve_result.status != 0) {
|
||||
response.status = @enumFromInt(serve_result.status);
|
||||
response.respond_options.status = @enumFromInt(serve_result.status);
|
||||
if (serve_result.reason_len > 0) {
|
||||
response.reason = serve_result.reason_ptr[0..serve_result.reason_len];
|
||||
response.respond_options.reason = serve_result.reason_ptr[0..serve_result.reason_len];
|
||||
}
|
||||
}
|
||||
// Deal with results
|
||||
// Deal with response headers
|
||||
var response_headers = try std.ArrayList(std.http.Header).initCapacity(allocator.*, serve_result.headers_len + 1);
|
||||
defer response_headers.deinit();
|
||||
var content_type_added = false;
|
||||
for (0..serve_result.headers_len) |inx| {
|
||||
const head = serve_result.headers[inx];
|
||||
try response.headers.append(
|
||||
head.name_ptr[0..head.name_len],
|
||||
head.value_ptr[0..head.value_len],
|
||||
);
|
||||
response_headers.appendAssumeCapacity(.{
|
||||
.name = head.name_ptr[0..head.name_len],
|
||||
.value = head.value_ptr[0..head.value_len],
|
||||
});
|
||||
|
||||
// headers are case insensitive
|
||||
content_type_added = std.ascii.eqlIgnoreCase(head.name_ptr[0..head.name_len], "content-type");
|
||||
}
|
||||
if (!content_type_added)
|
||||
try response.headers.append("content-type", "text/plain");
|
||||
// target is path
|
||||
var rc = try allocator.create(FullReturn);
|
||||
rc.executor = executor;
|
||||
rc.response = slice;
|
||||
return rc;
|
||||
response_headers.appendAssumeCapacity(.{ .name = "content-type", .value = "text/plain" });
|
||||
response.respond_options.extra_headers = try response_headers.toOwnedSlice();
|
||||
return response;
|
||||
}
|
||||
|
||||
/// Gets and executor based on request data
|
||||
fn getExecutor(requested_path: []const u8, headers: std.http.Headers) !*Executor {
|
||||
fn getExecutor(requested_path: []const u8, headers: []interface.Header) !*Executor {
|
||||
var executor = blk: {
|
||||
for (executors) |*exec| {
|
||||
if (executorIsMatch(exec.match_data, requested_path, headers)) {
|
||||
|
@ -138,15 +140,15 @@ fn getExecutor(requested_path: []const u8, headers: std.http.Headers) !*Executor
|
|||
log.err("Could not find executor for target path '{s}'", .{requested_path});
|
||||
return error.NoApplicableExecutor;
|
||||
};
|
||||
while (executor.drain_in_progress.load(.Acquire)) {
|
||||
while (executor.drain_in_progress.load(.acquire)) {
|
||||
// we need to stand down and stand by
|
||||
std.atomic.spinLoopHint(); // Let CPU know we're just hanging out
|
||||
std.time.sleep(1 * std.time.ns_per_ms / 2);
|
||||
}
|
||||
// Tell everyone we're about to use this bad boy
|
||||
// While requests_in_flight is >= 0, nobody should unload the library
|
||||
_ = executor.requests_in_flight.fetchAdd(1, .Acquire);
|
||||
errdefer _ = executor.requests_in_flight.fetchSub(1, .Release);
|
||||
_ = executor.requests_in_flight.fetchAdd(1, .acquire);
|
||||
errdefer _ = executor.requests_in_flight.fetchSub(1, .release);
|
||||
|
||||
if (executor.serveFn != null) return executor;
|
||||
|
||||
|
@ -157,13 +159,13 @@ fn getExecutor(requested_path: []const u8, headers: std.http.Headers) !*Executor
|
|||
// If the library is being reloaded and a bunch of requests come in,
|
||||
// we could have multiple threads racing to load
|
||||
// NOTE: I am not confident of the memory ordering here on tryCompareAndSwap
|
||||
while (executor.load_in_progress.tryCompareAndSwap(false, true, .Acquire, .Acquire)) |_| {
|
||||
while (@cmpxchgWeak(bool, &executor.load_in_progress.raw, false, true, .acquire, .acquire)) |_| {
|
||||
// we need to stand down and stand by
|
||||
std.atomic.spinLoopHint(); // Let CPU know we're just hanging out
|
||||
std.time.sleep(1 * std.time.ns_per_ms / 2);
|
||||
}
|
||||
// we have the conch...lock others out
|
||||
defer executor.load_in_progress.store(false, .Release);
|
||||
defer executor.load_in_progress.store(false, .release);
|
||||
|
||||
if (executor.library) |l|
|
||||
break :blk l; // someone beat us to the race..our defer above will take care of unlocking
|
||||
|
@ -185,7 +187,7 @@ fn getExecutor(requested_path: []const u8, headers: std.http.Headers) !*Executor
|
|||
return executor;
|
||||
}
|
||||
|
||||
fn executorIsMatch(match_data: []const u8, requested_path: []const u8, headers: std.http.Headers) bool {
|
||||
fn executorIsMatch(match_data: []const u8, requested_path: []const u8, headers: []interface.Header) bool {
|
||||
if (!std.mem.containsAtLeast(u8, match_data, 1, ":")) {
|
||||
// match_data does not have a ':'. This means this is a straight path, without
|
||||
// any header requirement. We can simply return a match prefix on the
|
||||
|
@ -196,7 +198,13 @@ fn executorIsMatch(match_data: []const u8, requested_path: []const u8, headers:
|
|||
}
|
||||
const colon = std.mem.indexOf(u8, match_data, ":").?;
|
||||
const header_needle = match_data[0..colon];
|
||||
const header_inx = headers.firstIndexOf(header_needle) orelse return false;
|
||||
const header_inx = blk: {
|
||||
for (headers, 0..) |h, i| {
|
||||
if (std.ascii.eqlIgnoreCase(h.name_ptr[0..h.name_len], header_needle))
|
||||
break :blk i;
|
||||
}
|
||||
return false;
|
||||
};
|
||||
// Apparently std.mem.split will return an empty first when the haystack starts
|
||||
// with the delimiter
|
||||
var split = std.mem.split(u8, std.mem.trim(u8, match_data[colon + 1 ..], "\t "), " ");
|
||||
|
@ -211,7 +219,7 @@ fn executorIsMatch(match_data: []const u8, requested_path: []const u8, headers:
|
|||
// match_data includes some sort of header match as well. We assume the
|
||||
// header match is a full match on the key (handled above)
|
||||
// but a prefix match on the value
|
||||
const request_header_value = headers.list.items[header_inx].value;
|
||||
const request_header_value = headers[header_inx].value_ptr[0..headers[header_inx].value_len];
|
||||
// (shoud this be case insensitive?)
|
||||
if (!std.mem.startsWith(u8, request_header_value, header_value_needle)) return false;
|
||||
// header value matches...return the path prefix match
|
||||
|
@ -243,9 +251,9 @@ fn executorChanged(watch: usize) void {
|
|||
if (executor.watch) |w| {
|
||||
if (w == watch) {
|
||||
if (executor.library) |l| {
|
||||
executor.drain_in_progress.store(true, .Release);
|
||||
defer executor.drain_in_progress.store(false, .Release);
|
||||
while (executor.requests_in_flight.load(.Acquire) > 0) {
|
||||
executor.drain_in_progress.store(true, .release);
|
||||
defer executor.drain_in_progress.store(false, .release);
|
||||
while (executor.requests_in_flight.load(.acquire) > 0) {
|
||||
std.atomic.spinLoopHint();
|
||||
std.time.sleep(1 * std.time.ns_per_ms / 2);
|
||||
}
|
||||
|
@ -257,7 +265,7 @@ fn executorChanged(watch: usize) void {
|
|||
log.warn("could not reload! error opening library", .{});
|
||||
return;
|
||||
};
|
||||
var symbol = std.c.dlsym(executor.library.?, SERVE_FN_NAME);
|
||||
const symbol = std.c.dlsym(executor.library.?, SERVE_FN_NAME);
|
||||
if (symbol == null) {
|
||||
log.warn("could not reload! error finding symbol", .{});
|
||||
if (std.c.dlclose(executor.library.?) != 0)
|
||||
|
@ -284,11 +292,11 @@ fn dlopen(path: [:0]const u8) !*anyopaque {
|
|||
/// exit from the application as the main function has an infinite loop
|
||||
fn exitApplication(
|
||||
_: i32,
|
||||
_: *const std.os.siginfo_t,
|
||||
_: *const std.posix.siginfo_t,
|
||||
_: ?*const anyopaque,
|
||||
) callconv(.C) noreturn {
|
||||
exitApp(0);
|
||||
std.os.exit(0);
|
||||
std.posix.exit(0);
|
||||
}
|
||||
|
||||
/// exitApp handles deinitialization for the application and any reporting
|
||||
|
@ -302,7 +310,7 @@ fn exitApp(exitcode: u8) void {
|
|||
watcher.stopWatch() catch @panic("could not stop watcher");
|
||||
std.io.getStdOut().writer().print("exiting application\n", .{}) catch {};
|
||||
watcher.deinit();
|
||||
std.os.exit(exitcode);
|
||||
std.posix.exit(exitcode);
|
||||
// joining threads will hang...we're ultimately in a signal handler.
|
||||
// But everything is shut down cleanly now, so I don't think it hurts to
|
||||
// just kill it all (NOTE: from a practical perspective, we do not seem
|
||||
|
@ -316,7 +324,7 @@ fn exitApp(exitcode: u8) void {
|
|||
/// has been implemented. Operates off the main thread
|
||||
fn reloadConfig(
|
||||
_: i32,
|
||||
_: *const std.os.siginfo_t,
|
||||
_: *const std.posix.siginfo_t,
|
||||
_: ?*const anyopaque,
|
||||
) callconv(.C) void {
|
||||
// TODO: Gracefully drain in flight requests and hold a lock here while
|
||||
|
@ -334,36 +342,36 @@ fn reloadConfig(
|
|||
|
||||
/// Installs all signal handlers for shutdown and configuration reload
|
||||
fn installSignalHandler() !void {
|
||||
var act = std.os.Sigaction{
|
||||
var act = std.posix.Sigaction{
|
||||
.handler = .{ .sigaction = exitApplication },
|
||||
.mask = std.os.empty_sigset,
|
||||
.flags = (std.os.SA.SIGINFO | std.os.SA.RESTART | std.os.SA.RESETHAND),
|
||||
.mask = std.posix.empty_sigset,
|
||||
.flags = (std.posix.SA.SIGINFO | std.posix.SA.RESTART | std.posix.SA.RESETHAND),
|
||||
};
|
||||
|
||||
try std.os.sigaction(std.os.SIG.INT, &act, null);
|
||||
try std.os.sigaction(std.os.SIG.TERM, &act, null);
|
||||
try std.posix.sigaction(std.posix.SIG.INT, &act, null);
|
||||
try std.posix.sigaction(std.posix.SIG.TERM, &act, null);
|
||||
|
||||
var hup_act = std.os.Sigaction{
|
||||
var hup_act = std.posix.Sigaction{
|
||||
.handler = .{ .sigaction = reloadConfig },
|
||||
.mask = std.os.empty_sigset,
|
||||
.flags = (std.os.SA.SIGINFO | std.os.SA.RESTART | std.os.SA.RESETHAND),
|
||||
.mask = std.posix.empty_sigset,
|
||||
.flags = (std.posix.SA.SIGINFO | std.posix.SA.RESTART | std.posix.SA.RESETHAND),
|
||||
};
|
||||
try std.os.sigaction(std.os.SIG.HUP, &hup_act, null);
|
||||
try std.posix.sigaction(std.posix.SIG.HUP, &hup_act, null);
|
||||
}
|
||||
|
||||
pub fn main() !void {
|
||||
// We are linked to libc, and primarily using the arena allocator
|
||||
// raw allocator recommended for use in arenas
|
||||
var raw_allocator = std.heap.raw_c_allocator;
|
||||
const raw_allocator = std.heap.raw_c_allocator;
|
||||
|
||||
// Our child process will also need an allocator, and is using the
|
||||
// same pattern, so we will hand the child a raw allocator as well
|
||||
var child_allocator = std.heap.raw_c_allocator;
|
||||
const child_allocator = std.heap.raw_c_allocator;
|
||||
|
||||
// Lastly, we need some of our own operations
|
||||
var arena = std.heap.ArenaAllocator.init(raw_allocator);
|
||||
defer arena.deinit();
|
||||
var parent_allocator = arena.allocator();
|
||||
const parent_allocator = arena.allocator();
|
||||
|
||||
var al = std.ArrayList([]const u8).init(parent_allocator);
|
||||
defer al.deinit();
|
||||
|
@ -408,25 +416,22 @@ fn childMain(allocator: std.mem.Allocator) !void {
|
|||
watcher = Watch.init(executorChanged);
|
||||
watcher_thread = try std.Thread.spawn(.{}, Watch.startWatch, .{&watcher});
|
||||
|
||||
var server = std.http.Server.init(allocator, .{ .reuse_address = true });
|
||||
defer server.deinit();
|
||||
|
||||
const address = try std.net.Address.parseIp(
|
||||
"0.0.0.0",
|
||||
if (std.os.getenv("PORT")) |p| try std.fmt.parseInt(u16, p, 10) else PORT,
|
||||
if (std.posix.getenv("PORT")) |p| try std.fmt.parseInt(u16, p, 10) else PORT,
|
||||
);
|
||||
try server.listen(address);
|
||||
const server_port = server.socket.listen_address.in.getPort();
|
||||
var server = try address.listen(.{ .reuse_address = true });
|
||||
const server_port = server.listen_address.in.getPort();
|
||||
log.info("listening on port: {d}", .{server_port});
|
||||
if (builtin.os.tag == .linux)
|
||||
log.info("pid: {d}", .{std.os.linux.getpid()});
|
||||
|
||||
try installSignalHandler();
|
||||
response_preallocation_kb = if (std.os.getenv("RESPONSE_PREALLOCATION_KB")) |kb|
|
||||
response_preallocation_kb = if (std.posix.getenv("RESPONSE_PREALLOCATION_KB")) |kb|
|
||||
try std.fmt.parseInt(usize, kb, 10)
|
||||
else
|
||||
response_preallocation_kb;
|
||||
var server_thread_count = if (std.os.getenv("SERVER_THREAD_COUNT")) |count|
|
||||
const server_thread_count = if (std.posix.getenv("SERVER_THREAD_COUNT")) |count|
|
||||
try std.fmt.parseInt(usize, count, 10)
|
||||
else switch (builtin.mode) {
|
||||
.Debug => @min(4, try std.Thread.getCpuCount()),
|
||||
|
@ -451,7 +456,7 @@ fn childMain(allocator: std.mem.Allocator) !void {
|
|||
for (server_threads.items) |thread| thread.join();
|
||||
}
|
||||
|
||||
fn threadMain(allocator: std.mem.Allocator, server: *std.http.Server, thread_number: usize) !void {
|
||||
fn threadMain(allocator: std.mem.Allocator, server: *std.net.Server, thread_number: usize) !void {
|
||||
// TODO: If we're in a thread pool we need to be careful with this...
|
||||
const stdout_file = std.io.getStdOut().writer();
|
||||
var bw = std.io.bufferedWriter(stdout_file);
|
||||
|
@ -505,37 +510,42 @@ fn loadConfig(allocator: std.mem.Allocator) ![]Executor {
|
|||
/// main loop calls processRequest, which is responsible for the interface
|
||||
/// with logs, connection accounting, etc. The work dealing with the request
|
||||
/// itself is delegated to the serve function to work with the executor
|
||||
fn processRequest(allocator: *std.mem.Allocator, server: *std.http.Server, writer: anytype) !void {
|
||||
fn processRequest(allocator: *std.mem.Allocator, server: *std.net.Server, writer: anytype) !void {
|
||||
if (timer == null) timer = try std.time.Timer.start();
|
||||
var tm = timer.?;
|
||||
var res = try server.accept(.{ .allocator = allocator.* });
|
||||
defer res.deinit();
|
||||
defer _ = res.reset();
|
||||
try res.wait(); // wait for client to send a complete request head
|
||||
var connection = try server.accept();
|
||||
defer connection.stream.close();
|
||||
var read_buffer: [1024 * 16]u8 = undefined; // TODO: fix this
|
||||
var server_connection = std.http.Server.init(connection, &read_buffer);
|
||||
var req = try server_connection.receiveHead();
|
||||
|
||||
// TODO: should we check for server_connection.state == .ready?
|
||||
// I believe it's fair to start our timer after this is done
|
||||
tm.reset();
|
||||
const err_return = FullReturn{
|
||||
.content = "Internal Server Error\n",
|
||||
.respond_options = .{ .status = .internal_server_error },
|
||||
};
|
||||
|
||||
// This is an nginx log:
|
||||
// git.lerch.org 50.39.111.175 - - [16/May/2023:02:56:31 +0000] "POST /api/actions/runner.v1.RunnerService/FetchTask HTTP/2.0" 200 0 "-" "connect-go/1.2.0-dev (go1.20.1)" "172.20.0.5:3000"
|
||||
// TODO: replicate this
|
||||
try writer.print("{} - - \"{s} {s} {s}\"", .{
|
||||
res.address,
|
||||
@tagName(res.request.method),
|
||||
res.request.target,
|
||||
@tagName(res.request.version),
|
||||
server_connection.connection.address,
|
||||
@tagName(req.head.method),
|
||||
req.head.target,
|
||||
@tagName(req.head.version),
|
||||
});
|
||||
const errstr = "Internal Server Error\n";
|
||||
var errbuf: [errstr.len]u8 = undefined;
|
||||
var response_bytes = try std.fmt.bufPrint(&errbuf, errstr, .{});
|
||||
|
||||
var full_response = serve(allocator, &res) catch |e| brk: {
|
||||
res.status = .internal_server_error;
|
||||
// TODO: more about this particular request
|
||||
var caught_err = false;
|
||||
const full_response = serve(allocator, &req) catch |e| brk: {
|
||||
log.err("Unexpected error from executor processing request: {any}", .{e});
|
||||
// TODO: more about this particular request
|
||||
if (@errorReturnTrace()) |trace| {
|
||||
std.debug.dumpStackTrace(trace.*);
|
||||
}
|
||||
break :brk null;
|
||||
|
||||
caught_err = true;
|
||||
break :brk &err_return;
|
||||
};
|
||||
defer {
|
||||
// The call above converts uncaught errors to a null
|
||||
|
@ -545,30 +555,29 @@ fn processRequest(allocator: *std.mem.Allocator, server: *std.http.Server, write
|
|||
// This leaves this defer block as the only place we can/should decrement
|
||||
// under normal conditions
|
||||
|
||||
if (full_response) |f| {
|
||||
if (f.executor.requestDeinitFn) |d| d();
|
||||
_ = f.executor.requests_in_flight.fetchSub(1, .Release);
|
||||
if (!caught_err) {
|
||||
if (full_response.executor.requestDeinitFn) |d| d();
|
||||
_ = full_response.executor.requests_in_flight.fetchSub(1, .release);
|
||||
}
|
||||
}
|
||||
if (full_response) |f|
|
||||
response_bytes = f.response;
|
||||
res.transfer_encoding = .{ .content_length = response_bytes.len };
|
||||
try res.headers.append("connection", "close");
|
||||
try writer.print(" {d} ttfb {d:.3}ms", .{ @intFromEnum(res.status), @as(f64, @floatFromInt(tm.read())) / std.time.ns_per_ms });
|
||||
if (builtin.is_test) writeToTestBuffers(response_bytes, &res);
|
||||
try res.do();
|
||||
_ = try res.writer().writeAll(response_bytes);
|
||||
try res.finish();
|
||||
try writer.print(" {d} ttfb {d:.3}ms", .{
|
||||
@intFromEnum(full_response.respond_options.status),
|
||||
@as(f64, @floatFromInt(tm.read())) / std.time.ns_per_ms,
|
||||
});
|
||||
if (builtin.is_test) writeToTestBuffers(full_response.content);
|
||||
req.respond(full_response.content, full_response.respond_options) catch |e| {
|
||||
log.err("Unexpected error responding to client: {any}", .{e});
|
||||
};
|
||||
try writer.print(" {d} ttlb {d:.3}ms", .{
|
||||
response_bytes.len,
|
||||
full_response.content.len,
|
||||
@as(f64, @floatFromInt(tm.read())) / std.time.ns_per_ms,
|
||||
});
|
||||
}
|
||||
|
||||
fn toHeaders(allocator: std.mem.Allocator, headers: std.http.Headers) ![*]interface.Header {
|
||||
var header_array = try std.ArrayList(interface.Header).initCapacity(allocator, headers.list.items.len);
|
||||
for (headers.list.items) |kv| {
|
||||
header_array.appendAssumeCapacity(.{
|
||||
fn toHeaders(allocator: std.mem.Allocator, headers: *std.http.HeaderIterator) ![]interface.Header {
|
||||
var header_array = std.ArrayList(interface.Header).init(allocator);
|
||||
while (headers.next()) |kv| {
|
||||
try header_array.append(.{
|
||||
.name_ptr = @constCast(kv.name).ptr,
|
||||
.name_len = kv.name.len,
|
||||
|
||||
|
@ -576,7 +585,7 @@ fn toHeaders(allocator: std.mem.Allocator, headers: std.http.Headers) ![*]interf
|
|||
.value_len = kv.value.len,
|
||||
});
|
||||
}
|
||||
return header_array.items.ptr;
|
||||
return try header_array.toOwnedSlice();
|
||||
}
|
||||
|
||||
/// Allocates at least preallocated_kb kilobytes of ram for usage. Some overhead
|
||||
|
@ -592,12 +601,11 @@ fn preWarmArena(aa: std.mem.Allocator, arena: *std.heap.ArenaAllocator, prealloc
|
|||
);
|
||||
if (!arena.reset(.{ .retain_with_limit = (arena.queryCapacity() + @as(usize, 1023)) / @as(usize, 1024) * 1024 }))
|
||||
log.warn("arena reset failed, arena degraded", .{});
|
||||
var bytes_allocated = arena.queryCapacity();
|
||||
const bytes_allocated = arena.queryCapacity();
|
||||
log.debug("preallocated {d} bytes", .{bytes_allocated});
|
||||
return bytes_allocated;
|
||||
}
|
||||
fn writeToTestBuffers(response: []const u8, res: *std.http.Server.Response) void {
|
||||
_ = res;
|
||||
fn writeToTestBuffers(response: []const u8) void {
|
||||
log.debug("writing to test buffers", .{});
|
||||
// This performs core dump...because we're in a separate thread?
|
||||
// @memset(test_resp_buf, 0);
|
||||
|
@ -615,16 +623,13 @@ fn testRequest(request_bytes: []const u8) !void {
|
|||
var arena = std.heap.ArenaAllocator.init(allocator);
|
||||
defer arena.deinit();
|
||||
|
||||
var server = std.http.Server.init(allocator, .{ .reuse_address = true });
|
||||
defer server.deinit();
|
||||
|
||||
const address = try std.net.Address.parseIp("127.0.0.1", 0);
|
||||
try server.listen(address);
|
||||
const server_port = server.socket.listen_address.in.getPort();
|
||||
var http_server = try address.listen(.{ .reuse_address = true });
|
||||
const server_port = http_server.listen_address.in.getPort();
|
||||
|
||||
var al = std.ArrayList(u8).init(allocator);
|
||||
defer al.deinit();
|
||||
var writer = al.writer();
|
||||
const writer = al.writer();
|
||||
var aa = arena.allocator();
|
||||
var bytes_allocated: usize = 0;
|
||||
// pre-warm
|
||||
|
@ -632,7 +637,7 @@ fn testRequest(request_bytes: []const u8) !void {
|
|||
const server_thread = try std.Thread.spawn(
|
||||
.{},
|
||||
processRequest,
|
||||
.{ &aa, &server, writer },
|
||||
.{ &aa, &http_server, writer },
|
||||
);
|
||||
|
||||
const stream = try std.net.tcpConnectToHost(allocator, "127.0.0.1", server_port);
|
||||
|
@ -670,14 +675,12 @@ test {
|
|||
var test_resp_buf: [1024]u8 = undefined;
|
||||
var test_resp_buf_len: usize = undefined;
|
||||
test "root path get" {
|
||||
std.testing.log_level = .debug;
|
||||
log.debug("", .{});
|
||||
try testGet("/");
|
||||
try std.testing.expectEqual(@as(usize, 2), test_resp_buf_len);
|
||||
try std.testing.expectEqualStrings(" 1", test_resp_buf[0..test_resp_buf_len]);
|
||||
}
|
||||
test "root path, alternative host get" {
|
||||
std.testing.log_level = .debug;
|
||||
log.debug("", .{});
|
||||
try testHostGet("iam.aws.lerch.org", "/");
|
||||
try std.testing.expectEqualStrings("iam response", test_resp_buf[0..test_resp_buf_len]);
|
||||
|
|
Loading…
Reference in New Issue
Block a user