From e2bbaf2c9a3186833119701c51803ddddb2cf55a Mon Sep 17 00:00:00 2001 From: Emil Lerch Date: Wed, 2 Aug 2023 17:36:22 -0700 Subject: [PATCH] refactor main loop into multiple functions This also gains the ability to report errors when posting the response --- src/lambda.zig | 395 +++++++++++++++++++++++++++++++------------------ 1 file changed, 248 insertions(+), 147 deletions(-) diff --git a/src/lambda.zig b/src/lambda.zig index 6a302b0..6d7e262 100644 --- a/src/lambda.zig +++ b/src/lambda.zig @@ -5,11 +5,21 @@ const HandlerFn = *const fn (std.mem.Allocator, []const u8) anyerror![]const u8; const log = std.log.scoped(.lambda); +var empty_headers: std.http.Headers = undefined; +var client: ?std.http.Client = null; + +const prefix = "http://"; +const postfix = "/2018-06-01/runtime/invocation"; + +pub fn deinit() void { + if (client) |*c| c.deinit(); + client = null; +} /// Starts the lambda framework. Handler will be called when an event is processing /// If an allocator is not provided, an approrpriate allocator will be selected and used +/// This function is intended to loop infinitely. If not used in this manner, +/// make sure to call the deinit() function pub fn run(allocator: ?std.mem.Allocator, event_handler: HandlerFn) !void { // TODO: remove inferred error set? - const prefix = "http://"; - const postfix = "/2018-06-01/runtime/invocation"; const lambda_runtime_uri = std.os.getenv("AWS_LAMBDA_RUNTIME_API") orelse test_lambda_runtime_uri.?; // TODO: If this is null, go into single use command line mode @@ -21,9 +31,13 @@ pub fn run(allocator: ?std.mem.Allocator, event_handler: HandlerFn) !void { // T defer alloc.free(url); const uri = try std.Uri.parse(url); - var client: std.http.Client = .{ .allocator = alloc }; - defer client.deinit(); - var empty_headers = std.http.Headers.init(alloc); + // TODO: Simply adding this line without even using the client is enough + // to cause seg faults!? + // client = client orelse .{ .allocator = alloc }; + // so we'll do this instead + if (client != null) return error.MustDeInitBeforeCallingRunAgain; + client = .{ .allocator = alloc }; + empty_headers = std.http.Headers.init(alloc); defer empty_headers.deinit(); log.info("tid {d} (lambda): Bootstrap initializing with event url: {s}", .{ std.Thread.getCurrentId(), url }); @@ -36,160 +50,229 @@ pub fn run(allocator: ?std.mem.Allocator, event_handler: HandlerFn) !void { // T var req_alloc = std.heap.ArenaAllocator.init(alloc); defer req_alloc.deinit(); const req_allocator = req_alloc.allocator(); - var req = try client.request(.GET, uri, empty_headers, .{}); - defer req.deinit(); - req.start() catch |err| { // Well, at this point all we can do is shout at the void - log.err("Get fail (start): {}", .{err}); - std.os.exit(0); + // Fundamentally we're doing 3 things: + // 1. Get the next event from Lambda (event data and request id) + // 2. Call our handler to get the response + // 3. Post the response back to Lambda + var ev = getEvent(req_allocator, uri) catch |err| { + // Well, at this point all we can do is shout at the void + log.err("Error fetching event details: {}", .{err}); + std.os.exit(1); + // continue; + }; + if (ev == null) continue; // this gets logged in getEvent, and without + // a request id, we still can't do anything + // reasonable to report back + const event = ev.?; + defer ev.?.deinit(); + const event_response = event_handler(req_allocator, event.event_data) catch |err| { + event.reportError(@errorReturnTrace(), err, lambda_runtime_uri) catch unreachable; continue; }; - - // Lambda freezes the process at this line of code. During warm start, - // the process will unfreeze and data will be sent in response to client.get - req.wait() catch |err| { // Well, at this point all we can do is shout at the void - log.err("Get fail (wait): {}", .{err}); - std.os.exit(0); + event.postResponse(lambda_runtime_uri, event_response) catch |err| { + event.reportError(@errorReturnTrace(), err, lambda_runtime_uri) catch unreachable; continue; }; + } +} + +const Event = struct { + allocator: std.mem.Allocator, + event_data: []u8, + request_id: []u8, + + const Self = @This(); + + pub fn init(allocator: std.mem.Allocator, event_data: []u8, request_id: []u8) Self { + return .{ + .allocator = allocator, + .event_data = event_data, + .request_id = request_id, + }; + } + pub fn deinit(self: *Self) void { + self.allocator.free(self.event_data); + self.allocator.free(self.request_id); + } + fn reportError( + self: Self, + return_trace: ?*std.builtin.StackTrace, + err: anytype, + lambda_runtime_uri: []const u8, + ) !void { + // If we fail in this function, we're pretty hosed up + if (return_trace) |rt| + log.err("Caught error: {}. Return Trace: {any}", .{ err, rt }) + else + log.err("Caught error: {}. No return trace available", .{err}); + const err_url = try std.fmt.allocPrint( + self.allocator, + "{s}{s}{s}/{s}/error", + .{ prefix, lambda_runtime_uri, postfix, self.request_id }, + ); + defer self.allocator.free(err_url); + const err_uri = try std.Uri.parse(err_url); + const content = + \\{{ + \\ "errorMessage": "{s}", + \\ "errorType": "HandlerReturnedError", + \\ "stackTrace": [ "{any}" ] + \\}} + ; + const content_fmt = if (return_trace) |rt| + try std.fmt.allocPrint(self.allocator, content, .{ @errorName(err), rt }) + else + try std.fmt.allocPrint(self.allocator, content, .{ @errorName(err), "no return trace available" }); + defer self.allocator.free(content_fmt); + log.err("Posting to {s}: Data {s}", .{ err_url, content_fmt }); + + var err_headers = std.http.Headers.init(self.allocator); + defer err_headers.deinit(); + err_headers.append( + "Lambda-Runtime-Function-Error-Type", + "HandlerReturned", + ) catch |append_err| { + log.err("Error appending error header to post response for request id {s}: {}", .{ self.request_id, append_err }); + std.os.exit(1); + }; + // TODO: There is something up with using a shared client in this way + // so we're taking a perf hit in favor of stability. In a practical + // sense, without making HTTPS connections (lambda environment is + // non-ssl), this shouldn't be a big issue + var cl = std.http.Client{ .allocator = self.allocator }; + defer cl.deinit(); + var req = try cl.request(.POST, err_uri, empty_headers, .{}); + // var req = try client.?.request(.POST, err_uri, empty_headers, .{}); + // defer req.deinit(); + req.transfer_encoding = .{ .content_length = content_fmt.len }; + req.start() catch |post_err| { // Well, at this point all we can do is shout at the void + log.err("Error posting response (start) for request id {s}: {}", .{ self.request_id, post_err }); + std.os.exit(1); + }; + try req.writeAll(content_fmt); + try req.finish(); + req.wait() catch |post_err| { // Well, at this point all we can do is shout at the void + log.err("Error posting response (wait) for request id {s}: {}", .{ self.request_id, post_err }); + std.os.exit(1); + }; + // TODO: Determine why this post is not returning if (req.response.status != .ok) { // Documentation says something about "exit immediately". The // Lambda infrastrucutre restarts, so it's unclear if that's necessary. // It seems as though a continue should be fine, and slightly faster - // std.os.exit(1); log.err("Get fail: {} {s}", .{ @intFromEnum(req.response.status), req.response.status.phrase() orelse "", }); - continue; + std.os.exit(1); } - - var request_id: ?[]const u8 = null; - var content_length: ?usize = null; - for (req.response.headers.list.items) |h| { - if (std.ascii.eqlIgnoreCase(h.name, "Lambda-Runtime-Aws-Request-Id")) - request_id = h.value; - if (std.ascii.eqlIgnoreCase(h.name, "Content-Length")) { - content_length = std.fmt.parseUnsigned(usize, h.value, 10) catch null; - if (content_length == null) - log.warn("Error parsing content length value: '{s}'", .{h.value}); - } - // TODO: XRay uses an environment variable to do its magic. It's our - // responsibility to set this, but no zig-native setenv(3)/putenv(3) - // exists. I would kind of rather not link in libc for this, - // so we'll hold for now and think on this - // if (std.mem.indexOf(u8, h.name.value, "Lambda-Runtime-Trace-Id")) |_| - // std.process. - // std.os.setenv("AWS_LAMBDA_RUNTIME_API"); - } - if (request_id == null) { - // We can't report back an issue because the runtime error reporting endpoint - // uses request id in its path. So the best we can do is log the error and move - // on here. - log.err("Could not find request id: skipping request", .{}); - continue; - } - const req_id = request_id.?; - log.debug("got lambda request with id {s}", .{req_id}); - - const reader = req.reader(); - var buf: [65535]u8 = undefined; - - var resp_payload = std.ArrayList(u8).init(req_allocator); - if (content_length) |len| { - resp_payload.ensureTotalCapacity(len) catch { - log.err("Could not allocate memory for body of request id: {s}", .{request_id.?}); - continue; - }; - } - - defer resp_payload.deinit(); - - while (true) { - const read = try reader.read(&buf); - try resp_payload.appendSlice(buf[0..read]); - if (read == 0) break; - } - const event_response = event_handler(req_allocator, resp_payload.items) catch |err| { - // Stack trace will return null if stripped - const return_trace = @errorReturnTrace(); - if (return_trace) |rt| - log.err("Caught error: {}. Return Trace: {any}", .{ err, rt }) - else - log.err("Caught error: {}. No return trace available", .{err}); - const err_url = try std.fmt.allocPrint(req_allocator, "{s}{s}/runtime/invocation/{s}/error", .{ prefix, lambda_runtime_uri, req_id }); - defer req_allocator.free(err_url); - const err_uri = try std.Uri.parse(err_url); - const content = - \\ {s} - \\ "errorMessage": "{s}", - \\ "errorType": "HandlerReturnedError", - \\ "stackTrace": [ "{any}" ] - \\ {s} - ; - const content_fmt = if (return_trace) |rt| - try std.fmt.allocPrint(req_allocator, content, .{ "{", @errorName(err), rt, "}" }) - else - try std.fmt.allocPrint(req_allocator, content, .{ "{", @errorName(err), "no return trace available", "}" }); - defer req_allocator.free(content_fmt); - log.err("Posting to {s}: Data {s}", .{ err_url, content_fmt }); - - var err_headers = std.http.Headers.init(req_allocator); - defer err_headers.deinit(); - err_headers.append( - "Lambda-Runtime-Function-Error-Type", - "HandlerReturned", - ) catch |append_err| { - log.err("Error appending error header to post response for request id {s}: {}", .{ req_id, append_err }); - std.os.exit(0); - continue; - }; - var err_req = try client.request(.POST, err_uri, empty_headers, .{}); - defer err_req.deinit(); - err_req.start() catch |post_err| { // Well, at this point all we can do is shout at the void - log.err("Error posting response for request id {s}: {}", .{ req_id, post_err }); - std.os.exit(0); - continue; - }; - - err_req.wait() catch |post_err| { // Well, at this point all we can do is shout at the void - log.err("Error posting response for request id {s}: {}", .{ req_id, post_err }); - std.os.exit(0); - continue; - }; - // TODO: Determine why this post is not returning - if (err_req.response.status != .ok) { - // Documentation says something about "exit immediately". The - // Lambda infrastrucutre restarts, so it's unclear if that's necessary. - // It seems as though a continue should be fine, and slightly faster - // std.os.exit(1); - log.err("Get fail: {} {s}", .{ - @intFromEnum(err_req.response.status), - err_req.response.status.phrase() orelse "", - }); - continue; - } - log.err("Post complete", .{}); - continue; - }; - // TODO: We should catch these potential alloc errors too - // TODO: This whole loop should be in another function so we can catch everything at once - const response_url = try std.fmt.allocPrint(req_allocator, "{s}{s}{s}/{s}/response", .{ prefix, lambda_runtime_uri, postfix, req_id }); - defer req_allocator.free(response_url); - const response_uri = try std.Uri.parse(response_url); - const response_content = try std.fmt.allocPrint(req_allocator, "{s} \"content\": \"{s}\" {s}", .{ "{", event_response, "}" }); - var resp_req = try client.request(.POST, response_uri, empty_headers, .{}); - defer resp_req.deinit(); - resp_req.transfer_encoding = .{ .content_length = response_content.len }; - try resp_req.start(); - try resp_req.writeAll(response_content); // TODO: AllocPrint + writeAll makes no sense - try resp_req.finish(); - resp_req.wait() catch |err| { - // TODO: report error - log.err("Error posting response for request id {s}: {}", .{ req_id, err }); - continue; - }; + log.err("Error reporting post complete", .{}); } + + fn postResponse(self: Self, lambda_runtime_uri: []const u8, event_response: []const u8) !void { + const response_url = try std.fmt.allocPrint( + self.allocator, + "{s}{s}{s}/{s}/response", + .{ prefix, lambda_runtime_uri, postfix, self.request_id }, + ); + defer self.allocator.free(response_url); + const response_uri = try std.Uri.parse(response_url); + var cl = std.http.Client{ .allocator = self.allocator }; + defer cl.deinit(); + var req = try cl.request(.POST, response_uri, empty_headers, .{}); + // var req = try client.?.request(.POST, response_uri, empty_headers, .{}); + defer req.deinit(); + const response_content = try std.fmt.allocPrint( + self.allocator, + "{{ \"content\": \"{s}\" }}", + .{event_response}, + ); + defer self.allocator.free(response_content); + + req.transfer_encoding = .{ .content_length = response_content.len }; + try req.start(); + try req.writeAll(response_content); + try req.finish(); + try req.wait(); + } +}; + +fn getEvent(allocator: std.mem.Allocator, event_data_uri: std.Uri) !?Event { + // TODO: There is something up with using a shared client in this way + // so we're taking a perf hit in favor of stability. In a practical + // sense, without making HTTPS connections (lambda environment is + // non-ssl), this shouldn't be a big issue + var cl = std.http.Client{ .allocator = allocator }; + defer cl.deinit(); + var req = try cl.request(.GET, event_data_uri, empty_headers, .{}); + // var req = try client.?.request(.GET, event_data_uri, empty_headers, .{}); + // defer req.deinit(); + + try req.start(); + try req.finish(); + // Lambda freezes the process at this line of code. During warm start, + // the process will unfreeze and data will be sent in response to client.get + try req.wait(); + if (req.response.status != .ok) { + // Documentation says something about "exit immediately". The + // Lambda infrastrucutre restarts, so it's unclear if that's necessary. + // It seems as though a continue should be fine, and slightly faster + // std.os.exit(1); + log.err("Lambda server event response returned bad error code: {} {s}", .{ + @intFromEnum(req.response.status), + req.response.status.phrase() orelse "", + }); + return error.EventResponseNotOkResponse; + } + + var request_id: ?[]const u8 = null; + var content_length: ?usize = null; + for (req.response.headers.list.items) |h| { + if (std.ascii.eqlIgnoreCase(h.name, "Lambda-Runtime-Aws-Request-Id")) + request_id = h.value; + if (std.ascii.eqlIgnoreCase(h.name, "Content-Length")) { + content_length = std.fmt.parseUnsigned(usize, h.value, 10) catch null; + if (content_length == null) + log.warn("Error parsing content length value: '{s}'", .{h.value}); + } + // TODO: XRay uses an environment variable to do its magic. It's our + // responsibility to set this, but no zig-native setenv(3)/putenv(3) + // exists. I would kind of rather not link in libc for this, + // so we'll hold for now and think on this + // if (std.mem.indexOf(u8, h.name.value, "Lambda-Runtime-Trace-Id")) |_| + // std.process. + // std.os.setenv("AWS_LAMBDA_RUNTIME_API"); + } + if (request_id == null) { + // We can't report back an issue because the runtime error reporting endpoint + // uses request id in its path. So the best we can do is log the error and move + // on here. + log.err("Could not find request id: skipping request", .{}); + return null; + } + if (content_length == null) { + // We can't report back an issue because the runtime error reporting endpoint + // uses request id in its path. So the best we can do is log the error and move + // on here. + log.err("No content length provided for event data", .{}); + return null; + } + const req_id = request_id.?; + log.debug("got lambda request with id {s}", .{req_id}); + + var resp_payload = try std.ArrayList(u8).initCapacity(allocator, content_length.?); + defer resp_payload.deinit(); + try resp_payload.resize(content_length.?); + var response_data = try resp_payload.toOwnedSlice(); + errdefer allocator.free(response_data); + _ = try req.readAll(response_data); + + return Event.init( + allocator, + response_data, + try allocator.dupe(u8, req_id), + ); } //////////////////////////////////////////////////////////////////////// @@ -320,7 +403,7 @@ fn test_run(allocator: std.mem.Allocator, event_handler: HandlerFn) !std.Thread ); } -fn lambda_request(allocator: std.mem.Allocator, request: []const u8) ![]u8 { +fn lambda_request(allocator: std.mem.Allocator, request: []const u8, request_count: usize) ![]u8 { var arena = std.heap.ArenaAllocator.init(allocator); defer arena.deinit(); var aa = arena.allocator(); @@ -344,7 +427,7 @@ fn lambda_request(allocator: std.mem.Allocator, request: []const u8) ![]u8 { // booleans to know when to shut down. This function is designed for a // single request/response pair only - lambda_remaining_requests = 1; // in case anyone messed with this, we will make sure we start + lambda_remaining_requests = request_count; server_remaining_requests = lambda_remaining_requests.? * 2; // Lambda functions // fetch from the server, // then post back. Always @@ -379,7 +462,25 @@ test "basic request" { const expected_response = \\{ "content": "{"foo": "bar", "baz": "qux"}" } ; - const lambda_response = try lambda_request(allocator, request); + const lambda_response = try lambda_request(allocator, request, 1); + defer deinit(); + defer allocator.free(lambda_response); + try std.testing.expectEqualStrings(expected_response, lambda_response); +} + +test "several requests do not fail" { + // std.testing.log_level = .debug; + const allocator = std.testing.allocator; + const request = + \\{"foo": "bar", "baz": "qux"} + ; + + // This is what's actually coming back. Is this right? + const expected_response = + \\{ "content": "{"foo": "bar", "baz": "qux"}" } + ; + const lambda_response = try lambda_request(allocator, request, 5); + defer deinit(); defer allocator.free(lambda_response); try std.testing.expectEqualStrings(expected_response, lambda_response); }