Compare commits
10 Commits
327ebae4f2
...
dfe53a6e5e
Author | SHA1 | Date | |
---|---|---|---|
dfe53a6e5e | |||
b335108f0d | |||
626a2e618c | |||
eaef2c6ddc | |||
7b271a0698 | |||
df5cb27e8e | |||
1f9fd19771 | |||
2004d97919 | |||
e0bf317706 | |||
4d4f19d7bc |
72
README.md
72
README.md
|
@ -0,0 +1,72 @@
|
||||||
|
[![](flexilib.png)](flexilib.png)
|
||||||
|
|
||||||
|
FlexiLib
|
||||||
|
========
|
||||||
|
|
||||||
|
This is a web server written with the following goals:
|
||||||
|
|
||||||
|
* Low memory consumption
|
||||||
|
* Low latency
|
||||||
|
* Flexible "reverse proxy" capabilities
|
||||||
|
* Ability to simulate FAAS capabilities of various online providers (AWS, CloudFlare, etc)
|
||||||
|
|
||||||
|
This last point is indirectly supported through the ability of the server to
|
||||||
|
load, at run time, dynamic libraries to support requests. It will also reload
|
||||||
|
these libraries after any in flight requests have completed, to support the
|
||||||
|
experience of developing new libaries.
|
||||||
|
|
||||||
|
Libraries can be written in any programming language that supports a standard
|
||||||
|
Linux C-Based calling convention, which is to say, nearly every programming
|
||||||
|
language.
|
||||||
|
|
||||||
|
This project provides slightly better development and performance characteristics
|
||||||
|
if the library used is written in [zig](https://ziglang.org). An example zig-based
|
||||||
|
library can be found in src/main-lib.zig.
|
||||||
|
|
||||||
|
Architecture
|
||||||
|
------------
|
||||||
|
|
||||||
|
We assume Linux.
|
||||||
|
To achieve the lowest latency possible and eliminate the proliferation, The architecture of this server is setup
|
||||||
|
|
||||||
|
Security
|
||||||
|
--------
|
||||||
|
|
||||||
|
There is little attempt to secure libraries from interfering with the current
|
||||||
|
thread or even the main process. As such, the libraries should be fully trusted.
|
||||||
|
However, libraries themselves may be hardened to run other non-trusted code.
|
||||||
|
For example: A "I run WASM code" library may be written to create a WASM VM and
|
||||||
|
run user-supplied WASM code. In that case, the "I run WASM code" library is
|
||||||
|
trusted, although the code it runs may not be.
|
||||||
|
|
||||||
|
Configuration
|
||||||
|
-------------
|
||||||
|
|
||||||
|
Very little has been done so far in terms of configuration. By default, the
|
||||||
|
number of threads created to serve requests is equal to the number of CPUs
|
||||||
|
reported by the system (although thread count is limited to 4 threads when
|
||||||
|
compiled in debug mode). This can be controlled with the environment variable
|
||||||
|
`SERVER_THREAD_COUNT`.
|
||||||
|
|
||||||
|
Future plans include an environment variable for IP address and port to listen
|
||||||
|
on, as well as the amount of pre-allocated memory for response data (currently
|
||||||
|
hardcoded to 1k/thread). Pre-allocated memory reduces the number of system
|
||||||
|
calls required for memory allocation, and pre-allocation/allocation statistics
|
||||||
|
per request are reported in the logs.
|
||||||
|
|
||||||
|
Logs
|
||||||
|
----
|
||||||
|
|
||||||
|
Request logs are sent to standard out, and are likely to change. Here is a sample:
|
||||||
|
|
||||||
|
```
|
||||||
|
127.0.0.1:59940 - - "GET / HTTP/1.1" 200 ttfb 2000.420ms 11 ttlb 2000.568ms (pre-alloc: 1569, alloc: 4350)
|
||||||
|
```
|
||||||
|
|
||||||
|
The first part mirrors common logs from Apache/nginx.
|
||||||
|
|
||||||
|
ttfb: Time to first byte. This represents the number of ms of processing within the library
|
||||||
|
ttlb: Time to last byte. This includes processing as well a transmission of data
|
||||||
|
pre-alloc: The amount of memory actually pre-allocated (1k is just a minimum and the system may allocate more)
|
||||||
|
alloc: The amount of memory actually allocated during the request
|
||||||
|
|
|
@ -4,6 +4,8 @@
|
||||||
# for libraries to self-select whether they can handle a request, which opens
|
# for libraries to self-select whether they can handle a request, which opens
|
||||||
# up additional possibilities
|
# up additional possibilities
|
||||||
|
|
||||||
|
# Example of match based on an HTTP header. The key is space-delimited:
|
||||||
|
# <http header key>: <header match prefix> <path match prefix>
|
||||||
|
Host: iam / = zig-out/lib/libfaas-proxy-sample-lib.so
|
||||||
/c = zig-out/lib/libfaas-proxy-sample-lib-in-c.so
|
/c = zig-out/lib/libfaas-proxy-sample-lib-in-c.so
|
||||||
/ = zig-out/lib/libfaas-proxy-sample-lib.so
|
/ = zig-out/lib/libfaas-proxy-sample-lib.so
|
||||||
Host: iam.aws.lerch.org / = zig-out/lib/libfaas-proxy-sample-lib.so
|
|
||||||
|
|
|
@ -73,12 +73,12 @@ pub fn startWatch(self: *Self) void {
|
||||||
self.watch_started = true;
|
self.watch_started = true;
|
||||||
|
|
||||||
var fds = if (self.inotify_fd == null)
|
var fds = if (self.inotify_fd == null)
|
||||||
&[_]std.os.pollfd{.{ .fd = self.control_socket.?, .events = std.os.POLL.IN, .revents = undefined }}
|
@constCast(&[_]std.os.pollfd{.{ .fd = self.control_socket.?, .events = std.os.POLL.IN, .revents = undefined }})
|
||||||
else
|
else
|
||||||
&[_]std.os.pollfd{
|
@constCast(&[_]std.os.pollfd{
|
||||||
.{ .fd = self.control_socket.?, .events = std.os.POLL.IN, .revents = undefined },
|
.{ .fd = self.control_socket.?, .events = std.os.POLL.IN, .revents = undefined },
|
||||||
.{ .fd = self.inotify_fd.?, .events = std.os.POLL.IN, .revents = undefined },
|
.{ .fd = self.inotify_fd.?, .events = std.os.POLL.IN, .revents = undefined },
|
||||||
};
|
});
|
||||||
|
|
||||||
const control_fd_inx = 0;
|
const control_fd_inx = 0;
|
||||||
const inotify_fd_inx = 1;
|
const inotify_fd_inx = 1;
|
||||||
|
|
|
@ -38,6 +38,11 @@ pub const ZigRequest = struct {
|
||||||
headers: []Header,
|
headers: []Header,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
pub const ZigHeader = struct {
|
||||||
|
name: []u8,
|
||||||
|
value: []u8,
|
||||||
|
};
|
||||||
|
|
||||||
pub const ZigResponse = struct {
|
pub const ZigResponse = struct {
|
||||||
body: *std.ArrayList(u8),
|
body: *std.ArrayList(u8),
|
||||||
headers: *std.StringHashMap([]const u8),
|
headers: *std.StringHashMap([]const u8),
|
||||||
|
@ -55,10 +60,17 @@ pub fn zigInit(parent_allocator: *anyopaque) callconv(.C) void {
|
||||||
allocator = @ptrCast(*std.mem.Allocator, @alignCast(@alignOf(*std.mem.Allocator), parent_allocator));
|
allocator = @ptrCast(*std.mem.Allocator, @alignCast(@alignOf(*std.mem.Allocator), parent_allocator));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn toZigHeader(header: Header) ZigHeader {
|
||||||
|
return .{
|
||||||
|
.name = header.name_ptr[0..header.name_len],
|
||||||
|
.value = header.value_ptr[0..header.value_len],
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
/// Converts a StringHashMap to the structure necessary for passing through the
|
/// Converts a StringHashMap to the structure necessary for passing through the
|
||||||
/// C boundary. This will be called automatically for you via the handleRequest function
|
/// C boundary. This will be called automatically for you via the handleRequest function
|
||||||
/// and is also used by the main processing loop to coerce request headers
|
/// and is also used by the main processing loop to coerce request headers
|
||||||
pub fn toHeaders(alloc: std.mem.Allocator, headers: std.StringHashMap([]const u8)) ![*]Header {
|
fn toHeaders(alloc: std.mem.Allocator, headers: std.StringHashMap([]const u8)) ![*]Header {
|
||||||
var header_array = try std.ArrayList(Header).initCapacity(alloc, headers.count());
|
var header_array = try std.ArrayList(Header).initCapacity(alloc, headers.count());
|
||||||
var iterator = headers.iterator();
|
var iterator = headers.iterator();
|
||||||
while (iterator.next()) |kv| {
|
while (iterator.next()) |kv| {
|
||||||
|
|
|
@ -4,30 +4,52 @@ const testing = std.testing;
|
||||||
|
|
||||||
const log = std.log.scoped(.@"main-lib");
|
const log = std.log.scoped(.@"main-lib");
|
||||||
|
|
||||||
// request_deinit is an optional export and will be called a the end of the
|
// The main program will look for exports during the request lifecycle:
|
||||||
// request. Useful for deallocating memory
|
// zigInit (optional): called at the beginning of a request, includes pointer to an allocator
|
||||||
// export fn request_deinit() void {
|
// handle_request: called with request data, expects response data
|
||||||
|
// request_deinit (optional): called at the end of a request to allow resource cleanup
|
||||||
|
//
|
||||||
|
// Setup for these is aided by the interface library as shown below
|
||||||
|
|
||||||
|
// zigInit is an optional export called at the beginning of a request. It will
|
||||||
|
// be passed an allocator (which...shh...is an arena allocator). Since the
|
||||||
|
// interface library provides a request handler that requires a built-in allocator,
|
||||||
|
// if you are using the interface's handleRequest function as shown above,
|
||||||
|
// you will need to also include this export. To customize, just do something
|
||||||
|
// like this:
|
||||||
|
//
|
||||||
|
// export fn zigInit(parent_allocator: *anyopaque) callconv(.C) void {
|
||||||
|
// // your code here, just include the next line
|
||||||
|
// interface.zigInit(parent_allocator);
|
||||||
// }
|
// }
|
||||||
|
//
|
||||||
|
comptime {
|
||||||
|
@export(interface.zigInit, .{ .name = "zigInit", .linkage = .Strong });
|
||||||
|
}
|
||||||
|
|
||||||
/// handle_request will be called on a single request, but due to the preservation
|
/// handle_request will be called on a single request, but due to the preservation
|
||||||
/// of restrictions imposed by the calling interface, it should generally be more
|
/// of restrictions imposed by the calling interface, it should generally be more
|
||||||
/// useful to call into the interface library to let it do the conversion work
|
/// useful to call into the interface library to let it do the conversion work
|
||||||
/// on your behalf
|
/// on your behalf
|
||||||
export fn handle_request(request: *interface.Request) callconv(.C) ?*interface.Response {
|
export fn handle_request(request: *interface.Request) callconv(.C) ?*interface.Response {
|
||||||
|
// The interface library provides a handleRequest function that will handle
|
||||||
|
// marshalling data back and forth from the C format used for the interface
|
||||||
|
// to a more Zig friendly format. It also allows usage of zig errors. To
|
||||||
|
// use, pass in the request and the zig function used to handle the request
|
||||||
|
// (here called "handleRequest"). The function signature must be:
|
||||||
|
//
|
||||||
|
// fn (std.mem.Allocator, interface.ZigRequest, interface.ZigResponse) !void
|
||||||
|
//
|
||||||
return interface.handleRequest(request, handleRequest);
|
return interface.handleRequest(request, handleRequest);
|
||||||
}
|
}
|
||||||
|
|
||||||
// zigInit is an optional export called at the beginning of a request. It will
|
// request_deinit is an optional export and will be called a the end of the
|
||||||
// be passed an allocator (which...shh...is an arena allocator). Since the
|
// request. Useful for deallocating memory. Since this is zig code and the
|
||||||
// interface library provides a request handler that requires a built-in allocator,
|
// allocator used is an arena allocator, all allocated memory will be automatically
|
||||||
// if you are using the interface library, you will need to also include this
|
// cleaned up by the main program at the end of a request
|
||||||
// export
|
//
|
||||||
comptime {
|
// export fn request_deinit() void {
|
||||||
@export(
|
// }
|
||||||
interface.zigInit,
|
|
||||||
.{ .name = "zigInit", .linkage = .Strong },
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
// ************************************************************************
|
// ************************************************************************
|
||||||
// Boilerplate ^^, Custom code vv
|
// Boilerplate ^^, Custom code vv
|
||||||
|
@ -40,7 +62,16 @@ fn handleRequest(allocator: std.mem.Allocator, request: interface.ZigRequest, re
|
||||||
// setup
|
// setup
|
||||||
var response_writer = response.body.writer();
|
var response_writer = response.body.writer();
|
||||||
// real work
|
// real work
|
||||||
response_writer.print(" {d}", .{request.headers.len}) catch unreachable;
|
for (request.headers) |h| {
|
||||||
|
const header = interface.toZigHeader(h);
|
||||||
|
if (!std.ascii.eqlIgnoreCase(header.name, "host")) continue;
|
||||||
|
if (std.mem.startsWith(u8, header.value, "iam")) {
|
||||||
|
try response_writer.print("iam response", .{});
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
try response_writer.print(" {d}", .{request.headers.len});
|
||||||
try response.headers.put("X-custom-foo", "bar");
|
try response.headers.put("X-custom-foo", "bar");
|
||||||
log.info("handlerequest header count {d}", .{response.headers.count()});
|
log.info("handlerequest header count {d}", .{response.headers.count()});
|
||||||
}
|
}
|
||||||
|
|
223
src/main.zig
223
src/main.zig
|
@ -4,19 +4,44 @@ const interface = @import("interface.zig");
|
||||||
const Watch = @import("Watch.zig");
|
const Watch = @import("Watch.zig");
|
||||||
const config = @import("config.zig");
|
const config = @import("config.zig");
|
||||||
|
|
||||||
|
const log = std.log.scoped(.main);
|
||||||
|
|
||||||
|
// logging options
|
||||||
|
pub const std_options = struct {
|
||||||
|
pub const log_level = switch (builtin.mode) {
|
||||||
|
.Debug => .debug,
|
||||||
|
.ReleaseSafe => .info,
|
||||||
|
.ReleaseFast, .ReleaseSmall => .err,
|
||||||
|
};
|
||||||
|
|
||||||
|
pub const log_scope_levels = &[_]std.log.ScopeLevel{
|
||||||
|
.{ .scope = .watch, .level = .info },
|
||||||
|
};
|
||||||
|
};
|
||||||
|
|
||||||
const serveFn = *const fn (*interface.Request) ?*interface.Response;
|
const serveFn = *const fn (*interface.Request) ?*interface.Response;
|
||||||
const zigInitFn = *const fn (*anyopaque) void;
|
const zigInitFn = *const fn (*anyopaque) void;
|
||||||
const requestDeinitFn = *const fn () void;
|
const requestDeinitFn = *const fn () void;
|
||||||
|
|
||||||
const timeout = 250;
|
const timeout = 250;
|
||||||
|
|
||||||
|
var watcher = Watch.init(executorChanged);
|
||||||
|
var watcher_thread: ?std.Thread = null;
|
||||||
|
|
||||||
|
// Timer used by processRequest to provide ttfb/ttlb data in output
|
||||||
|
var timer: ?std.time.Timer = null;
|
||||||
|
|
||||||
const FullReturn = struct {
|
const FullReturn = struct {
|
||||||
response: []u8,
|
response: []u8,
|
||||||
executor: *Executor,
|
executor: *Executor,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// Executor structure, including functions that were found in the library
|
||||||
|
// and accounting. Also contains match data to determine if an executor
|
||||||
|
// applies
|
||||||
const Executor = struct {
|
const Executor = struct {
|
||||||
// configuration
|
// configuration
|
||||||
target_prefix: []const u8,
|
match_data: []const u8,
|
||||||
path: [:0]const u8,
|
path: [:0]const u8,
|
||||||
|
|
||||||
// fields used at runtime to do real work
|
// fields used at runtime to do real work
|
||||||
|
@ -27,35 +52,26 @@ const Executor = struct {
|
||||||
|
|
||||||
// fields used for internal accounting
|
// fields used for internal accounting
|
||||||
watch: ?usize = null,
|
watch: ?usize = null,
|
||||||
reload_lock: bool = false,
|
drain_in_progress: std.atomic.Atomic(bool) = std.atomic.Atomic(bool).init(false),
|
||||||
in_request_lock: bool = false,
|
load_in_progress: std.atomic.Atomic(bool) = std.atomic.Atomic(bool).init(false),
|
||||||
|
requests_in_flight: std.atomic.Atomic(usize) = std.atomic.Atomic(usize).init(0),
|
||||||
};
|
};
|
||||||
|
|
||||||
var watcher = Watch.init(executorChanged);
|
|
||||||
var watcher_thread: ?std.Thread = null;
|
|
||||||
var timer: ?std.time.Timer = null; // timer used by processRequest
|
|
||||||
|
|
||||||
const log = std.log.scoped(.main);
|
|
||||||
pub const std_options = struct {
|
|
||||||
pub const log_level = .debug;
|
|
||||||
|
|
||||||
pub const log_scope_levels = &[_]std.log.ScopeLevel{
|
|
||||||
.{ .scope = .watch, .level = .info },
|
|
||||||
};
|
|
||||||
};
|
|
||||||
const SERVE_FN_NAME = "handle_request";
|
const SERVE_FN_NAME = "handle_request";
|
||||||
const PORT = 8069;
|
const PORT = 8069; // TODO: Update based on environment variable
|
||||||
|
var initial_request_buffer: usize = 1; // TODO: Update based on environment variable
|
||||||
|
|
||||||
var executors: []Executor = undefined;
|
var executors: []Executor = undefined;
|
||||||
|
var parsed_config: config.ParsedConfig = undefined;
|
||||||
|
|
||||||
|
/// Serves a single request. Finds executor, marshalls request data for the C
|
||||||
|
/// interface, calls the executor and marshalls data back
|
||||||
fn serve(allocator: *std.mem.Allocator, response: *std.http.Server.Response) !*FullReturn {
|
fn serve(allocator: *std.mem.Allocator, response: *std.http.Server.Response) !*FullReturn {
|
||||||
// if (some path routing thing) {
|
const executor = try getExecutor(response.request.target, response.request.headers);
|
||||||
const executor = try getExecutor(response.request.target);
|
errdefer _ = executor.requests_in_flight.fetchSub(1, .Monotonic);
|
||||||
if (executor.zigInitFn) |f|
|
if (executor.zigInitFn) |f|
|
||||||
f(allocator);
|
f(allocator);
|
||||||
|
|
||||||
executor.in_request_lock = true;
|
|
||||||
errdefer executor.in_request_lock = false;
|
|
||||||
// Call external library
|
// Call external library
|
||||||
const method_tag = @tagName(response.request.method);
|
const method_tag = @tagName(response.request.method);
|
||||||
const headers = try toHeaders(allocator.*, response.request.headers);
|
const headers = try toHeaders(allocator.*, response.request.headers);
|
||||||
|
@ -76,7 +92,7 @@ fn serve(allocator: *std.mem.Allocator, response: *std.http.Server.Response) !*F
|
||||||
};
|
};
|
||||||
var serve_result = executor.serveFn.?(&request).?; // ok for this pointer deref to fail
|
var serve_result = executor.serveFn.?(&request).?; // ok for this pointer deref to fail
|
||||||
log.debug("target: {s}", .{response.request.target});
|
log.debug("target: {s}", .{response.request.target});
|
||||||
log.warn("response ptr: {*}", .{serve_result.ptr}); // BUG: This works in tests, but does not when compiled (even debug mode)
|
log.debug("response ptr: {*}", .{serve_result.ptr});
|
||||||
var slice: []u8 = serve_result.ptr[0..serve_result.len];
|
var slice: []u8 = serve_result.ptr[0..serve_result.len];
|
||||||
log.debug("response body: {s}", .{slice});
|
log.debug("response body: {s}", .{slice});
|
||||||
|
|
||||||
|
@ -100,27 +116,47 @@ fn serve(allocator: *std.mem.Allocator, response: *std.http.Server.Response) !*F
|
||||||
rc.response = slice;
|
rc.response = slice;
|
||||||
return rc;
|
return rc;
|
||||||
}
|
}
|
||||||
fn getExecutor(requested_path: []const u8) !*Executor {
|
|
||||||
|
/// Gets and executor based on request data
|
||||||
|
fn getExecutor(requested_path: []const u8, headers: std.http.Headers) !*Executor {
|
||||||
var executor = blk: {
|
var executor = blk: {
|
||||||
for (executors) |*exec| {
|
for (executors) |*exec| {
|
||||||
if (std.mem.startsWith(u8, requested_path, exec.target_prefix)) {
|
if (executorIsMatch(exec.match_data, requested_path, headers)) {
|
||||||
break :blk exec;
|
break :blk exec;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
log.err("Could not find executor for target path '{s}'", .{requested_path});
|
log.err("Could not find executor for target path '{s}'", .{requested_path});
|
||||||
return error.NoApplicableExecutor;
|
return error.NoApplicableExecutor;
|
||||||
};
|
};
|
||||||
|
while (executor.drain_in_progress.load(.Acquire)) {
|
||||||
|
// we need to stand down and stand by
|
||||||
|
std.atomic.spinLoopHint(); // Let CPU know we're just hanging out
|
||||||
|
std.time.sleep(1 * std.time.ns_per_ms / 2);
|
||||||
|
}
|
||||||
|
// Tell everyone we're about to use this bad boy
|
||||||
|
// While requests_in_flight is >= 0, nobody should unload the library
|
||||||
|
_ = executor.requests_in_flight.fetchAdd(1, .Acquire);
|
||||||
|
errdefer _ = executor.requests_in_flight.fetchSub(1, .Release);
|
||||||
|
|
||||||
if (executor.serveFn != null) return executor;
|
if (executor.serveFn != null) return executor;
|
||||||
|
|
||||||
executor.library = blk: {
|
executor.library = blk: {
|
||||||
if (executor.library) |l|
|
if (executor.library) |l|
|
||||||
break :blk l;
|
break :blk l;
|
||||||
|
|
||||||
while (executor.reload_lock) // system is reloading the library
|
// If the library is being reloaded and a bunch of requests come in,
|
||||||
|
// we could have multiple threads racing to load
|
||||||
|
// NOTE: I am not confident of the memory ordering here on tryCompareAndSwap
|
||||||
|
while (executor.load_in_progress.tryCompareAndSwap(false, true, .Acquire, .Acquire)) |_| {
|
||||||
|
// we need to stand down and stand by
|
||||||
|
std.atomic.spinLoopHint(); // Let CPU know we're just hanging out
|
||||||
std.time.sleep(1 * std.time.ns_per_ms / 2);
|
std.time.sleep(1 * std.time.ns_per_ms / 2);
|
||||||
|
}
|
||||||
|
// we have the conch...lock others out
|
||||||
|
defer executor.load_in_progress.store(false, .Release);
|
||||||
|
|
||||||
if (executor.library) |l| // check again to see where we are at
|
if (executor.library) |l|
|
||||||
break :blk l;
|
break :blk l; // someone beat us to the race..our defer above will take care of unlocking
|
||||||
|
|
||||||
log.info("library {s} requested but not loaded. Loading library", .{executor.path});
|
log.info("library {s} requested but not loaded. Loading library", .{executor.path});
|
||||||
const l = try dlopen(executor.path);
|
const l = try dlopen(executor.path);
|
||||||
|
@ -139,6 +175,44 @@ fn getExecutor(requested_path: []const u8) !*Executor {
|
||||||
return executor;
|
return executor;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn executorIsMatch(match_data: []const u8, requested_path: []const u8, headers: std.http.Headers) bool {
|
||||||
|
if (!std.mem.containsAtLeast(u8, match_data, 1, ":")) {
|
||||||
|
// match_data does not have a ':'. This means this is a straight path, without
|
||||||
|
// any header requirement. We can simply return a match prefix on the
|
||||||
|
// requested path
|
||||||
|
const rc = std.mem.startsWith(u8, requested_path, match_data);
|
||||||
|
if (rc) log.debug("executor match for path prefix '{s}'", .{match_data});
|
||||||
|
return rc;
|
||||||
|
}
|
||||||
|
const colon = std.mem.indexOf(u8, match_data, ":").?;
|
||||||
|
const header_needle = match_data[0..colon];
|
||||||
|
const header_inx = headers.firstIndexOf(header_needle) orelse return false;
|
||||||
|
// Apparently std.mem.split will return an empty first when the haystack starts
|
||||||
|
// with the delimiter
|
||||||
|
var split = std.mem.split(u8, std.mem.trim(u8, match_data[colon + 1 ..], "\t "), " ");
|
||||||
|
const header_value_needle = split.first();
|
||||||
|
const path_needle = split.next() orelse {
|
||||||
|
std.log.warn(
|
||||||
|
"Incorrect configuration. Header matching requires both header value and path prefix, space delimited. Key was '{s}'",
|
||||||
|
.{match_data},
|
||||||
|
);
|
||||||
|
return false;
|
||||||
|
};
|
||||||
|
// match_data includes some sort of header match as well. We assume the
|
||||||
|
// header match is a full match on the key (handled above)
|
||||||
|
// but a prefix match on the value
|
||||||
|
const request_header_value = headers.list.items[header_inx].value;
|
||||||
|
// (shoud this be case insensitive?)
|
||||||
|
if (!std.mem.startsWith(u8, request_header_value, header_value_needle)) return false;
|
||||||
|
// header value matches...return the path prefix match
|
||||||
|
const rc = std.mem.startsWith(u8, requested_path, path_needle);
|
||||||
|
if (rc) log.debug("executor match for header and path prefix '{s}'", .{match_data});
|
||||||
|
return rc;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Loads all optional symbols from the dynamic library. This has two entry
|
||||||
|
/// points, though the logic around the primary request handler is slighly
|
||||||
|
/// different in each case so we can't combine those two.
|
||||||
fn loadOptionalSymbols(executor: *Executor) void {
|
fn loadOptionalSymbols(executor: *Executor) void {
|
||||||
if (std.c.dlsym(executor.library.?, "request_deinit")) |s| {
|
if (std.c.dlsym(executor.library.?, "request_deinit")) |s| {
|
||||||
executor.requestDeinitFn = @ptrCast(requestDeinitFn, s);
|
executor.requestDeinitFn = @ptrCast(requestDeinitFn, s);
|
||||||
|
@ -147,6 +221,11 @@ fn loadOptionalSymbols(executor: *Executor) void {
|
||||||
executor.zigInitFn = @ptrCast(zigInitFn, s);
|
executor.zigInitFn = @ptrCast(zigInitFn, s);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Executor changed. This will be called by a sepearate thread that is
|
||||||
|
/// ultimately triggered from the operating system. This will wait for open
|
||||||
|
/// requests to that libary to complete, then lock out new requests until
|
||||||
|
/// the library is reloaded.
|
||||||
fn executorChanged(watch: usize) void {
|
fn executorChanged(watch: usize) void {
|
||||||
// NOTE: This will be called off the main thread
|
// NOTE: This will be called off the main thread
|
||||||
log.debug("executor with watch {d} changed", .{watch});
|
log.debug("executor with watch {d} changed", .{watch});
|
||||||
|
@ -154,10 +233,12 @@ fn executorChanged(watch: usize) void {
|
||||||
if (executor.watch) |w| {
|
if (executor.watch) |w| {
|
||||||
if (w == watch) {
|
if (w == watch) {
|
||||||
if (executor.library) |l| {
|
if (executor.library) |l| {
|
||||||
while (executor.in_request_lock)
|
executor.drain_in_progress.store(true, .Release);
|
||||||
|
defer executor.drain_in_progress.store(false, .Release);
|
||||||
|
while (executor.requests_in_flight.load(.Acquire) > 0) {
|
||||||
|
std.atomic.spinLoopHint();
|
||||||
std.time.sleep(1 * std.time.ns_per_ms / 2);
|
std.time.sleep(1 * std.time.ns_per_ms / 2);
|
||||||
executor.reload_lock = true;
|
}
|
||||||
defer executor.reload_lock = false;
|
|
||||||
|
|
||||||
if (std.c.dlclose(l) != 0)
|
if (std.c.dlclose(l) != 0)
|
||||||
@panic("System unstable: Error after library open and cannot close");
|
@panic("System unstable: Error after library open and cannot close");
|
||||||
|
@ -181,6 +262,7 @@ fn executorChanged(watch: usize) void {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Wrapper to the c library to make us more ziggy
|
||||||
fn dlopen(path: [:0]const u8) !*anyopaque {
|
fn dlopen(path: [:0]const u8) !*anyopaque {
|
||||||
// We need now (and local) because we're about to call it
|
// We need now (and local) because we're about to call it
|
||||||
const lib = std.c.dlopen(path, std.c.RTLD.NOW);
|
const lib = std.c.dlopen(path, std.c.RTLD.NOW);
|
||||||
|
@ -188,7 +270,8 @@ fn dlopen(path: [:0]const u8) !*anyopaque {
|
||||||
return error.CouldNotOpenDynamicLibrary;
|
return error.CouldNotOpenDynamicLibrary;
|
||||||
}
|
}
|
||||||
|
|
||||||
// fn exitApplication(sig: i32, info: *const std.os.siginfo_t, ctx_ptr: ?*const anyopaque,) callconv(.C) noreturn {
|
/// Exits the application, which is wired up to SIGINT. This is the only
|
||||||
|
/// exit from the application as the main function has an infinite loop
|
||||||
fn exitApplication(
|
fn exitApplication(
|
||||||
_: i32,
|
_: i32,
|
||||||
_: *const std.os.siginfo_t,
|
_: *const std.os.siginfo_t,
|
||||||
|
@ -198,6 +281,8 @@ fn exitApplication(
|
||||||
std.os.exit(0);
|
std.os.exit(0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// exitApp handles deinitialization for the application and any reporting
|
||||||
|
/// that needs to happen
|
||||||
fn exitApp(exitcode: u8) void {
|
fn exitApp(exitcode: u8) void {
|
||||||
if (exitcode == 0)
|
if (exitcode == 0)
|
||||||
std.io.getStdOut().writer().print("termination request: stopping watch\n", .{}) catch {}
|
std.io.getStdOut().writer().print("termination request: stopping watch\n", .{}) catch {}
|
||||||
|
@ -210,10 +295,15 @@ fn exitApp(exitcode: u8) void {
|
||||||
std.os.exit(exitcode);
|
std.os.exit(exitcode);
|
||||||
// joining threads will hang...we're ultimately in a signal handler.
|
// joining threads will hang...we're ultimately in a signal handler.
|
||||||
// But everything is shut down cleanly now, so I don't think it hurts to
|
// But everything is shut down cleanly now, so I don't think it hurts to
|
||||||
// just kill it all
|
// just kill it all (NOTE: from a practical perspective, we do not seem
|
||||||
|
// to get a clean shutdown in all cases)
|
||||||
// if (watcher_thread) |t|
|
// if (watcher_thread) |t|
|
||||||
// t.join();
|
// t.join();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// reloadConfig is wired to SIGHUP and will reload all executors. In its
|
||||||
|
/// current state, this is not a safe function as no connection draining
|
||||||
|
/// has been implemented. Operates off the main thread
|
||||||
fn reloadConfig(
|
fn reloadConfig(
|
||||||
_: i32,
|
_: i32,
|
||||||
_: *const std.os.siginfo_t,
|
_: *const std.os.siginfo_t,
|
||||||
|
@ -231,6 +321,8 @@ fn reloadConfig(
|
||||||
@panic("Could not reload config");
|
@panic("Could not reload config");
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Installs all signal handlers for shutdown and configuration reload
|
||||||
fn installSignalHandler() !void {
|
fn installSignalHandler() !void {
|
||||||
var act = std.os.Sigaction{
|
var act = std.os.Sigaction{
|
||||||
.handler = .{ .sigaction = exitApplication },
|
.handler = .{ .sigaction = exitApplication },
|
||||||
|
@ -255,9 +347,9 @@ pub fn main() !void {
|
||||||
// stdout is for the actual output of your application, for example if you
|
// stdout is for the actual output of your application, for example if you
|
||||||
// are implementing gzip, then only the compressed bytes should be sent to
|
// are implementing gzip, then only the compressed bytes should be sent to
|
||||||
// stdout, not any debugging messages.
|
// stdout, not any debugging messages.
|
||||||
const stdout_file = std.io.getStdOut().writer();
|
// const stdout_file = std.io.getStdOut().writer();
|
||||||
var bw = std.io.bufferedWriter(stdout_file);
|
// var bw = std.io.bufferedWriter(stdout_file);
|
||||||
const stdout = bw.writer();
|
// const stdout = bw.writer();
|
||||||
|
|
||||||
var allocator = std.heap.raw_c_allocator; // raw allocator recommended for use in arenas
|
var allocator = std.heap.raw_c_allocator; // raw allocator recommended for use in arenas
|
||||||
executors = try loadConfig(allocator);
|
executors = try loadConfig(allocator);
|
||||||
|
@ -277,6 +369,38 @@ pub fn main() !void {
|
||||||
log.info("pid: {d}", .{std.os.linux.getpid()});
|
log.info("pid: {d}", .{std.os.linux.getpid()});
|
||||||
|
|
||||||
try installSignalHandler();
|
try installSignalHandler();
|
||||||
|
var server_thread_count = if (std.os.getenv("SERVER_THREAD_COUNT")) |count|
|
||||||
|
try std.fmt.parseInt(usize, count, 10)
|
||||||
|
else switch (builtin.mode) {
|
||||||
|
.Debug => std.math.min(4, try std.Thread.getCpuCount()),
|
||||||
|
else => try std.Thread.getCpuCount(),
|
||||||
|
};
|
||||||
|
switch (builtin.mode) {
|
||||||
|
.Debug => log.info("serving using {d} threads (debug build: capped at 4)", .{server_thread_count}),
|
||||||
|
else => log.info("serving using {d} threads", .{server_thread_count}),
|
||||||
|
}
|
||||||
|
var server_threads = try std.ArrayList(std.Thread).initCapacity(allocator, server_thread_count);
|
||||||
|
defer server_threads.deinit();
|
||||||
|
// Set up thread pool
|
||||||
|
for (0..server_thread_count) |inx| {
|
||||||
|
server_threads.appendAssumeCapacity(try std.Thread.spawn(
|
||||||
|
.{},
|
||||||
|
threadMain,
|
||||||
|
.{ allocator, &server, inx },
|
||||||
|
));
|
||||||
|
}
|
||||||
|
// main thread will no longer do anything
|
||||||
|
std.time.sleep(std.math.maxInt(u64));
|
||||||
|
for (server_threads.items) |thread| thread.join();
|
||||||
|
}
|
||||||
|
|
||||||
|
fn threadMain(allocator: std.mem.Allocator, server: *std.http.Server, thread_number: usize) !void {
|
||||||
|
// TODO: If we're in a thread pool we need to be careful with this...
|
||||||
|
const stdout_file = std.io.getStdOut().writer();
|
||||||
|
var bw = std.io.bufferedWriter(stdout_file);
|
||||||
|
const stdout = bw.writer();
|
||||||
|
|
||||||
|
log.info("starting server thread {d}, tid {d}", .{ thread_number, std.Thread.getCurrentId() });
|
||||||
var arena = std.heap.ArenaAllocator.init(allocator);
|
var arena = std.heap.ArenaAllocator.init(allocator);
|
||||||
defer arena.deinit();
|
defer arena.deinit();
|
||||||
var aa = arena.allocator();
|
var aa = arena.allocator();
|
||||||
|
@ -292,7 +416,7 @@ pub fn main() !void {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
processRequest(&aa, &server, stdout) catch |e| {
|
processRequest(&aa, server, stdout) catch |e| {
|
||||||
log.err("Unexpected error processing request: {any}", .{e});
|
log.err("Unexpected error processing request: {any}", .{e});
|
||||||
if (@errorReturnTrace()) |trace| {
|
if (@errorReturnTrace()) |trace| {
|
||||||
std.debug.dumpStackTrace(trace.*);
|
std.debug.dumpStackTrace(trace.*);
|
||||||
|
@ -302,7 +426,7 @@ pub fn main() !void {
|
||||||
try bw.flush();
|
try bw.flush();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
var parsed_config: config.ParsedConfig = undefined;
|
|
||||||
fn loadConfig(allocator: std.mem.Allocator) ![]Executor {
|
fn loadConfig(allocator: std.mem.Allocator) ![]Executor {
|
||||||
log.info("loading config", .{});
|
log.info("loading config", .{});
|
||||||
// We will not watch this file - let it reload on SIGHUP
|
// We will not watch this file - let it reload on SIGHUP
|
||||||
|
@ -313,21 +437,23 @@ fn loadConfig(allocator: std.mem.Allocator) ![]Executor {
|
||||||
defer al.deinit();
|
defer al.deinit();
|
||||||
for (parsed_config.key_value_map.keys(), parsed_config.key_value_map.values()) |k, v| {
|
for (parsed_config.key_value_map.keys(), parsed_config.key_value_map.values()) |k, v| {
|
||||||
al.appendAssumeCapacity(.{
|
al.appendAssumeCapacity(.{
|
||||||
.target_prefix = k,
|
.match_data = k,
|
||||||
.path = v,
|
.path = v,
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
log.info("config loaded", .{});
|
log.info("config loaded", .{});
|
||||||
return al.toOwnedSlice();
|
return al.toOwnedSlice(); // TODO: This should return a struct with the parsed config and executors
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// main loop calls processRequest, which is responsible for the interface
|
||||||
|
/// with logs, connection accounting, etc. The work dealing with the request
|
||||||
|
/// itself is delegated to the serve function to work with the executor
|
||||||
fn processRequest(allocator: *std.mem.Allocator, server: *std.http.Server, writer: anytype) !void {
|
fn processRequest(allocator: *std.mem.Allocator, server: *std.http.Server, writer: anytype) !void {
|
||||||
const max_header_size = 8192;
|
|
||||||
if (timer == null) timer = try std.time.Timer.start();
|
if (timer == null) timer = try std.time.Timer.start();
|
||||||
var tm = timer.?;
|
var tm = timer.?;
|
||||||
const res = try server.accept(.{ .dynamic = max_header_size });
|
var res = try server.accept(.{ .allocator = allocator.* });
|
||||||
defer res.deinit();
|
defer res.deinit();
|
||||||
defer res.reset();
|
defer _ = res.reset();
|
||||||
try res.wait(); // wait for client to send a complete request head
|
try res.wait(); // wait for client to send a complete request head
|
||||||
// I believe it's fair to start our timer after this is done
|
// I believe it's fair to start our timer after this is done
|
||||||
tm.reset();
|
tm.reset();
|
||||||
|
@ -345,7 +471,7 @@ fn processRequest(allocator: *std.mem.Allocator, server: *std.http.Server, write
|
||||||
var errbuf: [errstr.len]u8 = undefined;
|
var errbuf: [errstr.len]u8 = undefined;
|
||||||
var response_bytes = try std.fmt.bufPrint(&errbuf, errstr, .{});
|
var response_bytes = try std.fmt.bufPrint(&errbuf, errstr, .{});
|
||||||
|
|
||||||
var full_response = serve(allocator, res) catch |e| brk: {
|
var full_response = serve(allocator, &res) catch |e| brk: {
|
||||||
res.status = .internal_server_error;
|
res.status = .internal_server_error;
|
||||||
// TODO: more about this particular request
|
// TODO: more about this particular request
|
||||||
log.err("Unexpected error from executor processing request: {any}", .{e});
|
log.err("Unexpected error from executor processing request: {any}", .{e});
|
||||||
|
@ -355,9 +481,16 @@ fn processRequest(allocator: *std.mem.Allocator, server: *std.http.Server, write
|
||||||
break :brk null;
|
break :brk null;
|
||||||
};
|
};
|
||||||
defer {
|
defer {
|
||||||
|
// The call above converts uncaught errors to a null
|
||||||
|
// In request counter gets incremented during getExecutor
|
||||||
|
// Caught errors will get our in_flight counter decremented (see serve fn)
|
||||||
|
// - we cannot do this here because we may not have an executor at all
|
||||||
|
// This leaves this defer block as the only place we can/should decrement
|
||||||
|
// under normal conditions
|
||||||
|
|
||||||
if (full_response) |f| {
|
if (full_response) |f| {
|
||||||
if (f.executor.requestDeinitFn) |d| d();
|
if (f.executor.requestDeinitFn) |d| d();
|
||||||
f.executor.in_request_lock = false;
|
_ = f.executor.requests_in_flight.fetchSub(1, .Release);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (full_response) |f|
|
if (full_response) |f|
|
||||||
|
@ -365,7 +498,7 @@ fn processRequest(allocator: *std.mem.Allocator, server: *std.http.Server, write
|
||||||
res.transfer_encoding = .{ .content_length = response_bytes.len };
|
res.transfer_encoding = .{ .content_length = response_bytes.len };
|
||||||
try res.headers.append("connection", "close");
|
try res.headers.append("connection", "close");
|
||||||
try writer.print(" {d} ttfb {d:.3}ms", .{ @enumToInt(res.status), @intToFloat(f64, tm.read()) / std.time.ns_per_ms });
|
try writer.print(" {d} ttfb {d:.3}ms", .{ @enumToInt(res.status), @intToFloat(f64, tm.read()) / std.time.ns_per_ms });
|
||||||
if (builtin.is_test) writeToTestBuffers(response_bytes, res);
|
if (builtin.is_test) writeToTestBuffers(response_bytes, &res);
|
||||||
try res.do();
|
try res.do();
|
||||||
_ = try res.writer().writeAll(response_bytes);
|
_ = try res.writer().writeAll(response_bytes);
|
||||||
try res.finish();
|
try res.finish();
|
||||||
|
@ -440,7 +573,7 @@ fn testRequest(request_bytes: []const u8) !void {
|
||||||
var aa = arena.allocator();
|
var aa = arena.allocator();
|
||||||
var bytes_allocated: usize = 0;
|
var bytes_allocated: usize = 0;
|
||||||
// pre-warm
|
// pre-warm
|
||||||
bytes_allocated = try preWarmArena(aa, &arena, 1);
|
bytes_allocated = try preWarmArena(aa, &arena, initial_request_buffer);
|
||||||
const server_thread = try std.Thread.spawn(
|
const server_thread = try std.Thread.spawn(
|
||||||
.{},
|
.{},
|
||||||
processRequest,
|
processRequest,
|
||||||
|
|
Loading…
Reference in New Issue
Block a user