From 4a1548b4be9b2163c3237a1863fba1bd616c7cc7 Mon Sep 17 00:00:00 2001 From: Emil Lerch Date: Wed, 10 May 2023 15:05:44 -0700 Subject: [PATCH] add control channel through socket --- src/Watch.zig | 180 ++++++++++++++++++++++++++++++++++++++++++++--- src/main-lib.zig | 2 +- src/main.zig | 53 ++++++++++++-- 3 files changed, 220 insertions(+), 15 deletions(-) diff --git a/src/Watch.zig b/src/Watch.zig index 42a9914..6932fb1 100644 --- a/src/Watch.zig +++ b/src/Watch.zig @@ -1,12 +1,10 @@ const builtin = @import("builtin"); const std = @import("std"); -const c = @cImport({ - @cInclude("poll.h"); -}); const MAX_FDS = 1024; const Self = @This(); +const log = std.log.scoped(.watch); fileChanged: *const fn (usize) void, inotify_fd: ?std.os.fd_t = null, @@ -14,6 +12,8 @@ inotify_fd: ?std.os.fd_t = null, nfds_t: usize = 0, wds: [MAX_FDS]i32 = [_]i32{0} ** MAX_FDS, modified: [MAX_FDS]bool = [_]bool{false} ** MAX_FDS, +control_socket: ?std.os.socket_t = null, +watch_started: bool = false, pub fn init(file_changed: *const fn (usize) void) Self { if (builtin.os.tag != .linux) @@ -24,16 +24,31 @@ pub fn init(file_changed: *const fn (usize) void) Self { } pub fn deinit(self: *Self) void { + if (self.control_socket) |s| { + // Sockets...where Unix still pretends everything is a file, but it's not... + log.debug("closing control socket", .{}); + std.os.closeSocket(s); + } if (self.inotify_fd) |fd| { - for (self.wds) |wd| { - const rc = std.os.linux.inotify_rm_watch(fd, wd); - // Errno can only be EBADF, EINVAL if either the inotify fs or the wd are invalid - std.debug.assert(rc == 0); + for (0..self.nfds_t) |inx| { + switch (std.os.errno(std.os.linux.inotify_rm_watch(fd, self.wds[inx]))) { + .SUCCESS => {}, + .BADF => unreachable, + // NOTE: Getting EINVAL, but the call looks valid to me? + // ...and wait...not all the time? + .INVAL => log.err("error removing watch (EINVAL). 
OS claims fd ({d}) or wd ({d}) is invalid", .{ self.inotify_fd.?, self.wds[inx] }), + else => unreachable, + } } std.os.close(fd); } + const cwd = std.fs.cwd(); + cwd.deleteFileZ(SOCK_NAME) catch |e| + log.err("error removing socket file " ++ SOCK_NAME ++ ": {any}", .{e}); } +const SOCK_NAME = "S.watch-control"; + /// starts the file watch. This function will not return, so it is best /// to put this function in its own thread: /// @@ -43,12 +58,29 @@ pub fn deinit(self: *Self) void { /// well if files are added after the watch begins. A method for doing this /// is intended later pub fn startWatch(self: *Self) void { + if (self.control_socket == null) + self.addControlSocket(SOCK_NAME) catch @panic("could not add control socket"); + std.debug.assert(self.control_socket != null); + while (true) { if (self.nfds_t == 0) { std.time.sleep(250); continue; } - var fds = &[_]std.os.pollfd{.{ .fd = self.inotify_fd.?, .events = c.POLLIN, .revents = 0 }}; + self.watch_started = true; + + var fds = if (self.inotify_fd == null) + &[_]std.os.pollfd{.{ .fd = self.control_socket.?, .events = std.os.POLL.IN, .revents = undefined }} + else + &[_]std.os.pollfd{ + .{ .fd = self.control_socket.?, .events = std.os.POLL.IN, .revents = undefined }, + .{ .fd = self.inotify_fd.?, .events = std.os.POLL.IN, .revents = undefined }, + }; + + const control_fd_inx = 0; + const inotify_fd_inx = 1; + + // // NOTE: There is a std.io.poll that provides a higher level abstraction. // However, this API is strictly related to the use case of an open stream // for which we are awaiting data. In this case, we are polling for @@ -56,14 +88,54 @@ pub fn startWatch(self: *Self) void { // // std.fs.watch looks really good...but it requires event based I/O, // which is not yet ready to be (re)added. 
+ log.debug("tid={d} start poll with {d} fds", .{ std.Thread.getCurrentId(), fds.len }); if ((std.os.poll( fds, -1, // Infinite timeout ) catch @panic("poll error")) > 0) { + if (fds[control_fd_inx].revents & std.os.POLL.IN == std.os.POLL.IN) { // POLLIN means "there is data to read" + log.debug("tid={d} control event", .{std.Thread.getCurrentId()}); + // we only need one byte for what we're doing + var control_buf: [1]u8 = undefined; - // fds[0] is inotify, so if we have data in that file descriptor, + // self.control_socket_accepted_fd = self.control_socket_accepted_fd orelse acceptSocket(self.control_socket.?); + // const fd = self.control_socket_accepted_fd.?; // let's save some typing + const fd = acceptSocket(self.control_socket.?); + defer std.os.close(fd); + + var readcount = std.os.recv(fd, &control_buf, 0) catch unreachable; + // var other_buf: [1]u8 = undefined; + // if (std.os.recv(fd, &other_buf, 0) catch unreachable != 0) + // @panic("socket contains more data than expected"); + log.debug("read {d} bytes from socket: {s}", .{ readcount, std.fmt.fmtSliceHexLower(control_buf[0..readcount]) }); + if (readcount == 0) { + // then what? 
+ log.err("tid={d} control socket with POLL.IN but no data?", .{std.Thread.getCurrentId()}); + @panic("control event received but no data available"); + } + + switch (control_buf[0]) { + 'c' => { + log.info("tid={d} continue command (reload) received on control socket", .{std.Thread.getCurrentId()}); + continue; + }, + 'q' => { + log.info("tid={d} quit command received on control socket", .{std.Thread.getCurrentId()}); + self.watch_started = false; + return; + }, + else => { + log.err("tid={d} Unexpected command on control socket 0x{x}", .{ std.Thread.getCurrentId(), control_buf[0] }); + if (control_buf[0] == 0xaa) // we are in a world of hurt - panic + @panic("seems like a buffer overrun!"); + }, + } + } + + // fds[1] is inotify, so if we have data in that file descriptor, // we can force the data into an inotify_event structure and act on it - if (fds[0].revents & c.POLLIN == c.POLLIN) { // POLLIN means "there is data to read" + if (self.inotify_fd != null and fds[inotify_fd_inx].revents & std.os.POLL.IN == std.os.POLL.IN) { + log.debug("tid={d} inotify event", .{std.Thread.getCurrentId()}); var event_buf: [4096]u8 align(@alignOf(std.os.linux.inotify_event)) = undefined; // "borrowed" from https://ziglang.org/documentation/master/std/src/std/fs/watch.zig.html#L588 const bytes_read = std.os.read(self.inotify_fd.?, &event_buf) catch unreachable; @@ -76,6 +148,8 @@ pub fn startWatch(self: *Self) void { @alignCast(@alignOf(*const std.os.linux.inotify_event), ptr), ); + self.processInotifyEvent(ev); + // Read next event from inotify ptr = @alignCast( @alignOf(std.os.linux.inotify_event), @@ -87,6 +161,18 @@ pub fn startWatch(self: *Self) void { } } +fn acceptSocket(socket: std.os.socket_t) std.os.socket_t { + var sockaddr = std.net.Address.initUnix(SOCK_NAME) catch @panic("could not get sockaddr"); + var sockaddr_len: std.os.socklen_t = sockaddr.getOsSockLen(); + log.debug("tid={d} accepting on socket fd {d}", .{ std.Thread.getCurrentId(), socket }); + return 
std.os.accept( + socket, + &sockaddr.any, + &sockaddr_len, + 0, + ) catch @panic("could not accept connection"); +} + /// This will determine whether the inotify event indicates an actionable /// change to the file, and if so, will call self.fileChanged fn processInotifyEvent(self: *Self, ev: *const std.os.linux.inotify_event) void { @@ -152,8 +238,82 @@ pub fn addFileWatch(self: *Self, path: [:0]const u8) !usize { path, std.os.linux.IN.ATTRIB | std.os.linux.IN.CLOSE | std.os.linux.IN.CLOSE_WRITE | std.os.linux.IN.MODIFY, ); + log.debug("watch added. fd {d}, wd {d}", .{ self.inotify_fd.?, self.wds[self.nfds_t] }); if (self.wds[self.nfds_t] == -1) @panic("could not set watch"); self.nfds_t += 1; + if (self.watch_started) self.reloadWatch() catch @panic("could not reload watch"); return self.nfds_t - 1; } + +fn reloadWatch(self: Self) !void { + try self.sendControl('c'); +} + +pub fn stopWatch(self: Self) !void { + try self.sendControl('q'); +} + +fn sendControl(self: Self, control: u8) !void { + // Sockets...where Unix still pretends everything is a file, but it's not... + // + // For client processing, there are a bunch of steps, but the zig stdlib + // saves us a bunch of work. Once we do std.net.connectUnixSocket(), we + // get a stream back that has reader() and writer() calls + // + // log.debug("request to send control 0x{x}", .{control}); + if (self.control_socket == null) return; // nothing to do + // log.debug("tid={d} opening stream", .{std.Thread.getCurrentId()}); + var stream = try std.net.connectUnixSocket(SOCK_NAME); + defer stream.close(); + log.debug("tid={d} sending control 0x{x} on socket fd={d}", .{ std.Thread.getCurrentId(), control, stream.handle }); + try stream.writer().writeByte(control); +} + +/// creates a control socket. This allows for managing the watcher. 
With it, +/// you can gracefully terminate the process and you can add files after the fact +fn addControlSocket(self: *Self, path: [:0]const u8) !void { + // This function theoretically should work without requiring linux...except this inotify call, + // which is completely linux specific + self.inotify_fd = self.inotify_fd orelse try std.os.inotify_init1(std.os.linux.IN.NONBLOCK); + log.debug("Established inotify file descriptor {d}", .{self.inotify_fd.?}); + errdefer { + std.os.close(self.inotify_fd.?); + self.inotify_fd = null; + } + // this should work on all systems theoretically, but I believe would work only + // on *nix systems + // + // Sockets...where Unix still pretends everything is a file, but it's not... + // We'll create a unix socket, which looks like a file on the file system + // + // For client processing, see comments in the sendControl function + // + // From the "server" perspective, we need to do this initially: + // 1. std.os.socket: create the socket. This file descriptor should be used in poll(2) calls + // 2. std.os.bind: tell the system where the socket is (here, it's the filesystem path) + // 3. std.os.listen: tell the system how many simultaneous connections we can have + // + // At this point, clients can write to the socket (but that's not typical fs ops either) + // To read from the socket, we need to: + // + // 4. std.os.accept: create a file descriptor from the socket descriptor + // 5. std.os.recv: works just like read(2). Call lots + // 6. 
std.os.close: close the fd + // + // On end of use, we need to std.os.closeSocket() + const sock = try std.os.socket( + std.os.linux.AF.LOCAL, + std.os.linux.SOCK.STREAM | std.os.SOCK.CLOEXEC, + 0, + ); + errdefer std.os.closeSocket(sock); + + const sockaddr = try std.net.Address.initUnix(path); + + // TODO: If this bind fails with EADDRINUSE we can probably delete the existing file + try std.os.bind(sock, &sockaddr.any, sockaddr.getOsSockLen()); + try std.os.listen(sock, 10); + self.control_socket = sock; + log.debug("added control socket with fd={d}", .{sock}); +} diff --git a/src/main-lib.zig b/src/main-lib.zig index 3c1f1a5..de78a35 100644 --- a/src/main-lib.zig +++ b/src/main-lib.zig @@ -5,7 +5,7 @@ export fn serve() void { const stdout_file = std.io.getStdOut().writer(); var bw = std.io.bufferedWriter(stdout_file); const stdout = bw.writer(); - stdout.print(" 3 ", .{}) catch unreachable; + stdout.print(" 6 ", .{}) catch unreachable; bw.flush() catch unreachable; } diff --git a/src/main.zig b/src/main.zig index 9422810..9b02a19 100644 --- a/src/main.zig +++ b/src/main.zig @@ -16,16 +16,49 @@ const Executor = struct { watch: ?usize = null, }; -var executors = [_]Executor{.{ - .path = "/home/lobo/home/faas-proxy/zig-out/lib/libfaas-proxy-sample-lib.so", -}}; +var executors = [_]Executor{ + .{ .path = "zig-out/lib/libfaas-proxy-sample-lib2.so" }, + .{ .path = "zig-out/lib/libfaas-proxy-sample-lib.so" }, +}; var watcher = Watch.init(executorChanged); +const log = std.log.scoped(.main); +pub const std_options = struct { + // Set the log level to info + pub const log_level = .info; + + // Define logFn to override the std implementation + pub const logFn = myLogFn; +}; + +pub fn myLogFn( + comptime level: std.log.Level, + comptime scope: @TypeOf(.EnumLiteral), + comptime format: []const u8, + args: anytype, +) void { + // Ignore all non-error logging from sources other than + // .my_project, .nice_library and the default + switch (scope) { + .watch => if 
(@enumToInt(level) >= @enumToInt(std.log.Level.debug)) + return, // Kill debug messages + else => {}, + } + + std.log.defaultLog(level, scope, format, args); +} + fn serve() !void { // if (some path routing thing) { (try getExecutor(0))(); + if (inx > 4) { + if (inx % 2 == 0) + (try getExecutor(0))() + else + (try getExecutor(1))(); + } // if (std.c.dlerror()) |_| { // TODO: use capture // return error.CouldNotLoadSymbolServe; // } @@ -44,10 +77,11 @@ fn getExecutor(key: usize) !serve_op { if (executor.library) |l| { break :blk l; } + log.info("library {s} requested but not loaded. Loading library", .{executor.path}); const l = try dlopen(executor.path); errdefer if (std.c.dlclose(l) != 0) @panic("System unstable: Error after library open and cannot close"); - executor.watch = try watcher.addFileWatch(executor.path); + executor.watch = executor.watch orelse try watcher.addFileWatch(executor.path); break :blk l; }; @@ -61,10 +95,12 @@ fn getExecutor(key: usize) !serve_op { // This works fn executorChanged(watch: usize) void { + log.debug("executor changed event", .{}); for (&executors) |*executor| { if (executor.watch) |w| { if (w == watch) { if (executor.library) |l| { + log.info("library {s} changed. Unloading library", .{executor.path}); // TODO: These two lines could introduce a race. 
Right now that would mean a panic executor.serve = null; if (std.c.dlclose(l) != 0) @@ -72,6 +108,8 @@ fn executorChanged(watch: usize) void { } executor.library = null; executor.serve = null; + // NOTE: Would love to reload the library here, but that action + // does not seem to be thread safe } } } @@ -118,6 +156,7 @@ fn dlopen(path: [:0]const u8) !*anyopaque { return error.CouldNotOpenDynamicLibrary; } +var inx: usize = 0; pub fn main() !void { defer watcher.deinit(); @@ -139,6 +178,12 @@ pub fn main() !void { while (true) { std.time.sleep(std.time.ns_per_s * 2); + inx += 1; + if (inx == 10) { + log.debug("forcing stop to make sure it works", .{}); + try watcher.stopWatch(); + break; + } try stdout.print("Serving...", .{}); try bw.flush(); serve() catch |err| {