Compare commits

...

10 Commits

6 changed files with 315 additions and 65 deletions

View File

@ -0,0 +1,72 @@
[![](flexilib.png)](flexilib.png)
FlexiLib
========
This is a web server written with the following goals:
* Low memory consumption
* Low latency
* Flexible "reverse proxy" capabilities
* Ability to simulate FAAS capabilities of various online providers (AWS, CloudFlare, etc)
This last point is indirectly supported through the ability of the server to
load, at run time, dynamic libraries to support requests. It will also reload
these libraries after any in flight requests have completed, to support the
experience of developing new libraries.
Libraries can be written in any programming language that supports a standard
Linux C-Based calling convention, which is to say, nearly every programming
language.
This project provides slightly better development and performance characteristics
if the library used is written in [zig](https://ziglang.org). An example zig-based
library can be found in src/main-lib.zig.
Architecture
------------
We assume Linux.
To achieve the lowest latency possible and eliminate the proliferation, The architecture of this server is setup
Security
--------
There is little attempt to secure libraries from interfering with the current
thread or even the main process. As such, the libraries should be fully trusted.
However, libraries themselves may be hardened to run other non-trusted code.
For example: A "I run WASM code" library may be written to create a WASM VM and
run user-supplied WASM code. In that case, the "I run WASM code" library is
trusted, although the code it runs may not be.
Configuration
-------------
Very little has been done so far in terms of configuration. By default, the
number of threads created to serve requests is equal to the number of CPUs
reported by the system (although thread count is limited to 4 threads when
compiled in debug mode). This can be controlled with the environment variable
`SERVER_THREAD_COUNT`.
Future plans include an environment variable for IP address and port to listen
on, as well as the amount of pre-allocated memory for response data (currently
hardcoded to 1k/thread). Pre-allocated memory reduces the number of system
calls required for memory allocation, and pre-allocation/allocation statistics
per request are reported in the logs.
Logs
----
Request logs are sent to standard out, and are likely to change. Here is a sample:
```
127.0.0.1:59940 - - "GET / HTTP/1.1" 200 ttfb 2000.420ms 11 ttlb 2000.568ms (pre-alloc: 1569, alloc: 4350)
```
The first part mirrors common logs from Apache/nginx.
ttfb: Time to first byte. This represents the number of ms of processing within the library
ttlb: Time to last byte. This includes processing as well as transmission of data
pre-alloc: The amount of memory actually pre-allocated (1k is just a minimum and the system may allocate more)
alloc: The amount of memory actually allocated during the request

View File

@ -4,6 +4,8 @@
# for libraries to self-select whether they can handle a request, which opens # for libraries to self-select whether they can handle a request, which opens
# up additional possibilities # up additional possibilities
# Example of match based on an HTTP header. The key is space-delimited:
# <http header key>: <header match prefix> <path match prefix>
Host: iam / = zig-out/lib/libfaas-proxy-sample-lib.so
/c = zig-out/lib/libfaas-proxy-sample-lib-in-c.so /c = zig-out/lib/libfaas-proxy-sample-lib-in-c.so
/ = zig-out/lib/libfaas-proxy-sample-lib.so / = zig-out/lib/libfaas-proxy-sample-lib.so
Host: iam.aws.lerch.org / = zig-out/lib/libfaas-proxy-sample-lib.so

View File

@ -73,12 +73,12 @@ pub fn startWatch(self: *Self) void {
self.watch_started = true; self.watch_started = true;
var fds = if (self.inotify_fd == null) var fds = if (self.inotify_fd == null)
&[_]std.os.pollfd{.{ .fd = self.control_socket.?, .events = std.os.POLL.IN, .revents = undefined }} @constCast(&[_]std.os.pollfd{.{ .fd = self.control_socket.?, .events = std.os.POLL.IN, .revents = undefined }})
else else
&[_]std.os.pollfd{ @constCast(&[_]std.os.pollfd{
.{ .fd = self.control_socket.?, .events = std.os.POLL.IN, .revents = undefined }, .{ .fd = self.control_socket.?, .events = std.os.POLL.IN, .revents = undefined },
.{ .fd = self.inotify_fd.?, .events = std.os.POLL.IN, .revents = undefined }, .{ .fd = self.inotify_fd.?, .events = std.os.POLL.IN, .revents = undefined },
}; });
const control_fd_inx = 0; const control_fd_inx = 0;
const inotify_fd_inx = 1; const inotify_fd_inx = 1;

View File

@ -38,6 +38,11 @@ pub const ZigRequest = struct {
headers: []Header, headers: []Header,
}; };
/// A single header expressed as Zig slices rather than the pointer/length
/// pairs carried by the C-interface `Header`. Values of this type are produced
/// by `toZigHeader`.
pub const ZigHeader = struct {
    /// Header name bytes
    name: []u8,
    /// Header value bytes
    value: []u8,
};
pub const ZigResponse = struct { pub const ZigResponse = struct {
body: *std.ArrayList(u8), body: *std.ArrayList(u8),
headers: *std.StringHashMap([]const u8), headers: *std.StringHashMap([]const u8),
@ -55,10 +60,17 @@ pub fn zigInit(parent_allocator: *anyopaque) callconv(.C) void {
allocator = @ptrCast(*std.mem.Allocator, @alignCast(@alignOf(*std.mem.Allocator), parent_allocator)); allocator = @ptrCast(*std.mem.Allocator, @alignCast(@alignOf(*std.mem.Allocator), parent_allocator));
} }
/// Converts a C-interface `Header` (pointer + length pairs) into a `ZigHeader`
/// made of slices. No bytes are copied: the returned slices refer to the same
/// memory the incoming header points at.
pub fn toZigHeader(header: Header) ZigHeader {
    const name_slice = header.name_ptr[0..header.name_len];
    const value_slice = header.value_ptr[0..header.value_len];
    return ZigHeader{
        .name = name_slice,
        .value = value_slice,
    };
}
/// Converts a StringHashMap to the structure necessary for passing through the /// Converts a StringHashMap to the structure necessary for passing through the
/// C boundary. This will be called automatically for you via the handleRequest function /// C boundary. This will be called automatically for you via the handleRequest function
/// and is also used by the main processing loop to coerce request headers /// and is also used by the main processing loop to coerce request headers
pub fn toHeaders(alloc: std.mem.Allocator, headers: std.StringHashMap([]const u8)) ![*]Header { fn toHeaders(alloc: std.mem.Allocator, headers: std.StringHashMap([]const u8)) ![*]Header {
var header_array = try std.ArrayList(Header).initCapacity(alloc, headers.count()); var header_array = try std.ArrayList(Header).initCapacity(alloc, headers.count());
var iterator = headers.iterator(); var iterator = headers.iterator();
while (iterator.next()) |kv| { while (iterator.next()) |kv| {

View File

@ -4,30 +4,52 @@ const testing = std.testing;
const log = std.log.scoped(.@"main-lib"); const log = std.log.scoped(.@"main-lib");
// request_deinit is an optional export and will be called a the end of the // The main program will look for exports during the request lifecycle:
// request. Useful for deallocating memory // zigInit (optional): called at the beginning of a request, includes pointer to an allocator
// export fn request_deinit() void { // handle_request: called with request data, expects response data
// request_deinit (optional): called at the end of a request to allow resource cleanup
//
// Setup for these is aided by the interface library as shown below
// zigInit is an optional export called at the beginning of a request. It will
// be passed an allocator (which...shh...is an arena allocator). Since the
// interface library provides a request handler that requires a built-in allocator,
// if you are using the interface's handleRequest function as shown below,
// you will need to also include this export. To customize, just do something
// like this:
//
// export fn zigInit(parent_allocator: *anyopaque) callconv(.C) void {
// // your code here, just include the next line
// interface.zigInit(parent_allocator);
// } // }
//
comptime {
@export(interface.zigInit, .{ .name = "zigInit", .linkage = .Strong });
}
/// handle_request will be called on a single request, but due to the preservation /// handle_request will be called on a single request, but due to the preservation
/// of restrictions imposed by the calling interface, it should generally be more /// of restrictions imposed by the calling interface, it should generally be more
/// useful to call into the interface library to let it do the conversion work /// useful to call into the interface library to let it do the conversion work
/// on your behalf /// on your behalf
export fn handle_request(request: *interface.Request) callconv(.C) ?*interface.Response { export fn handle_request(request: *interface.Request) callconv(.C) ?*interface.Response {
// The interface library provides a handleRequest function that will handle
// marshalling data back and forth from the C format used for the interface
// to a more Zig friendly format. It also allows usage of zig errors. To
// use, pass in the request and the zig function used to handle the request
// (here called "handleRequest"). The function signature must be:
//
// fn (std.mem.Allocator, interface.ZigRequest, interface.ZigResponse) !void
//
return interface.handleRequest(request, handleRequest); return interface.handleRequest(request, handleRequest);
} }
// zigInit is an optional export called at the beginning of a request. It will // request_deinit is an optional export and will be called a the end of the
// be passed an allocator (which...shh...is an arena allocator). Since the // request. Useful for deallocating memory. Since this is zig code and the
// interface library provides a request handler that requires a built-in allocator, // allocator used is an arena allocator, all allocated memory will be automatically
// if you are using the interface library, you will need to also include this // cleaned up by the main program at the end of a request
// export //
comptime { // export fn request_deinit() void {
@export( // }
interface.zigInit,
.{ .name = "zigInit", .linkage = .Strong },
);
}
// ************************************************************************ // ************************************************************************
// Boilerplate ^^, Custom code vv // Boilerplate ^^, Custom code vv
@ -40,7 +62,16 @@ fn handleRequest(allocator: std.mem.Allocator, request: interface.ZigRequest, re
// setup // setup
var response_writer = response.body.writer(); var response_writer = response.body.writer();
// real work // real work
response_writer.print(" {d}", .{request.headers.len}) catch unreachable; for (request.headers) |h| {
const header = interface.toZigHeader(h);
if (!std.ascii.eqlIgnoreCase(header.name, "host")) continue;
if (std.mem.startsWith(u8, header.value, "iam")) {
try response_writer.print("iam response", .{});
return;
}
break;
}
try response_writer.print(" {d}", .{request.headers.len});
try response.headers.put("X-custom-foo", "bar"); try response.headers.put("X-custom-foo", "bar");
log.info("handlerequest header count {d}", .{response.headers.count()}); log.info("handlerequest header count {d}", .{response.headers.count()});
} }

View File

@ -4,19 +4,44 @@ const interface = @import("interface.zig");
const Watch = @import("Watch.zig"); const Watch = @import("Watch.zig");
const config = @import("config.zig"); const config = @import("config.zig");
const log = std.log.scoped(.main);
// logging options
/// Logging configuration for this program.
pub const std_options = struct {
    /// Default log level, chosen at compile time from the build mode:
    /// debug builds log everything, ReleaseSafe logs info and above, and the
    /// size/speed optimized builds only log errors.
    pub const log_level = switch (builtin.mode) {
        .ReleaseFast => .err,
        .ReleaseSmall => .err,
        .ReleaseSafe => .info,
        .Debug => .debug,
    };

    /// Per-scope overrides of the default level: the `watch` scope is held
    /// at info regardless of build mode.
    pub const log_scope_levels = &[_]std.log.ScopeLevel{
        .{ .scope = .watch, .level = .info },
    };
};
const serveFn = *const fn (*interface.Request) ?*interface.Response; const serveFn = *const fn (*interface.Request) ?*interface.Response;
const zigInitFn = *const fn (*anyopaque) void; const zigInitFn = *const fn (*anyopaque) void;
const requestDeinitFn = *const fn () void; const requestDeinitFn = *const fn () void;
const timeout = 250; const timeout = 250;
var watcher = Watch.init(executorChanged);
var watcher_thread: ?std.Thread = null;
// Timer used by processRequest to provide ttfb/ttlb data in output
var timer: ?std.time.Timer = null;
const FullReturn = struct { const FullReturn = struct {
response: []u8, response: []u8,
executor: *Executor, executor: *Executor,
}; };
// Executor structure, including functions that were found in the library
// and accounting. Also contains match data to determine if an executor
// applies
const Executor = struct { const Executor = struct {
// configuration // configuration
target_prefix: []const u8, match_data: []const u8,
path: [:0]const u8, path: [:0]const u8,
// fields used at runtime to do real work // fields used at runtime to do real work
@ -27,35 +52,26 @@ const Executor = struct {
// fields used for internal accounting // fields used for internal accounting
watch: ?usize = null, watch: ?usize = null,
reload_lock: bool = false, drain_in_progress: std.atomic.Atomic(bool) = std.atomic.Atomic(bool).init(false),
in_request_lock: bool = false, load_in_progress: std.atomic.Atomic(bool) = std.atomic.Atomic(bool).init(false),
requests_in_flight: std.atomic.Atomic(usize) = std.atomic.Atomic(usize).init(0),
}; };
var watcher = Watch.init(executorChanged);
var watcher_thread: ?std.Thread = null;
var timer: ?std.time.Timer = null; // timer used by processRequest
const log = std.log.scoped(.main);
pub const std_options = struct {
pub const log_level = .debug;
pub const log_scope_levels = &[_]std.log.ScopeLevel{
.{ .scope = .watch, .level = .info },
};
};
const SERVE_FN_NAME = "handle_request"; const SERVE_FN_NAME = "handle_request";
const PORT = 8069; const PORT = 8069; // TODO: Update based on environment variable
var initial_request_buffer: usize = 1; // TODO: Update based on environment variable
var executors: []Executor = undefined; var executors: []Executor = undefined;
var parsed_config: config.ParsedConfig = undefined;
/// Serves a single request. Finds executor, marshalls request data for the C
/// interface, calls the executor and marshalls data back
fn serve(allocator: *std.mem.Allocator, response: *std.http.Server.Response) !*FullReturn { fn serve(allocator: *std.mem.Allocator, response: *std.http.Server.Response) !*FullReturn {
// if (some path routing thing) { const executor = try getExecutor(response.request.target, response.request.headers);
const executor = try getExecutor(response.request.target); errdefer _ = executor.requests_in_flight.fetchSub(1, .Monotonic);
if (executor.zigInitFn) |f| if (executor.zigInitFn) |f|
f(allocator); f(allocator);
executor.in_request_lock = true;
errdefer executor.in_request_lock = false;
// Call external library // Call external library
const method_tag = @tagName(response.request.method); const method_tag = @tagName(response.request.method);
const headers = try toHeaders(allocator.*, response.request.headers); const headers = try toHeaders(allocator.*, response.request.headers);
@ -76,7 +92,7 @@ fn serve(allocator: *std.mem.Allocator, response: *std.http.Server.Response) !*F
}; };
var serve_result = executor.serveFn.?(&request).?; // ok for this pointer deref to fail var serve_result = executor.serveFn.?(&request).?; // ok for this pointer deref to fail
log.debug("target: {s}", .{response.request.target}); log.debug("target: {s}", .{response.request.target});
log.warn("response ptr: {*}", .{serve_result.ptr}); // BUG: This works in tests, but does not when compiled (even debug mode) log.debug("response ptr: {*}", .{serve_result.ptr});
var slice: []u8 = serve_result.ptr[0..serve_result.len]; var slice: []u8 = serve_result.ptr[0..serve_result.len];
log.debug("response body: {s}", .{slice}); log.debug("response body: {s}", .{slice});
@ -100,27 +116,47 @@ fn serve(allocator: *std.mem.Allocator, response: *std.http.Server.Response) !*F
rc.response = slice; rc.response = slice;
return rc; return rc;
} }
fn getExecutor(requested_path: []const u8) !*Executor {
/// Gets an executor based on request data
fn getExecutor(requested_path: []const u8, headers: std.http.Headers) !*Executor {
var executor = blk: { var executor = blk: {
for (executors) |*exec| { for (executors) |*exec| {
if (std.mem.startsWith(u8, requested_path, exec.target_prefix)) { if (executorIsMatch(exec.match_data, requested_path, headers)) {
break :blk exec; break :blk exec;
} }
} }
log.err("Could not find executor for target path '{s}'", .{requested_path}); log.err("Could not find executor for target path '{s}'", .{requested_path});
return error.NoApplicableExecutor; return error.NoApplicableExecutor;
}; };
while (executor.drain_in_progress.load(.Acquire)) {
// we need to stand down and stand by
std.atomic.spinLoopHint(); // Let CPU know we're just hanging out
std.time.sleep(1 * std.time.ns_per_ms / 2);
}
// Tell everyone we're about to use this bad boy
// While requests_in_flight is > 0, nobody should unload the library
_ = executor.requests_in_flight.fetchAdd(1, .Acquire);
errdefer _ = executor.requests_in_flight.fetchSub(1, .Release);
if (executor.serveFn != null) return executor; if (executor.serveFn != null) return executor;
executor.library = blk: { executor.library = blk: {
if (executor.library) |l| if (executor.library) |l|
break :blk l; break :blk l;
while (executor.reload_lock) // system is reloading the library // If the library is being reloaded and a bunch of requests come in,
// we could have multiple threads racing to load
// NOTE: I am not confident of the memory ordering here on tryCompareAndSwap
while (executor.load_in_progress.tryCompareAndSwap(false, true, .Acquire, .Acquire)) |_| {
// we need to stand down and stand by
std.atomic.spinLoopHint(); // Let CPU know we're just hanging out
std.time.sleep(1 * std.time.ns_per_ms / 2); std.time.sleep(1 * std.time.ns_per_ms / 2);
}
// we have the conch...lock others out
defer executor.load_in_progress.store(false, .Release);
if (executor.library) |l| // check again to see where we are at if (executor.library) |l|
break :blk l; break :blk l; // someone beat us to the race..our defer above will take care of unlocking
log.info("library {s} requested but not loaded. Loading library", .{executor.path}); log.info("library {s} requested but not loaded. Loading library", .{executor.path});
const l = try dlopen(executor.path); const l = try dlopen(executor.path);
@ -139,6 +175,44 @@ fn getExecutor(requested_path: []const u8) !*Executor {
return executor; return executor;
} }
/// Reports whether an executor's configured `match_data` applies to a request.
///
/// `match_data` takes one of two forms:
///   * `<path prefix>` - matches when `requested_path` starts with the prefix
///   * `<header name>: <header value prefix> <path prefix>` - matches when the
///     request carries the named header, the value of the first such header
///     starts with the value prefix, and `requested_path` starts with the
///     path prefix
///
/// In the header form, the value prefix and path prefix may be separated by
/// any run of spaces or tabs. A header form that is missing either part is a
/// configuration error: a warning is logged and the executor does not match.
fn executorIsMatch(match_data: []const u8, requested_path: []const u8, headers: std.http.Headers) bool {
    const colon = std.mem.indexOfScalar(u8, match_data, ':') orelse {
        // match_data does not have a ':'. This means this is a straight path,
        // without any header requirement. We can simply return a match prefix
        // on the requested path
        const rc = std.mem.startsWith(u8, requested_path, match_data);
        if (rc) log.debug("executor match for path prefix '{s}'", .{match_data});
        return rc;
    };

    // match_data includes a header match. The header name must match in full
    // (delegated to firstIndexOf); the header value is a prefix match
    const header_needle = match_data[0..colon];
    const header_inx = headers.firstIndexOf(header_needle) orelse return false;

    // Tokenize rather than split: splitting on a single space yields empty
    // fields for repeated delimiters, and an empty path prefix would match
    // every request path
    var tokens = std.mem.tokenize(u8, match_data[colon + 1 ..], "\t ");
    const header_value_needle = tokens.next();
    const path_needle = tokens.next();
    if (header_value_needle == null or path_needle == null) {
        log.warn(
            "Incorrect configuration. Header matching requires both header value and path prefix, space delimited. Key was '{s}'",
            .{match_data},
        );
        return false;
    }

    const request_header_value = headers.list.items[header_inx].value;
    // (should this be case insensitive?)
    if (!std.mem.startsWith(u8, request_header_value, header_value_needle.?)) return false;

    // header value matches...return the path prefix match
    const rc = std.mem.startsWith(u8, requested_path, path_needle.?);
    if (rc) log.debug("executor match for header and path prefix '{s}'", .{match_data});
    return rc;
}
/// Loads all optional symbols from the dynamic library. This has two entry
/// points, though the logic around the primary request handler is slightly
/// different in each case so we can't combine those two.
fn loadOptionalSymbols(executor: *Executor) void { fn loadOptionalSymbols(executor: *Executor) void {
if (std.c.dlsym(executor.library.?, "request_deinit")) |s| { if (std.c.dlsym(executor.library.?, "request_deinit")) |s| {
executor.requestDeinitFn = @ptrCast(requestDeinitFn, s); executor.requestDeinitFn = @ptrCast(requestDeinitFn, s);
@ -147,6 +221,11 @@ fn loadOptionalSymbols(executor: *Executor) void {
executor.zigInitFn = @ptrCast(zigInitFn, s); executor.zigInitFn = @ptrCast(zigInitFn, s);
} }
} }
/// Executor changed. This will be called by a separate thread that is
/// ultimately triggered from the operating system. This will wait for open
/// requests to that library to complete, then lock out new requests until
/// the library is reloaded.
fn executorChanged(watch: usize) void { fn executorChanged(watch: usize) void {
// NOTE: This will be called off the main thread // NOTE: This will be called off the main thread
log.debug("executor with watch {d} changed", .{watch}); log.debug("executor with watch {d} changed", .{watch});
@ -154,10 +233,12 @@ fn executorChanged(watch: usize) void {
if (executor.watch) |w| { if (executor.watch) |w| {
if (w == watch) { if (w == watch) {
if (executor.library) |l| { if (executor.library) |l| {
while (executor.in_request_lock) executor.drain_in_progress.store(true, .Release);
defer executor.drain_in_progress.store(false, .Release);
while (executor.requests_in_flight.load(.Acquire) > 0) {
std.atomic.spinLoopHint();
std.time.sleep(1 * std.time.ns_per_ms / 2); std.time.sleep(1 * std.time.ns_per_ms / 2);
executor.reload_lock = true; }
defer executor.reload_lock = false;
if (std.c.dlclose(l) != 0) if (std.c.dlclose(l) != 0)
@panic("System unstable: Error after library open and cannot close"); @panic("System unstable: Error after library open and cannot close");
@ -181,6 +262,7 @@ fn executorChanged(watch: usize) void {
} }
} }
/// Wrapper to the c library to make us more ziggy
fn dlopen(path: [:0]const u8) !*anyopaque { fn dlopen(path: [:0]const u8) !*anyopaque {
// We need now (and local) because we're about to call it // We need now (and local) because we're about to call it
const lib = std.c.dlopen(path, std.c.RTLD.NOW); const lib = std.c.dlopen(path, std.c.RTLD.NOW);
@ -188,7 +270,8 @@ fn dlopen(path: [:0]const u8) !*anyopaque {
return error.CouldNotOpenDynamicLibrary; return error.CouldNotOpenDynamicLibrary;
} }
// fn exitApplication(sig: i32, info: *const std.os.siginfo_t, ctx_ptr: ?*const anyopaque,) callconv(.C) noreturn { /// Exits the application, which is wired up to SIGINT. This is the only
/// exit from the application as the main function has an infinite loop
fn exitApplication( fn exitApplication(
_: i32, _: i32,
_: *const std.os.siginfo_t, _: *const std.os.siginfo_t,
@ -198,6 +281,8 @@ fn exitApplication(
std.os.exit(0); std.os.exit(0);
} }
/// exitApp handles deinitialization for the application and any reporting
/// that needs to happen
fn exitApp(exitcode: u8) void { fn exitApp(exitcode: u8) void {
if (exitcode == 0) if (exitcode == 0)
std.io.getStdOut().writer().print("termination request: stopping watch\n", .{}) catch {} std.io.getStdOut().writer().print("termination request: stopping watch\n", .{}) catch {}
@ -210,10 +295,15 @@ fn exitApp(exitcode: u8) void {
std.os.exit(exitcode); std.os.exit(exitcode);
// joining threads will hang...we're ultimately in a signal handler. // joining threads will hang...we're ultimately in a signal handler.
// But everything is shut down cleanly now, so I don't think it hurts to // But everything is shut down cleanly now, so I don't think it hurts to
// just kill it all // just kill it all (NOTE: from a practical perspective, we do not seem
// to get a clean shutdown in all cases)
// if (watcher_thread) |t| // if (watcher_thread) |t|
// t.join(); // t.join();
} }
/// reloadConfig is wired to SIGHUP and will reload all executors. In its
/// current state, this is not a safe function as no connection draining
/// has been implemented. Operates off the main thread
fn reloadConfig( fn reloadConfig(
_: i32, _: i32,
_: *const std.os.siginfo_t, _: *const std.os.siginfo_t,
@ -231,6 +321,8 @@ fn reloadConfig(
@panic("Could not reload config"); @panic("Could not reload config");
}; };
} }
/// Installs all signal handlers for shutdown and configuration reload
fn installSignalHandler() !void { fn installSignalHandler() !void {
var act = std.os.Sigaction{ var act = std.os.Sigaction{
.handler = .{ .sigaction = exitApplication }, .handler = .{ .sigaction = exitApplication },
@ -255,9 +347,9 @@ pub fn main() !void {
// stdout is for the actual output of your application, for example if you // stdout is for the actual output of your application, for example if you
// are implementing gzip, then only the compressed bytes should be sent to // are implementing gzip, then only the compressed bytes should be sent to
// stdout, not any debugging messages. // stdout, not any debugging messages.
const stdout_file = std.io.getStdOut().writer(); // const stdout_file = std.io.getStdOut().writer();
var bw = std.io.bufferedWriter(stdout_file); // var bw = std.io.bufferedWriter(stdout_file);
const stdout = bw.writer(); // const stdout = bw.writer();
var allocator = std.heap.raw_c_allocator; // raw allocator recommended for use in arenas var allocator = std.heap.raw_c_allocator; // raw allocator recommended for use in arenas
executors = try loadConfig(allocator); executors = try loadConfig(allocator);
@ -277,6 +369,38 @@ pub fn main() !void {
log.info("pid: {d}", .{std.os.linux.getpid()}); log.info("pid: {d}", .{std.os.linux.getpid()});
try installSignalHandler(); try installSignalHandler();
var server_thread_count = if (std.os.getenv("SERVER_THREAD_COUNT")) |count|
try std.fmt.parseInt(usize, count, 10)
else switch (builtin.mode) {
.Debug => std.math.min(4, try std.Thread.getCpuCount()),
else => try std.Thread.getCpuCount(),
};
switch (builtin.mode) {
.Debug => log.info("serving using {d} threads (debug build: capped at 4)", .{server_thread_count}),
else => log.info("serving using {d} threads", .{server_thread_count}),
}
var server_threads = try std.ArrayList(std.Thread).initCapacity(allocator, server_thread_count);
defer server_threads.deinit();
// Set up thread pool
for (0..server_thread_count) |inx| {
server_threads.appendAssumeCapacity(try std.Thread.spawn(
.{},
threadMain,
.{ allocator, &server, inx },
));
}
// main thread will no longer do anything
std.time.sleep(std.math.maxInt(u64));
for (server_threads.items) |thread| thread.join();
}
fn threadMain(allocator: std.mem.Allocator, server: *std.http.Server, thread_number: usize) !void {
// TODO: If we're in a thread pool we need to be careful with this...
const stdout_file = std.io.getStdOut().writer();
var bw = std.io.bufferedWriter(stdout_file);
const stdout = bw.writer();
log.info("starting server thread {d}, tid {d}", .{ thread_number, std.Thread.getCurrentId() });
var arena = std.heap.ArenaAllocator.init(allocator); var arena = std.heap.ArenaAllocator.init(allocator);
defer arena.deinit(); defer arena.deinit();
var aa = arena.allocator(); var aa = arena.allocator();
@ -292,7 +416,7 @@ pub fn main() !void {
} }
} }
processRequest(&aa, &server, stdout) catch |e| { processRequest(&aa, server, stdout) catch |e| {
log.err("Unexpected error processing request: {any}", .{e}); log.err("Unexpected error processing request: {any}", .{e});
if (@errorReturnTrace()) |trace| { if (@errorReturnTrace()) |trace| {
std.debug.dumpStackTrace(trace.*); std.debug.dumpStackTrace(trace.*);
@ -302,7 +426,7 @@ pub fn main() !void {
try bw.flush(); try bw.flush();
} }
} }
var parsed_config: config.ParsedConfig = undefined;
fn loadConfig(allocator: std.mem.Allocator) ![]Executor { fn loadConfig(allocator: std.mem.Allocator) ![]Executor {
log.info("loading config", .{}); log.info("loading config", .{});
// We will not watch this file - let it reload on SIGHUP // We will not watch this file - let it reload on SIGHUP
@ -313,21 +437,23 @@ fn loadConfig(allocator: std.mem.Allocator) ![]Executor {
defer al.deinit(); defer al.deinit();
for (parsed_config.key_value_map.keys(), parsed_config.key_value_map.values()) |k, v| { for (parsed_config.key_value_map.keys(), parsed_config.key_value_map.values()) |k, v| {
al.appendAssumeCapacity(.{ al.appendAssumeCapacity(.{
.target_prefix = k, .match_data = k,
.path = v, .path = v,
}); });
} }
log.info("config loaded", .{}); log.info("config loaded", .{});
return al.toOwnedSlice(); return al.toOwnedSlice(); // TODO: This should return a struct with the parsed config and executors
} }
/// main loop calls processRequest, which is responsible for the interface
/// with logs, connection accounting, etc. The work dealing with the request
/// itself is delegated to the serve function to work with the executor
fn processRequest(allocator: *std.mem.Allocator, server: *std.http.Server, writer: anytype) !void { fn processRequest(allocator: *std.mem.Allocator, server: *std.http.Server, writer: anytype) !void {
const max_header_size = 8192;
if (timer == null) timer = try std.time.Timer.start(); if (timer == null) timer = try std.time.Timer.start();
var tm = timer.?; var tm = timer.?;
const res = try server.accept(.{ .dynamic = max_header_size }); var res = try server.accept(.{ .allocator = allocator.* });
defer res.deinit(); defer res.deinit();
defer res.reset(); defer _ = res.reset();
try res.wait(); // wait for client to send a complete request head try res.wait(); // wait for client to send a complete request head
// I believe it's fair to start our timer after this is done // I believe it's fair to start our timer after this is done
tm.reset(); tm.reset();
@ -345,7 +471,7 @@ fn processRequest(allocator: *std.mem.Allocator, server: *std.http.Server, write
var errbuf: [errstr.len]u8 = undefined; var errbuf: [errstr.len]u8 = undefined;
var response_bytes = try std.fmt.bufPrint(&errbuf, errstr, .{}); var response_bytes = try std.fmt.bufPrint(&errbuf, errstr, .{});
var full_response = serve(allocator, res) catch |e| brk: { var full_response = serve(allocator, &res) catch |e| brk: {
res.status = .internal_server_error; res.status = .internal_server_error;
// TODO: more about this particular request // TODO: more about this particular request
log.err("Unexpected error from executor processing request: {any}", .{e}); log.err("Unexpected error from executor processing request: {any}", .{e});
@ -355,9 +481,16 @@ fn processRequest(allocator: *std.mem.Allocator, server: *std.http.Server, write
break :brk null; break :brk null;
}; };
defer { defer {
// The call above converts uncaught errors to a null
// In request counter gets incremented during getExecutor
// Caught errors will get our in_flight counter decremented (see serve fn)
// - we cannot do this here because we may not have an executor at all
// This leaves this defer block as the only place we can/should decrement
// under normal conditions
if (full_response) |f| { if (full_response) |f| {
if (f.executor.requestDeinitFn) |d| d(); if (f.executor.requestDeinitFn) |d| d();
f.executor.in_request_lock = false; _ = f.executor.requests_in_flight.fetchSub(1, .Release);
} }
} }
if (full_response) |f| if (full_response) |f|
@ -365,7 +498,7 @@ fn processRequest(allocator: *std.mem.Allocator, server: *std.http.Server, write
res.transfer_encoding = .{ .content_length = response_bytes.len }; res.transfer_encoding = .{ .content_length = response_bytes.len };
try res.headers.append("connection", "close"); try res.headers.append("connection", "close");
try writer.print(" {d} ttfb {d:.3}ms", .{ @enumToInt(res.status), @intToFloat(f64, tm.read()) / std.time.ns_per_ms }); try writer.print(" {d} ttfb {d:.3}ms", .{ @enumToInt(res.status), @intToFloat(f64, tm.read()) / std.time.ns_per_ms });
if (builtin.is_test) writeToTestBuffers(response_bytes, res); if (builtin.is_test) writeToTestBuffers(response_bytes, &res);
try res.do(); try res.do();
_ = try res.writer().writeAll(response_bytes); _ = try res.writer().writeAll(response_bytes);
try res.finish(); try res.finish();
@ -440,7 +573,7 @@ fn testRequest(request_bytes: []const u8) !void {
var aa = arena.allocator(); var aa = arena.allocator();
var bytes_allocated: usize = 0; var bytes_allocated: usize = 0;
// pre-warm // pre-warm
bytes_allocated = try preWarmArena(aa, &arena, 1); bytes_allocated = try preWarmArena(aa, &arena, initial_request_buffer);
const server_thread = try std.Thread.spawn( const server_thread = try std.Thread.spawn(
.{}, .{},
processRequest, processRequest,