diff --git a/src/awslambda.zig b/src/awslambda.zig index 0f666cc..756115b 100644 --- a/src/awslambda.zig +++ b/src/awslambda.zig @@ -1,6 +1,6 @@ const std = @import("std"); -const lambda_zig = @import("lambda-zig"); +// const lambda_zig = @import("lambda-zig"); pub fn run() !void { - lambda_zig.run(); + // lambda_zig.run(); } diff --git a/src/universal_lambda.zig b/src/universal_lambda.zig index f2c61a4..38547d7 100644 --- a/src/universal_lambda.zig +++ b/src/universal_lambda.zig @@ -1,7 +1,5 @@ -const builtin = @import("builtin"); -const std = @import("std"); const build_options = @import("universal_lambda_build_options"); -const flexilib = @import("flexilib-interface"); +const std = @import("std"); const interface = @import("universal_lambda_interface"); const log = std.log.scoped(.universal_lambda); @@ -9,7 +7,7 @@ const log = std.log.scoped(.universal_lambda); const runFn = blk: { switch (build_options.build_type) { .awslambda => break :blk @import("awslambda.zig").run, - .standalone_server => break :blk runStandaloneServerParent, + .standalone_server => break :blk @import("standalone_server.zig").runStandaloneServerParent, // In the case of flexilib, our root module is actually flexilib.zig // so we need to import that, otherwise we risk the dreaded "file exists // in multiple modules" problem @@ -28,151 +26,11 @@ pub fn run(allocator: ?std.mem.Allocator, event_handler: interface.HandlerFn) !u return try runFn(allocator, event_handler); } -/// We need to create a child process to be able to deal with panics appropriately -fn runStandaloneServerParent(allocator: ?std.mem.Allocator, event_handler: interface.HandlerFn) !u8 { - const alloc = allocator orelse std.heap.page_allocator; - - var arena = std.heap.ArenaAllocator.init(alloc); - defer arena.deinit(); - - const aa = arena.allocator(); - var al = std.ArrayList([]const u8).init(aa); - defer al.deinit(); - var argi = std.process.args(); - // We do this first so it shows more prominently when looking at processes - // Also it will be slightly faster for whatever that is worth - const child_arg = "--child_of_standalone_server"; - if (argi.next()) |a| try al.append(a); - try al.append(child_arg); - while (argi.next()) |a| { - if (std.mem.eql(u8, child_arg, a)) { - // This should never actually return - try runStandaloneServer(allocator, event_handler, 8080); // TODO: configurable port - return 0; - } - try al.append(a); - } - // Parent - const stdin = std.io.getStdIn(); - const stdout = std.io.getStdOut(); - const stderr = std.io.getStdErr(); - while (true) { - var cp = std.ChildProcess.init(al.items, alloc); - cp.stdin = stdin; - cp.stdout = stdout; - cp.stderr = stderr; - _ = try cp.spawnAndWait(); - try stderr.writeAll("Caught abnormal process termination, relaunching server"); - } -} - -/// Will create a web server and marshall all requests back to our event handler -/// To keep things simple, we'll have this on a single thread, at least for now -fn runStandaloneServer(allocator: ?std.mem.Allocator, event_handler: interface.HandlerFn, port: u16) !void { - const alloc = allocator orelse std.heap.page_allocator; - - var arena = std.heap.ArenaAllocator.init(alloc); - defer arena.deinit(); - - var aa = arena.allocator(); - const address = try std.net.Address.parseIp("127.0.0.1", port); - var net_server = try address.listen(.{ .reuse_address = true }); - const server_port = net_server.listen_address.in.getPort(); - _ = try std.fmt.bufPrint(&server_url, "http://127.0.0.1:{d}", .{server_port}); - log.info("server listening at {s}", .{server_url}); - if (builtin.is_test) server_ready = true; - - // No threads, maybe later - //log.info("starting server thread, tid {d}", .{std.Thread.getCurrentId()}); - while (remaining_requests == null or remaining_requests.? > 0) { - defer { - if (remaining_requests) |*r| r.* -= 1; - if (!arena.reset(.{ .retain_with_limit = 1024 * 1024 })) { - // reallocation failed, arena is degraded - log.warn("Arena reset failed and is degraded. Resetting arena", .{}); - arena.deinit(); - arena = std.heap.ArenaAllocator.init(alloc); - aa = arena.allocator(); - } - } - const supports_getrusage = builtin.os.tag != .windows and @hasDecl(std.posix.system, "rusage"); // Is Windows it? - var rss: if (supports_getrusage) std.posix.rusage else void = undefined; - if (supports_getrusage and builtin.mode == .Debug) - rss = std.posix.getrusage(std.posix.rusage.SELF); - if (builtin.is_test) bytes_allocated = arena.queryCapacity(); - processRequest(aa, &net_server, event_handler) catch |e| { - log.err("Unexpected error processing request: {any}", .{e}); - if (@errorReturnTrace()) |trace| { - std.debug.dumpStackTrace(trace.*); - } - }; - if (builtin.is_test) bytes_allocated = arena.queryCapacity() - bytes_allocated; - if (supports_getrusage and builtin.mode == .Debug) { // and debug mode) { - const rusage = std.posix.getrusage(std.posix.rusage.SELF); - log.debug( - "Request complete, max RSS of process: {d}M. Incremental: {d}K, User: {d}μs, System: {d}μs", - .{ - @divTrunc(rusage.maxrss, 1024), - rusage.maxrss - rss.maxrss, - (rusage.utime.tv_sec - rss.utime.tv_sec) * std.time.us_per_s + - rusage.utime.tv_usec - rss.utime.tv_usec, - (rusage.stime.tv_sec - rss.stime.tv_sec) * std.time.us_per_s + - rusage.stime.tv_usec - rss.stime.tv_usec, - }, - ); - } - } - return; -} - -fn processRequest(aa: std.mem.Allocator, server: *std.net.Server, event_handler: interface.HandlerFn) !void { - // This function is under test, but not the standalone server itself - var connection = try server.accept(); - defer connection.stream.close(); - - var read_buffer: [1024 * 16]u8 = undefined; // TODO: Fix this - var server_connection = std.http.Server.init(connection, &read_buffer); - var req = try server_connection.receiveHead(); - - const request_body = try (try req.reader()).readAllAlloc(aa, @as(usize, std.math.maxInt(usize))); - var request_headers = std.ArrayList(std.http.Header).init(aa); - defer request_headers.deinit(); - var hi = req.iterateHeaders(); - while (hi.next()) |h| try request_headers.append(h); - - var response = interface.Response.init(aa); - defer response.deinit(); - response.request.headers = request_headers.items; - response.request.headers_owned = false; - response.request.target = req.head.target; - response.request.method = req.head.method; - response.headers = &.{}; - response.headers_owned = false; - - var respond_options = std.http.Server.Request.RespondOptions{}; - const response_bytes = event_handler(aa, request_body, &response) catch |e| brk: { - respond_options.status = response.status; - respond_options.reason = response.reason; - if (respond_options.status.class() == .success) { - respond_options.status = .internal_server_error; - respond_options.reason = null; - response.body.items = ""; - } - // TODO: stream body to client? or keep internal? - // TODO: more about this particular request - log.err("Unexpected error from executor processing request: {any}", .{e}); - if (@errorReturnTrace()) |trace| { - std.debug.dumpStackTrace(trace.*); - } - break :brk "Unexpected error generating request to lambda"; - }; - - const final_response = try std.mem.concat(aa, u8, &[_][]const u8{ response.body.items, response_bytes }); - try req.respond(final_response, respond_options); -} test { std.testing.refAllDecls(@This()); std.testing.refAllDecls(@import("console.zig")); + std.testing.refAllDecls(@import("standalone_server.zig")); + std.testing.refAllDecls(@import("awslambda.zig")); // By importing flexilib.zig, this breaks downstream any time someone // tries to build flexilib, because flexilib.zig becomes the root module, // then gets imported here again. It shouldn't be done unless doing @@ -190,60 +48,3 @@ test { // TODO: Do we want build files here too? } - -var server_ready = false; -var remaining_requests: ?usize = null; -var server_url: ["http://127.0.0.1:99999".len]u8 = undefined; -var bytes_allocated: usize = 0; - -fn testRequest(method: std.http.Method, target: []const u8, event_handler: interface.HandlerFn) !void { - remaining_requests = 1; - defer remaining_requests = null; - const server_thread = try std.Thread.spawn( - .{}, - runStandaloneServer, - .{ null, event_handler, 0 }, - ); - var client = std.http.Client{ .allocator = std.testing.allocator }; - defer client.deinit(); - defer server_ready = false; - while (!server_ready) std.time.sleep(1); - - const url = try std.mem.concat(std.testing.allocator, u8, &[_][]const u8{ server_url[0..], target }); - defer std.testing.allocator.free(url); - log.debug("fetch from url: {s}", .{url}); - - var response_data = std.ArrayList(u8).init(std.testing.allocator); - defer response_data.deinit(); - - const resp = try client.fetch(.{ - .response_storage = .{ .dynamic = &response_data }, - .method = method, - .location = .{ .url = url }, - }); - - server_thread.join(); - log.debug("Bytes allocated during request: {d}", .{bytes_allocated}); - log.debug("Response status: {}", .{resp.status}); - log.debug("Response: {s}", .{response_data.items}); -} - -fn testGet(comptime path: []const u8, event_handler: interface.HandlerFn) !void { - try testRequest(.GET, path, event_handler); -} -test "can make a request" { - // std.testing.log_level = .debug; - if (@import("builtin").os.tag == .wasi) return error.SkipZigTest; - const HandlerClosure = struct { - var data_received: []const u8 = undefined; - var context_received: interface.Context = undefined; - const Self = @This(); - pub fn handler(allocator: std.mem.Allocator, event_data: []const u8, context: interface.Context) ![]const u8 { - _ = allocator; - data_received = event_data; - context_received = context; - return "success"; - } - }; - try testGet("/", HandlerClosure.handler); -}