From 5e09b735c6be4d64dc2c15bb4bdb595c371877eb Mon Sep 17 00:00:00 2001 From: Emil Lerch Date: Tue, 9 May 2023 09:18:38 -0700 Subject: [PATCH] add file watcher --- build.zig | 5 ++ src/Watch.zig | 126 ++++++++++++++++++++++++++++++++++++++++++ src/main-lib.zig | 8 +++ src/main.zig | 140 ++++++++++++++++++++++++++++++++++++++++++++++- 4 files changed, 277 insertions(+), 2 deletions(-) create mode 100644 src/Watch.zig diff --git a/build.zig b/build.zig index d96cad2..bcb6458 100644 --- a/build.zig +++ b/build.zig @@ -24,6 +24,8 @@ pub fn build(b: *std.Build) void { .optimize = optimize, }); + exe.linkLibC(); + const lib = b.addSharedLibrary(.{ .name = "faas-proxy-sample-lib", // In this case the main source file is merely a path, however, in more @@ -60,6 +62,9 @@ pub fn build(b: *std.Build) void { run_cmd.addArgs(args); } + const run_step = b.step("run", "Run the app"); + run_step.dependOn(&run_cmd.step); + // Creates a step for unit testing. This only builds the test executable // but does not run it. const main_tests = b.addTest(.{ diff --git a/src/Watch.zig b/src/Watch.zig new file mode 100644 index 0000000..c1a051e --- /dev/null +++ b/src/Watch.zig @@ -0,0 +1,126 @@ +const builtin = @import("builtin"); +const std = @import("std"); +const c = @cImport({ + @cInclude("poll.h"); +}); + +const MAX_FDS = 1024; + +const Self = @This(); + +fileChanged: *const fn (usize) void, +inotify_fd: ?std.os.fd_t = null, +// sizeof(std.os.pollfd) == 8, so this is 8k +// fds: [MAX_FDS]std.os.pollfd = [_]std.os.pollfd{.{ .fd = 0, .events = 0, .revents = 0 }} ** MAX_FDS, +nfds_t: usize = 0, +wds: [MAX_FDS]i32 = [_]i32{0} ** MAX_FDS, +modified: [MAX_FDS]bool = [_]bool{false} ** MAX_FDS, + +pub fn init(file_changed: *const fn (usize) void) Self { + if (builtin.os.tag != .linux) + @compileError("Unsupported OS"); + return .{ + .fileChanged = file_changed, + }; +} + +pub fn deinit(self: *Self) void { + if (self.inotify_fd) |fd| { + for (self.wds) |wd| { + const rc = std.os.linux.inotify_rm_watch(fd, wd); + // Errno can only be EBADF, EINVAL if either the inotify fs or the wd are invalid + std.debug.assert(rc == 0); + } + std.os.close(fd); + } +} + +pub fn watchFds(self: *Self) void { + while (true) { + if (self.nfds_t == 0) { + std.time.sleep(250); + continue; + } + var fds = &[_]std.os.pollfd{.{ .fd = self.inotify_fd.?, .events = c.POLLIN, .revents = 0 }}; + // NOTE: There is a std.io.poll that provides a higher level abstraction. + // However, this API is strictly related to the use case of an open stream + // for which we are awaiting data. In this case, we are polling for + // an inotify event, for which no abstraction currently exists + // + // std.fs.watch looks really good...but it requires event based I/O, + // which is not yet ready to be (re)added. + if ((std.os.poll( + fds, + -1, // Infinite timeout + ) catch @panic("poll error")) > 0) { + if (fds[0].revents & c.POLLIN == c.POLLIN) { // POLLIN means "there is data to read" + var event_buf: [4096]u8 align(@alignOf(std.os.linux.inotify_event)) = undefined; + // "borrowed" from https://ziglang.org/documentation/master/std/src/std/fs/watch.zig.html#L588 + const bytes_read = std.os.read(self.inotify_fd.?, &event_buf) catch unreachable; + + var ptr: [*]u8 = &event_buf; + const end_ptr = ptr + bytes_read; + while (@ptrToInt(ptr) < @ptrToInt(end_ptr)) { + const ev = @ptrCast( + *const std.os.linux.inotify_event, + @alignCast(@alignOf(*const std.os.linux.inotify_event), ptr), + ); + + if (ev.mask & std.os.linux.IN.MODIFY == std.os.linux.IN.MODIFY) { + for (self.wds, 0..) |wd, inx| { + if (ev.wd == wd) + self.modified[inx] = true; + } + } + // attrib added as build process moves in place and modifies attributes + // TODO: Also watch MOVED_TO, which is on the directory... + if (ev.mask & std.os.linux.IN.CLOSE_WRITE == std.os.linux.IN.CLOSE_WRITE or + ev.mask & std.os.linux.IN.ATTRIB == std.os.linux.IN.ATTRIB) + { + for (self.wds, 0..) |wd, inx| { + if (ev.wd == wd) + self.fileChanged(inx); + } + } + if (ev.mask & std.os.linux.IN.CLOSE_NOWRITE == std.os.linux.IN.CLOSE_NOWRITE) { + for (self.wds, 0..) |wd, inx| { + if (ev.wd == wd and self.modified[inx]) { + self.modified[inx] = false; + self.fileChanged(inx); + } + } + } + // see man 2 poll + self.handleFile(fds[0]); + ptr = @alignCast( + @alignOf(std.os.linux.inotify_event), + ptr + @sizeOf(std.os.linux.inotify_event) + ev.len, + ); + } + } + } + } +} + +pub fn addFileWatch(self: *Self, path: [:0]const u8) !usize { + self.inotify_fd = self.inotify_fd orelse try std.os.inotify_init1(std.os.linux.IN.NONBLOCK); + errdefer { + std.os.close(self.inotify_fd.?); + self.inotify_fd = null; + } + // open 20, close_norite 10, attrib 4 + self.wds[self.nfds_t] = try std.os.inotify_add_watchZ( + self.inotify_fd.?, + path, + std.os.linux.IN.ATTRIB | std.os.linux.IN.CLOSE | std.os.linux.IN.CLOSE_WRITE | std.os.linux.IN.MODIFY, + ); + if (self.wds[self.nfds_t] == -1) + @panic("could not set watch"); + self.nfds_t += 1; + return self.nfds_t - 1; +} + +fn handleFile(self: Self, fd: std.os.pollfd) void { + _ = fd; + _ = self; +} diff --git a/src/main-lib.zig b/src/main-lib.zig index ecfeade..3c1f1a5 100644 --- a/src/main-lib.zig +++ b/src/main-lib.zig @@ -1,6 +1,14 @@ const std = @import("std"); const testing = std.testing; +export fn serve() void { + const stdout_file = std.io.getStdOut().writer(); + var bw = std.io.bufferedWriter(stdout_file); + const stdout = bw.writer(); + stdout.print(" 3 ", .{}) catch unreachable; + bw.flush() catch unreachable; +} + export fn add(a: i32, b: i32) i32 { return a + b; } diff --git a/src/main.zig b/src/main.zig index c8a3f67..8c33e33 100644 --- a/src/main.zig +++ b/src/main.zig @@ -1,8 +1,125 @@ const std = @import("std"); +const c = @cImport({ + @cInclude("dlfcn.h"); +}); + +const Watch = @import("Watch.zig"); +const serve_op = *const fn () void; + +var shutdown = false; +const timeout = 250; + +const Executor = struct { + path: [:0]const u8, + library: ?*anyopaque = null, + serve: ?serve_op = null, + watch: ?usize = null, +}; + +var executors = [_]Executor{.{ + .path = "/home/lobo/home/faas-proxy/zig-out/lib/libfaas-proxy-sample-lib.so", +}}; + +var watcher = Watch.init(executorChanged); + +fn serve() !void { + // if (some path routing thing) { + + (try getExecutor(0))(); + // if (std.c.dlerror()) |_| { // TODO: use capture + // return error.CouldNotLoadSymbolServe; + // } + // TODO: only close on reload + // if (std.c.dlclose(library.?) != 0) { + // return error.CouldNotUnloadLibrary; + // } + // library = null; + // } +} +fn getExecutor(key: usize) !serve_op { + var executor = &executors[key]; + if (executor.serve) |s| return s; + + executor.library = blk: { + if (executor.library) |l| { + break :blk l; + } + const l = try dlopen(executor.path); + errdefer if (std.c.dlclose(l) != 0) + @panic("System unstable: Error after library open and cannot close"); + executor.watch = try watcher.addFileWatch(executor.path); + break :blk l; + }; + + // std.c.dlerror(); + const serve_function = std.c.dlsym(executor.library.?, "serve"); + if (serve_function == null) return error.CouldNotLoadSymbolServe; + + executor.serve = @ptrCast(serve_op, serve_function.?); + return executor.serve.?; +} + +// This works +fn executorChanged(watch: usize) void { + for (&executors) |*executor| { + if (executor.watch) |w| { + if (w == watch) { + if (executor.library) |l| { + // TODO: These two lines could introduce a race. Right now that would mean a panic + executor.serve = null; + if (std.c.dlclose(l) != 0) + @panic("System unstable: Error after library open and cannot close"); + } + executor.library = null; + executor.serve = null; + } + } + } +} + +// NOTE: this will be on a different thread. This code does not work, and I +// am fairly certain it is because we can't share a function pointer between +// threads +// fn executorChanged(watch: usize) void { +// std.debug.print("executor with watch {d} changed\n", .{watch}); +// for (&executors) |*executor| { +// if (executor.watch) |w| { +// if (w == watch) { +// if (executor.library) |l| { +// std.debug.print("reloading executor at path: {s}\n", .{executor.path}); +// const newlib = dlopen(executor.path) catch { +// std.debug.print("could not reload! error opening library\n", .{}); +// return; +// }; +// errdefer if (std.c.dlclose(newlib) != 0) +// @panic("System unstable: Error after library open and cannot close"); +// const serve_function = std.c.dlsym(newlib, "serve"); +// if (serve_function == null) { +// std.debug.print("could not reload! error finding symbol\n", .{}); +// return; +// } +// // new lib all loaded up - do the swap and close the old +// std.debug.print("updating function and library\n", .{}); +// executor.serve = @ptrCast(serve_op, serve_function.?); +// executor.library = newlib; +// if (std.c.dlclose(l) != 0) +// @panic("System unstable: Error after library open and cannot close"); +// std.debug.print("closed old library\n", .{}); +// } +// } +// } +// } +// } + +fn dlopen(path: [:0]const u8) !*anyopaque { + // We need now (and local) because we're about to call it + const lib = std.c.dlopen(path, c.RTLD_NOW); + if (lib) |l| return l; + return error.CouldNotOpenDynamicLibrary; +} pub fn main() !void { - // Prints to stderr (it's a shortcut based on `std.io.getStdErr()`) - std.debug.print("All your {s} are belong to us.\n", .{"codebase"}); + defer watcher.deinit(); // stdout is for the actual output of your application, for example if you // are implementing gzip, then only the compressed bytes should be sent to @@ -11,9 +128,28 @@ pub fn main() !void { var bw = std.io.bufferedWriter(stdout_file); const stdout = bw.writer(); + const stderr_file = std.io.getStdErr().writer(); + var bw_stderr = std.io.bufferedWriter(stderr_file); + const stderr = bw_stderr.writer(); + try stdout.print("Run `zig build test` to run the tests.\n", .{}); try bw.flush(); // don't forget to flush! + const watcher_thread = try std.Thread.spawn(.{}, Watch.watchFds, .{&watcher}); + + while (true) { + std.time.sleep(std.time.ns_per_s * 2); + try stdout.print("Serving...", .{}); + try bw.flush(); + serve() catch |err| { + try stderr.print("Error serving request ({any})\n", .{err}); + try bw_stderr.flush(); + }; + try stdout.print("served\n", .{}); + try bw.flush(); + } + shutdown = true; + watcher_thread.join(); } test "simple test" {