From 4c3945e87418fece28c5205b3a68512e2796d377 Mon Sep 17 00:00:00 2001 From: Emil Lerch Date: Thu, 11 May 2023 07:15:27 -0700 Subject: [PATCH] watcher fully functional --- build.zig | 2 + src/Watch.zig | 139 ++++++++++++++++++++++++++++++++++------------- src/main-lib.zig | 2 +- src/main.zig | 110 ++++++++++++++----------------------- 4 files changed, 147 insertions(+), 106 deletions(-) diff --git a/build.zig b/build.zig index bcb6458..946a12d 100644 --- a/build.zig +++ b/build.zig @@ -72,6 +72,8 @@ pub fn build(b: *std.Build) void { .target = target, .optimize = optimize, }); + main_tests.linkLibC(); + const lib_tests = b.addTest(.{ .root_source_file = .{ .path = "src/main-lib.zig" }, .target = target, diff --git a/src/Watch.zig b/src/Watch.zig index 6932fb1..bb07194 100644 --- a/src/Watch.zig +++ b/src/Watch.zig @@ -6,12 +6,18 @@ const MAX_FDS = 1024; const Self = @This(); const log = std.log.scoped(.watch); +const Wd = struct { + wd: i32 = 0, + path: ?*const []const u8 = null, +}; + fileChanged: *const fn (usize) void, inotify_fd: ?std.os.fd_t = null, nfds_t: usize = 0, -wds: [MAX_FDS]i32 = [_]i32{0} ** MAX_FDS, -modified: [MAX_FDS]bool = [_]bool{false} ** MAX_FDS, +wds: [MAX_FDS]Wd = [_]Wd{.{}} ** MAX_FDS, +dir_nfds_t: usize = 0, +dir_wds: [MAX_FDS]Wd = [_]Wd{.{}} ** MAX_FDS, control_socket: ?std.os.socket_t = null, watch_started: bool = false, @@ -30,13 +36,14 @@ pub fn deinit(self: *Self) void { std.os.closeSocket(s); } if (self.inotify_fd) |fd| { - for (0..self.nfds_t) |inx| { - switch (std.os.errno(std.os.linux.inotify_rm_watch(fd, self.wds[inx]))) { + for (0..self.nfds_t + self.dir_nfds_t) |inx| { + const wd = if (inx < self.nfds_t) self.wds[inx].wd else self.dir_wds[inx - self.nfds_t].wd; + switch (std.os.errno(std.os.linux.inotify_rm_watch(fd, wd))) { .SUCCESS => {}, .BADF => unreachable, // NOTE: Getting EINVAL, but the call looks valid to me? // ...and wait...not all the time? - .INVAL => log.err("error removing watch (EINVAL). OS claims fd ({d}) or wd ({d}) is invalid", .{ self.inotify_fd.?, self.wds[inx] }), + .INVAL => log.err("error removing watch (EINVAL). OS claims fd ({d}) or wd ({d}) is invalid", .{ self.inotify_fd.?, wd }), else => unreachable, } } @@ -148,13 +155,12 @@ pub fn startWatch(self: *Self) void { @alignCast(@alignOf(*const std.os.linux.inotify_event), ptr), ); - self.processInotifyEvent(ev); - // Read next event from inotify ptr = @alignCast( @alignOf(std.os.linux.inotify_event), ptr + @sizeOf(std.os.linux.inotify_event) + ev.len, ); + self.processInotifyEvent(ev, ptr - ev.len); } } } @@ -175,27 +181,12 @@ fn acceptSocket(socket: std.os.socket_t) std.os.socket_t { /// This will determine whether the inotify event indicates an actionable /// change to the file, and if so, will call self.fileChanged -fn processInotifyEvent(self: *Self, ev: *const std.os.linux.inotify_event) void { +fn processInotifyEvent(self: Self, ev: *const std.os.linux.inotify_event, name_ptr: [*]u8) void { // If the file was modified, it is good to know, but not // actionable at this time. We can set a modification flag // for later use. This flag and process may be unnecessary... // how can we have a modify followed by CLOSE_NOWRITE? - // - // TODO: Delete the following - if (ev.mask & std.os.linux.IN.MODIFY == std.os.linux.IN.MODIFY) { - for (self.wds, 0..) |wd, inx| { - if (ev.wd == wd) - self.modified[inx] = true; - } - } - if (ev.mask & std.os.linux.IN.CLOSE_NOWRITE == std.os.linux.IN.CLOSE_NOWRITE) { - for (self.wds, 0..) |wd, inx| { - if (ev.wd == wd and self.modified[inx]) { - self.modified[inx] = false; - self.fileChanged(inx); - } - } - } + // There's a couple ways a file can be modified. The simplest // way is to write(), then close(). For a variety of reasons // due to safety, a lot of programs will write some temporary @@ -210,21 +201,73 @@ fn processInotifyEvent(self: *Self, ev: *const std.os.linux.inotify_event) void // THIS WILL NOT WORK in the generic sense, and ultimately // we're going to have to watch the directory as well // attrib added as build process moves in place and modifies attributes - // - // TODO: Also watch MOVED_TO, which is on the directory... - if (ev.mask & std.os.linux.IN.CLOSE_WRITE == std.os.linux.IN.CLOSE_WRITE or - ev.mask & std.os.linux.IN.ATTRIB == std.os.linux.IN.ATTRIB) + if (ev.mask & std.os.linux.IN.CLOSE_WRITE == std.os.linux.IN.CLOSE_WRITE) + // ev.mask & std.os.linux.IN.ATTRIB == std.os.linux.IN.ATTRIB) { for (self.wds, 0..) |wd, inx| { - if (ev.wd == wd) + if (ev.wd == wd.wd) { + log.debug("CLOSE_WRITE: {d}", .{wd.wd}); self.fileChanged(inx); + break; // stop looking when we found the file + } + } + } + if (ev.mask & std.os.linux.IN.MOVED_TO == std.os.linux.IN.MOVED_TO) { + // This mem.span makes me deeply uncomfortable, but is how fs.watch does it + const name = std.mem.span(@ptrCast([*:0]u8, name_ptr)); + log.debug("MOVED_TO({d}/{d}): {s}", .{ name.len, ev.len, name }); + for (self.dir_wds) |dir| { + if (ev.wd == dir.wd) { + for (self.wds, 0..) |wd, inx| { + if (inx >= self.nfds_t) { + log.info( + "file moved into watch directory but is not registered watch: {s}", + .{name}, + ); + break; + } + + log.debug( + "name '{s}', dir '{s}', basename '{s}'", + .{ name, std.fs.path.dirname(dir.path.?.*).?, wd.path.?.* }, + ); + if (nameMatch( + wd.path.?.*, + std.fs.path.dirname(dir.path.?.*).?, + name, + )) { + self.fileChanged(inx); + break; // stop looking when we found the file + } + } + break; // once we found the directory we need to stop looking + } } } } +fn nameMatch(name: []const u8, dirname: []const u8, basename: []const u8) bool { + // check total length - should be fastest fail + if (dirname.len + basename.len + 1 != name.len) return false; + // check beginning + if (!std.mem.eql(u8, dirname, name[0..dirname.len])) return false; + // check end + if (!std.mem.eql(u8, basename, name[dirname.len + 1 ..])) return false; + // check path seperator (assuming unix) + return name[dirname.len] == '/'; +} + +test "nameMatch" { + try std.testing.expect(nameMatch( + "zig-out/lib/libfaas-proxy-sample-lib.so", + "zig-out/lib", + "libfaas-proxy-sample-lib.so", + )); +} + /// adds a file to watch. The return will be a handle that will be returned /// in the fileChanged event triffered from startWatch -pub fn addFileWatch(self: *Self, path: [:0]const u8) !usize { +pub fn addFileWatch(self: *Self, path: *[:0]const u8) !usize { self.inotify_fd = self.inotify_fd orelse try std.os.inotify_init1(std.os.linux.IN.NONBLOCK); errdefer { std.os.close(self.inotify_fd.?); @@ -233,19 +276,41 @@ pub fn addFileWatch(self: *Self, path: [:0]const u8) !usize { // zig build modification pattern: open 20, close_nowrite 10, MOVED_TO (on the directory), attrib 4 // unix cp: OPEN, MODIFY, CLOSE_WRITE, ATTRIB // unix mv: MOVED_TO (on the directory) - self.wds[self.nfds_t] = try std.os.inotify_add_watchZ( - self.inotify_fd.?, - path, - std.os.linux.IN.ATTRIB | std.os.linux.IN.CLOSE | std.os.linux.IN.CLOSE_WRITE | std.os.linux.IN.MODIFY, - ); - log.debug("watch added. fd {d}, wd {d}", .{ self.inotify_fd.?, self.wds[self.nfds_t] }); - if (self.wds[self.nfds_t] == -1) + self.wds[self.nfds_t] = .{ + .wd = try std.os.inotify_add_watchZ( + self.inotify_fd.?, + path.*, + std.os.linux.IN.CLOSE_WRITE, + ), + .path = path, + }; + if (self.wds[self.nfds_t].wd == -1) @panic("could not set watch"); + log.debug("watch added. fd {d}, wd {d}. Path {s}", .{ self.inotify_fd.?, self.wds[self.nfds_t].wd, path }); self.nfds_t += 1; + try self.addDirWatch(path); if (self.watch_started) self.reloadWatch() catch @panic("could not reload watch"); return self.nfds_t - 1; } +// This will add a hidden directory watch to catch OS moves into place +fn addDirWatch(self: *Self, path: *[]const u8) !void { + const dirname = std.fs.path.dirname(path.*).?; // TODO: reimplement std.fs.path.dirname as we're getting a local in here + log.debug("addDirWatch: dir_nfds_t: {d}, dir: {s}", .{ self.dir_nfds_t, dirname }); + if (self.dir_nfds_t > 1) + for (0..self.dir_nfds_t) |inx| + if (self.dir_wds[inx].path) |p| + if (std.mem.eql(u8, std.fs.path.dirname(p.*).?, dirname)) + return; // We are already watching this directory + // We do not have a directory watch + self.dir_wds[self.dir_nfds_t] = .{ + .wd = try std.os.inotify_add_watch(self.inotify_fd.?, dirname, std.os.linux.IN.MOVED_TO), + .path = path, // we store path rather than directory because doing this without an allocator is...tough + }; + self.dir_nfds_t += 1; + log.debug("directory watch added. fd {d}, wd {d}, dir {s}", .{ self.inotify_fd.?, self.wds[self.nfds_t].wd, dirname }); +} + fn reloadWatch(self: Self) !void { try self.sendControl('c'); } diff --git a/src/main-lib.zig b/src/main-lib.zig index de78a35..fbcdb40 100644 --- a/src/main-lib.zig +++ b/src/main-lib.zig @@ -5,7 +5,7 @@ export fn serve() void { const stdout_file = std.io.getStdOut().writer(); var bw = std.io.bufferedWriter(stdout_file); const stdout = bw.writer(); - stdout.print(" 6 ", .{}) catch unreachable; + stdout.print(" 0 ", .{}) catch unreachable; bw.flush() catch unreachable; } diff --git a/src/main.zig b/src/main.zig index d7f82f9..d0ce635 100644 --- a/src/main.zig +++ b/src/main.zig @@ -1,10 +1,7 @@ const std = @import("std"); -const c = @cImport({ - @cInclude("dlfcn.h"); -}); const Watch = @import("Watch.zig"); -const serve_op = *const fn () void; +const serveFn = *const fn () void; var shutdown = false; const timeout = 250; @@ -12,121 +9,92 @@ const timeout = 250; const Executor = struct { path: [:0]const u8, library: ?*anyopaque = null, - serve: ?serve_op = null, + serve: ?serveFn = null, watch: ?usize = null, + reload_lock: bool = false, }; var executors = [_]Executor{ - .{ .path = "zig-out/lib/libfaas-proxy-sample-lib2.so" }, .{ .path = "zig-out/lib/libfaas-proxy-sample-lib.so" }, + .{ .path = "zig-out/lib/libfaas-proxy-sample-lib2.so" }, }; var watcher = Watch.init(executorChanged); - const log = std.log.scoped(.main); pub const std_options = struct { - // Set the log level to info pub const log_level = .debug; pub const log_scope_levels = &[_]std.log.ScopeLevel{ .{ .scope = .watch, .level = .info }, }; }; +const SERVE_FN_NAME = "serve"; fn serve() !void { // if (some path routing thing) { (try getExecutor(0))(); - if (inx > 4) { - if (inx % 2 == 0) - (try getExecutor(0))() - else - (try getExecutor(1))(); - } - // if (std.c.dlerror()) |_| { // TODO: use capture - // return error.CouldNotLoadSymbolServe; - // } - // TODO: only close on reload - // if (std.c.dlclose(library.?) != 0) { - // return error.CouldNotUnloadLibrary; - // } - // library = null; + // if (inx > 4) { + // if (inx % 2 == 0) + // (try getExecutor(0))() + // else + // (try getExecutor(1))(); // } } -fn getExecutor(key: usize) !serve_op { +fn getExecutor(key: usize) !serveFn { var executor = &executors[key]; if (executor.serve) |s| return s; executor.library = blk: { - if (executor.library) |l| { + if (executor.library) |l| break :blk l; - } + + while (executor.reload_lock) // system is reloading the library + std.time.sleep(1); + + if (executor.library) |l| // check again to see where we are at + break :blk l; + log.info("library {s} requested but not loaded. Loading library", .{executor.path}); const l = try dlopen(executor.path); errdefer if (std.c.dlclose(l) != 0) @panic("System unstable: Error after library open and cannot close"); - executor.watch = executor.watch orelse try watcher.addFileWatch(executor.path); + executor.watch = executor.watch orelse try watcher.addFileWatch(&executor.path); break :blk l; }; // std.c.dlerror(); - const serve_function = std.c.dlsym(executor.library.?, "serve"); + const serve_function = std.c.dlsym(executor.library.?, SERVE_FN_NAME); if (serve_function == null) return error.CouldNotLoadSymbolServe; - executor.serve = @ptrCast(serve_op, serve_function.?); + executor.serve = @ptrCast(serveFn, serve_function.?); return executor.serve.?; } -// This works -// fn executorChanged(watch: usize) void { -// log.debug("executor changed event", .{}); -// for (&executors) |*executor| { -// if (executor.watch) |w| { -// if (w == watch) { -// if (executor.library) |l| { -// log.info("library {s} changed. Unloading library", .{executor.path}); -// // TODO: These two lines could introduce a race. Right now that would mean a panic -// executor.serve = null; -// if (std.c.dlclose(l) != 0) -// @panic("System unstable: Error after library open and cannot close"); -// } -// executor.library = null; -// executor.serve = null; -// // NOTE: Would love to reload the library here, but that action -// // does not seem to be thread safe -// } -// } -// } -// } - -// NOTE: this will be on a different thread. This code does not work, and I -// am fairly certain it is because we can't share a function pointer between -// threads fn executorChanged(watch: usize) void { + // NOTE: This will be called off the main thread log.debug("executor with watch {d} changed", .{watch}); for (&executors) |*executor| { if (executor.watch) |w| { if (w == watch) { if (executor.library) |l| { - log.debug("reloading executor at path: {s}", .{executor.path}); - const newlib = dlopen(executor.path) catch { + executor.reload_lock = true; + defer executor.reload_lock = false; + + if (std.c.dlclose(l) != 0) + @panic("System unstable: Error after library open and cannot close"); + log.debug("closed old library. reloading executor at: {s}", .{executor.path}); + executor.library = dlopen(executor.path) catch { log.warn("could not reload! error opening library", .{}); return; }; - errdefer if (std.c.dlclose(newlib) != 0) - @panic("System unstable: Error after library open and cannot close"); - const serve_function = std.c.dlsym(newlib, "serve"); - if (serve_function == null) { + executor.serve = @ptrCast(serveFn, std.c.dlsym(executor.library.?, SERVE_FN_NAME)); + if (executor.serve == null) { log.warn("could not reload! error finding symbol", .{}); + if (std.c.dlclose(executor.library.?) != 0) + @panic("System unstable: Error after library open and cannot close"); return; } - // new lib all loaded up - do the swap and close the old - log.debug("updating function and library", .{}); - executor.serve = @ptrCast(serve_op, serve_function.?); - executor.library = newlib; - if (std.c.dlclose(l) != 0) - @panic("System unstable: Error after library open and cannot close"); - log.debug("closed old library", .{}); } } } @@ -135,7 +103,7 @@ fn executorChanged(watch: usize) void { fn dlopen(path: [:0]const u8) !*anyopaque { // We need now (and local) because we're about to call it - const lib = std.c.dlopen(path, c.RTLD_NOW); + const lib = std.c.dlopen(path, std.c.RTLD.NOW); if (lib) |l| return l; return error.CouldNotOpenDynamicLibrary; } @@ -180,7 +148,13 @@ pub fn main() !void { shutdown = true; watcher_thread.join(); } - +test { + // To run nested container tests, either, call `refAllDecls` which will + // reference all declarations located in the given argument. + // `@This()` is a builtin function that returns the innermost container it is called from. + // In this example, the innermost container is this file (implicitly a struct). + std.testing.refAllDecls(@This()); +} test "simple test" { var list = std.ArrayList(i32).init(std.testing.allocator); defer list.deinit(); // try commenting this out and see if zig detects the memory leak!