get all response body data from executor without allocations in core
This commit is contained in:
parent
f57189fbfd
commit
5be44c911f
|
@ -34,6 +34,7 @@ pub fn build(b: *std.Build) void {
|
||||||
.target = target,
|
.target = target,
|
||||||
.optimize = optimize,
|
.optimize = optimize,
|
||||||
});
|
});
|
||||||
|
lib.linkLibC();
|
||||||
|
|
||||||
// This declares intent for the executable to be installed into the
|
// This declares intent for the executable to be installed into the
|
||||||
// standard location when the user invokes the "install" step (the default
|
// standard location when the user invokes the "install" step (the default
|
||||||
|
|
|
@ -1,12 +1,34 @@
|
||||||
const std = @import("std");
|
const std = @import("std");
|
||||||
const testing = std.testing;
|
const testing = std.testing;
|
||||||
|
|
||||||
export fn serve() void {
|
const Response = extern struct {
|
||||||
const stdout_file = std.io.getStdOut().writer();
|
ptr: [*]u8,
|
||||||
var bw = std.io.bufferedWriter(stdout_file);
|
len: usize,
|
||||||
const stdout = bw.writer();
|
};
|
||||||
stdout.print(" 2 ", .{}) catch unreachable;
|
|
||||||
bw.flush() catch unreachable;
|
var child_allocator = std.heap.raw_c_allocator; // raw allocator recommended for use in arenas
|
||||||
|
var arena: std.heap.ArenaAllocator = undefined;
|
||||||
|
|
||||||
|
export fn handle_request() *Response {
|
||||||
|
arena = std.heap.ArenaAllocator.init(child_allocator);
|
||||||
|
var allocator = arena.allocator();
|
||||||
|
var al = std.ArrayList(u8).init(allocator);
|
||||||
|
var writer = al.writer();
|
||||||
|
|
||||||
|
writer.print(" 2.", .{}) catch unreachable;
|
||||||
|
|
||||||
|
// Cannot simply return &Blah. Need to assign to var first
|
||||||
|
var rc = &Response{
|
||||||
|
.ptr = al.items.ptr,
|
||||||
|
.len = al.items.len,
|
||||||
|
};
|
||||||
|
return rc;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// having request_deinit allows for a general deinit as well
|
||||||
|
export fn request_deinit() void {
|
||||||
|
std.log.debug("deinit", .{});
|
||||||
|
arena.deinit();
|
||||||
}
|
}
|
||||||
|
|
||||||
export fn add(a: i32, b: i32) i32 {
|
export fn add(a: i32, b: i32) i32 {
|
||||||
|
|
146
src/main.zig
146
src/main.zig
|
@ -2,16 +2,29 @@ const std = @import("std");
|
||||||
const builtin = @import("builtin");
|
const builtin = @import("builtin");
|
||||||
|
|
||||||
const Watch = @import("Watch.zig");
|
const Watch = @import("Watch.zig");
|
||||||
const serveFn = *const fn () void;
|
const serveFn = *const fn () *ServeReturn;
|
||||||
|
const requestDeinitFn = *const fn () void;
|
||||||
|
|
||||||
const timeout = 250;
|
const timeout = 250;
|
||||||
|
|
||||||
|
const ServeReturn = extern struct {
|
||||||
|
ptr: [*]u8,
|
||||||
|
len: usize,
|
||||||
|
};
|
||||||
|
|
||||||
|
const FullReturn = struct {
|
||||||
|
response: []u8,
|
||||||
|
executor: *Executor,
|
||||||
|
};
|
||||||
|
|
||||||
const Executor = struct {
|
const Executor = struct {
|
||||||
path: [:0]const u8,
|
path: [:0]const u8,
|
||||||
library: ?*anyopaque = null,
|
library: ?*anyopaque = null,
|
||||||
serveFn: ?serveFn = null,
|
serveFn: ?serveFn = null,
|
||||||
|
requestDeinitFn: ?requestDeinitFn = null,
|
||||||
watch: ?usize = null,
|
watch: ?usize = null,
|
||||||
reload_lock: bool = false,
|
reload_lock: bool = false,
|
||||||
|
in_request_lock: bool = false,
|
||||||
};
|
};
|
||||||
|
|
||||||
var executors = [_]Executor{
|
var executors = [_]Executor{
|
||||||
|
@ -30,30 +43,58 @@ pub const std_options = struct {
|
||||||
.{ .scope = .watch, .level = .info },
|
.{ .scope = .watch, .level = .info },
|
||||||
};
|
};
|
||||||
};
|
};
|
||||||
const SERVE_FN_NAME = "serve";
|
const SERVE_FN_NAME = "handle_request";
|
||||||
const PORT = 8069;
|
const PORT = 8069;
|
||||||
|
|
||||||
fn serve() !void {
|
// TODO: writer as anytype is not going to survive across library boundaries...
|
||||||
|
fn serve(allocator: std.mem.Allocator, response: *std.http.Server.Response) !*FullReturn {
|
||||||
|
var null_server = std.http.Server.init(allocator, .{});
|
||||||
|
defer null_server.deinit();
|
||||||
|
var data: [14]u8 = @constCast(&[_]u8{0x00} ** 14).*;
|
||||||
|
var child_response = std.http.Server.Response{
|
||||||
|
.server = &null_server,
|
||||||
|
.request = response.request,
|
||||||
|
.connection = .{
|
||||||
|
.conn = .{
|
||||||
|
.stream = .{ .handle = 0 },
|
||||||
|
.protocol = .plain,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
.address = .{ .any = .{
|
||||||
|
.data = data,
|
||||||
|
.family = 0,
|
||||||
|
} },
|
||||||
|
.headers = response.headers,
|
||||||
|
};
|
||||||
|
_ = child_response;
|
||||||
// if (some path routing thing) {
|
// if (some path routing thing) {
|
||||||
|
// TODO: Get request body into executor
|
||||||
|
// TODO: Get headers back from executor
|
||||||
|
// TODO: Get request headers into executor
|
||||||
|
const executor = try getExecutor(0);
|
||||||
|
executor.in_request_lock = true;
|
||||||
|
errdefer executor.in_request_lock = false;
|
||||||
|
// Call external library
|
||||||
|
var serve_result = executor.serveFn.?();
|
||||||
|
|
||||||
(try getExecutor(0))();
|
// Deal with results
|
||||||
// if (inx > 4) {
|
var slice: []u8 = serve_result.ptr[0..serve_result.len];
|
||||||
// if (inx % 2 == 0)
|
var rc = &FullReturn{
|
||||||
// (try getExecutor(0))()
|
.executor = executor,
|
||||||
// else
|
.response = slice,
|
||||||
// (try getExecutor(1))();
|
};
|
||||||
// }
|
return rc;
|
||||||
}
|
}
|
||||||
fn getExecutor(key: usize) !serveFn {
|
fn getExecutor(key: usize) !*Executor {
|
||||||
var executor = &executors[key];
|
var executor = &executors[key];
|
||||||
if (executor.serveFn) |s| return s;
|
if (executor.serveFn != null) return executor;
|
||||||
|
|
||||||
executor.library = blk: {
|
executor.library = blk: {
|
||||||
if (executor.library) |l|
|
if (executor.library) |l|
|
||||||
break :blk l;
|
break :blk l;
|
||||||
|
|
||||||
while (executor.reload_lock) // system is reloading the library
|
while (executor.reload_lock) // system is reloading the library
|
||||||
std.time.sleep(1);
|
std.time.sleep(1 * std.time.ns_per_ms / 2);
|
||||||
|
|
||||||
if (executor.library) |l| // check again to see where we are at
|
if (executor.library) |l| // check again to see where we are at
|
||||||
break :blk l;
|
break :blk l;
|
||||||
|
@ -71,9 +112,15 @@ fn getExecutor(key: usize) !serveFn {
|
||||||
if (serve_fn == null) return error.CouldNotLoadSymbolServe;
|
if (serve_fn == null) return error.CouldNotLoadSymbolServe;
|
||||||
|
|
||||||
executor.serveFn = @ptrCast(serveFn, serve_fn.?);
|
executor.serveFn = @ptrCast(serveFn, serve_fn.?);
|
||||||
return executor.serveFn.?;
|
loadOptionalSymbols(executor);
|
||||||
|
return executor;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn loadOptionalSymbols(executor: *Executor) void {
|
||||||
|
if (std.c.dlsym(executor.library.?, "request_deinit")) |s| {
|
||||||
|
executor.requestDeinitFn = @ptrCast(requestDeinitFn, s);
|
||||||
|
}
|
||||||
|
}
|
||||||
fn executorChanged(watch: usize) void {
|
fn executorChanged(watch: usize) void {
|
||||||
// NOTE: This will be called off the main thread
|
// NOTE: This will be called off the main thread
|
||||||
log.debug("executor with watch {d} changed", .{watch});
|
log.debug("executor with watch {d} changed", .{watch});
|
||||||
|
@ -81,6 +128,8 @@ fn executorChanged(watch: usize) void {
|
||||||
if (executor.watch) |w| {
|
if (executor.watch) |w| {
|
||||||
if (w == watch) {
|
if (w == watch) {
|
||||||
if (executor.library) |l| {
|
if (executor.library) |l| {
|
||||||
|
while (executor.in_request_lock)
|
||||||
|
std.time.sleep(1 * std.time.ns_per_ms / 2);
|
||||||
executor.reload_lock = true;
|
executor.reload_lock = true;
|
||||||
defer executor.reload_lock = false;
|
defer executor.reload_lock = false;
|
||||||
|
|
||||||
|
@ -91,13 +140,15 @@ fn executorChanged(watch: usize) void {
|
||||||
log.warn("could not reload! error opening library", .{});
|
log.warn("could not reload! error opening library", .{});
|
||||||
return;
|
return;
|
||||||
};
|
};
|
||||||
executor.serveFn = @ptrCast(serveFn, std.c.dlsym(executor.library.?, SERVE_FN_NAME));
|
var symbol = std.c.dlsym(executor.library.?, SERVE_FN_NAME);
|
||||||
if (executor.serveFn == null) {
|
if (symbol == null) {
|
||||||
log.warn("could not reload! error finding symbol", .{});
|
log.warn("could not reload! error finding symbol", .{});
|
||||||
if (std.c.dlclose(executor.library.?) != 0)
|
if (std.c.dlclose(executor.library.?) != 0)
|
||||||
@panic("System unstable: Error after library open and cannot close");
|
@panic("System unstable: Error after library open and cannot close");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
executor.serveFn = @ptrCast(serveFn, symbol);
|
||||||
|
loadOptionalSymbols(executor);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -164,36 +215,67 @@ pub fn main() !void {
|
||||||
log.info("listening on port: {d}", .{server_port});
|
log.info("listening on port: {d}", .{server_port});
|
||||||
if (builtin.os.tag == .linux)
|
if (builtin.os.tag == .linux)
|
||||||
log.info("pid: {d}", .{std.os.linux.getpid()});
|
log.info("pid: {d}", .{std.os.linux.getpid()});
|
||||||
// install signal handler
|
|
||||||
var act = std.os.Sigaction{
|
|
||||||
.handler = .{ .sigaction = exitApplication },
|
|
||||||
.mask = std.os.empty_sigset,
|
|
||||||
.flags = (std.os.SA.SIGINFO | std.os.SA.RESTART | std.os.SA.RESETHAND),
|
|
||||||
};
|
|
||||||
|
|
||||||
try std.os.sigaction(std.os.SIG.INT, &act, null);
|
try installSignalHandler();
|
||||||
try std.os.sigaction(std.os.SIG.TERM, &act, null);
|
|
||||||
while (true) {
|
while (true) {
|
||||||
var arena = std.heap.ArenaAllocator.init(std.heap.c_allocator);
|
var arena = std.heap.ArenaAllocator.init(std.heap.c_allocator);
|
||||||
defer arena.deinit();
|
defer arena.deinit();
|
||||||
|
|
||||||
|
processRequest(arena.allocator(), &server) catch |e| {
|
||||||
|
log.err("Unexpected error processing request: {any}", .{e});
|
||||||
|
if (@errorReturnTrace()) |trace| {
|
||||||
|
std.debug.dumpStackTrace(trace.*);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn processRequest(allocator: std.mem.Allocator, server: *std.http.Server) !void {
|
||||||
|
const max_header_size = 8192;
|
||||||
const res = try server.accept(.{ .dynamic = max_header_size });
|
const res = try server.accept(.{ .dynamic = max_header_size });
|
||||||
defer res.deinit();
|
defer res.deinit();
|
||||||
defer res.reset();
|
defer res.reset();
|
||||||
try res.wait();
|
try res.wait();
|
||||||
|
|
||||||
const server_body: []const u8 = "message from server!\n";
|
// TODO: deal with this
|
||||||
res.transfer_encoding = .{ .content_length = server_body.len };
|
var buf: [1024]u8 = undefined;
|
||||||
|
const n = try res.readAll(&buf);
|
||||||
|
_ = n;
|
||||||
|
|
||||||
|
// TODO: we need to also have a defer statement to deinit whatever happens
|
||||||
|
// with the executor library. This will also add a race condition where
|
||||||
|
// we could have a memory leak if the executor reloads in the middle of a
|
||||||
|
// request. We may want to add a new spinlock on the reload thread to
|
||||||
|
// avoid reloading in the middle of a request, which would be generally
|
||||||
|
// bad anyway
|
||||||
|
const errstr = "Internal Server Error\n";
|
||||||
|
var errbuf: [errstr.len]u8 = undefined;
|
||||||
|
var response_bytes = try std.fmt.bufPrint(&errbuf, errstr, .{});
|
||||||
|
|
||||||
|
var full_response = serve(allocator, res) catch |e| brk: {
|
||||||
|
res.status = .internal_server_error;
|
||||||
|
// TODO: more about this particular request
|
||||||
|
log.err("Unexpected error from executor processing request: {any}", .{e});
|
||||||
|
if (@errorReturnTrace()) |trace| {
|
||||||
|
std.debug.dumpStackTrace(trace.*);
|
||||||
|
}
|
||||||
|
break :brk null;
|
||||||
|
};
|
||||||
|
defer {
|
||||||
|
if (full_response) |f| {
|
||||||
|
if (f.executor.requestDeinitFn) |d| d();
|
||||||
|
f.executor.in_request_lock = false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (full_response) |f|
|
||||||
|
response_bytes = f.response;
|
||||||
|
res.transfer_encoding = .{ .content_length = response_bytes.len };
|
||||||
try res.headers.append("content-type", "text/plain");
|
try res.headers.append("content-type", "text/plain");
|
||||||
try res.headers.append("connection", "close");
|
try res.headers.append("connection", "close");
|
||||||
try res.do();
|
try res.do();
|
||||||
|
_ = try res.writer().writeAll(response_bytes);
|
||||||
var buf: [128]u8 = undefined;
|
|
||||||
const n = try res.readAll(&buf);
|
|
||||||
_ = n;
|
|
||||||
_ = try res.writer().writeAll(server_body);
|
|
||||||
try res.finish();
|
try res.finish();
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
test {
|
test {
|
||||||
// To run nested container tests, either, call `refAllDecls` which will
|
// To run nested container tests, either, call `refAllDecls` which will
|
||||||
|
|
Loading…
Reference in New Issue
Block a user