watcher fully functional

This commit is contained in:
Emil Lerch 2023-05-11 07:15:27 -07:00
parent 2f349f2e50
commit 4c3945e874
Signed by: lobo
GPG Key ID: A7B62D657EF764F8
4 changed files with 147 additions and 106 deletions

View File

@ -72,6 +72,8 @@ pub fn build(b: *std.Build) void {
.target = target,
.optimize = optimize,
});
main_tests.linkLibC();
const lib_tests = b.addTest(.{
.root_source_file = .{ .path = "src/main-lib.zig" },
.target = target,

View File

@ -6,12 +6,18 @@ const MAX_FDS = 1024;
const Self = @This();
const log = std.log.scoped(.watch);
const Wd = struct {
wd: i32 = 0,
path: ?*const []const u8 = null,
};
fileChanged: *const fn (usize) void,
inotify_fd: ?std.os.fd_t = null,
nfds_t: usize = 0,
wds: [MAX_FDS]i32 = [_]i32{0} ** MAX_FDS,
modified: [MAX_FDS]bool = [_]bool{false} ** MAX_FDS,
wds: [MAX_FDS]Wd = [_]Wd{.{}} ** MAX_FDS,
dir_nfds_t: usize = 0,
dir_wds: [MAX_FDS]Wd = [_]Wd{.{}} ** MAX_FDS,
control_socket: ?std.os.socket_t = null,
watch_started: bool = false,
@ -30,13 +36,14 @@ pub fn deinit(self: *Self) void {
std.os.closeSocket(s);
}
if (self.inotify_fd) |fd| {
for (0..self.nfds_t) |inx| {
switch (std.os.errno(std.os.linux.inotify_rm_watch(fd, self.wds[inx]))) {
for (0..self.nfds_t + self.dir_nfds_t) |inx| {
const wd = if (inx < self.nfds_t) self.wds[inx].wd else self.dir_wds[inx - self.nfds_t].wd;
switch (std.os.errno(std.os.linux.inotify_rm_watch(fd, wd))) {
.SUCCESS => {},
.BADF => unreachable,
// NOTE: Getting EINVAL, but the call looks valid to me?
// ...and wait...not all the time?
.INVAL => log.err("error removing watch (EINVAL). OS claims fd ({d}) or wd ({d}) is invalid", .{ self.inotify_fd.?, self.wds[inx] }),
.INVAL => log.err("error removing watch (EINVAL). OS claims fd ({d}) or wd ({d}) is invalid", .{ self.inotify_fd.?, wd }),
else => unreachable,
}
}
@ -148,13 +155,12 @@ pub fn startWatch(self: *Self) void {
@alignCast(@alignOf(*const std.os.linux.inotify_event), ptr),
);
self.processInotifyEvent(ev);
// Read next event from inotify
ptr = @alignCast(
@alignOf(std.os.linux.inotify_event),
ptr + @sizeOf(std.os.linux.inotify_event) + ev.len,
);
self.processInotifyEvent(ev, ptr - ev.len);
}
}
}
@ -175,27 +181,12 @@ fn acceptSocket(socket: std.os.socket_t) std.os.socket_t {
/// This will determine whether the inotify event indicates an actionable
/// change to the file, and if so, will call self.fileChanged
fn processInotifyEvent(self: *Self, ev: *const std.os.linux.inotify_event) void {
fn processInotifyEvent(self: Self, ev: *const std.os.linux.inotify_event, name_ptr: [*]u8) void {
// If the file was modified, it is good to know, but not
// actionable at this time. We can set a modification flag
// for later use. This flag and process may be unnecessary...
// how can we have a modify followed by CLOSE_NOWRITE?
//
// TODO: Delete the following
if (ev.mask & std.os.linux.IN.MODIFY == std.os.linux.IN.MODIFY) {
for (self.wds, 0..) |wd, inx| {
if (ev.wd == wd)
self.modified[inx] = true;
}
}
if (ev.mask & std.os.linux.IN.CLOSE_NOWRITE == std.os.linux.IN.CLOSE_NOWRITE) {
for (self.wds, 0..) |wd, inx| {
if (ev.wd == wd and self.modified[inx]) {
self.modified[inx] = false;
self.fileChanged(inx);
}
}
}
// There's a couple ways a file can be modified. The simplest
// way is to write(), then close(). For a variety of reasons
// due to safety, a lot of programs will write some temporary
@ -210,21 +201,73 @@ fn processInotifyEvent(self: *Self, ev: *const std.os.linux.inotify_event) void
// THIS WILL NOT WORK in the generic sense, and ultimately
// we're going to have to watch the directory as well
// attrib added as build process moves in place and modifies attributes
//
// TODO: Also watch MOVED_TO, which is on the directory...
if (ev.mask & std.os.linux.IN.CLOSE_WRITE == std.os.linux.IN.CLOSE_WRITE or
ev.mask & std.os.linux.IN.ATTRIB == std.os.linux.IN.ATTRIB)
if (ev.mask & std.os.linux.IN.CLOSE_WRITE == std.os.linux.IN.CLOSE_WRITE)
// ev.mask & std.os.linux.IN.ATTRIB == std.os.linux.IN.ATTRIB)
{
for (self.wds, 0..) |wd, inx| {
if (ev.wd == wd)
if (ev.wd == wd.wd) {
log.debug("CLOSE_WRITE: {d}", .{wd.wd});
self.fileChanged(inx);
break; // stop looking when we found the file
}
}
}
if (ev.mask & std.os.linux.IN.MOVED_TO == std.os.linux.IN.MOVED_TO) {
// This mem.span makes me deeply uncomfortable, but is how fs.watch does it
const name = std.mem.span(@ptrCast([*:0]u8, name_ptr));
log.debug("MOVED_TO({d}/{d}): {s}", .{ name.len, ev.len, name });
for (self.dir_wds) |dir| {
if (ev.wd == dir.wd) {
for (self.wds, 0..) |wd, inx| {
if (inx >= self.nfds_t) {
log.info(
"file moved into watch directory but is not registered watch: {s}",
.{name},
);
break;
}
log.debug(
"name '{s}', dir '{s}', basename '{s}'",
.{ name, std.fs.path.dirname(dir.path.?.*).?, wd.path.?.* },
);
if (nameMatch(
wd.path.?.*,
std.fs.path.dirname(dir.path.?.*).?,
name,
)) {
self.fileChanged(inx);
break; // stop looking when we found the file
}
}
break; // once we found the directory we need to stop looking
}
}
}
}
fn nameMatch(name: []const u8, dirname: []const u8, basename: []const u8) bool {
// check total length - should be fastest fail
if (dirname.len + basename.len + 1 != name.len) return false;
// check beginning
if (!std.mem.eql(u8, dirname, name[0..dirname.len])) return false;
// check end
if (!std.mem.eql(u8, basename, name[dirname.len + 1 ..])) return false;
// check path seperator (assuming unix)
return name[dirname.len] == '/';
}
test "nameMatch" {
try std.testing.expect(nameMatch(
"zig-out/lib/libfaas-proxy-sample-lib.so",
"zig-out/lib",
"libfaas-proxy-sample-lib.so",
));
}
/// adds a file to watch. The return will be a handle that will be returned
/// in the fileChanged event triffered from startWatch
pub fn addFileWatch(self: *Self, path: [:0]const u8) !usize {
pub fn addFileWatch(self: *Self, path: *[:0]const u8) !usize {
self.inotify_fd = self.inotify_fd orelse try std.os.inotify_init1(std.os.linux.IN.NONBLOCK);
errdefer {
std.os.close(self.inotify_fd.?);
@ -233,19 +276,41 @@ pub fn addFileWatch(self: *Self, path: [:0]const u8) !usize {
// zig build modification pattern: open 20, close_nowrite 10, MOVED_TO (on the directory), attrib 4
// unix cp: OPEN, MODIFY, CLOSE_WRITE, ATTRIB
// unix mv: MOVED_TO (on the directory)
self.wds[self.nfds_t] = try std.os.inotify_add_watchZ(
self.inotify_fd.?,
path,
std.os.linux.IN.ATTRIB | std.os.linux.IN.CLOSE | std.os.linux.IN.CLOSE_WRITE | std.os.linux.IN.MODIFY,
);
log.debug("watch added. fd {d}, wd {d}", .{ self.inotify_fd.?, self.wds[self.nfds_t] });
if (self.wds[self.nfds_t] == -1)
self.wds[self.nfds_t] = .{
.wd = try std.os.inotify_add_watchZ(
self.inotify_fd.?,
path.*,
std.os.linux.IN.CLOSE_WRITE,
),
.path = path,
};
if (self.wds[self.nfds_t].wd == -1)
@panic("could not set watch");
log.debug("watch added. fd {d}, wd {d}. Path {s}", .{ self.inotify_fd.?, self.wds[self.nfds_t].wd, path });
self.nfds_t += 1;
try self.addDirWatch(path);
if (self.watch_started) self.reloadWatch() catch @panic("could not reload watch");
return self.nfds_t - 1;
}
// This will add a hidden directory watch to catch OS moves into place
fn addDirWatch(self: *Self, path: *[]const u8) !void {
const dirname = std.fs.path.dirname(path.*).?; // TODO: reimplement std.fs.path.dirname as we're getting a local in here
log.debug("addDirWatch: dir_nfds_t: {d}, dir: {s}", .{ self.dir_nfds_t, dirname });
if (self.dir_nfds_t > 1)
for (0..self.dir_nfds_t) |inx|
if (self.dir_wds[inx].path) |p|
if (std.mem.eql(u8, std.fs.path.dirname(p.*).?, dirname))
return; // We are already watching this directory
// We do not have a directory watch
self.dir_wds[self.dir_nfds_t] = .{
.wd = try std.os.inotify_add_watch(self.inotify_fd.?, dirname, std.os.linux.IN.MOVED_TO),
.path = path, // we store path rather than directory because doing this without an allocator is...tough
};
self.dir_nfds_t += 1;
log.debug("directory watch added. fd {d}, wd {d}, dir {s}", .{ self.inotify_fd.?, self.wds[self.nfds_t].wd, dirname });
}
fn reloadWatch(self: Self) !void {
try self.sendControl('c');
}

View File

@ -5,7 +5,7 @@ export fn serve() void {
const stdout_file = std.io.getStdOut().writer();
var bw = std.io.bufferedWriter(stdout_file);
const stdout = bw.writer();
stdout.print(" 6 ", .{}) catch unreachable;
stdout.print(" 0 ", .{}) catch unreachable;
bw.flush() catch unreachable;
}

View File

@ -1,10 +1,7 @@
const std = @import("std");
const c = @cImport({
@cInclude("dlfcn.h");
});
const Watch = @import("Watch.zig");
const serve_op = *const fn () void;
const serveFn = *const fn () void;
var shutdown = false;
const timeout = 250;
@ -12,121 +9,92 @@ const timeout = 250;
const Executor = struct {
path: [:0]const u8,
library: ?*anyopaque = null,
serve: ?serve_op = null,
serve: ?serveFn = null,
watch: ?usize = null,
reload_lock: bool = false,
};
var executors = [_]Executor{
.{ .path = "zig-out/lib/libfaas-proxy-sample-lib2.so" },
.{ .path = "zig-out/lib/libfaas-proxy-sample-lib.so" },
.{ .path = "zig-out/lib/libfaas-proxy-sample-lib2.so" },
};
var watcher = Watch.init(executorChanged);
const log = std.log.scoped(.main);
pub const std_options = struct {
// Set the log level to info
pub const log_level = .debug;
pub const log_scope_levels = &[_]std.log.ScopeLevel{
.{ .scope = .watch, .level = .info },
};
};
const SERVE_FN_NAME = "serve";
fn serve() !void {
// if (some path routing thing) {
(try getExecutor(0))();
if (inx > 4) {
if (inx % 2 == 0)
(try getExecutor(0))()
else
(try getExecutor(1))();
}
// if (std.c.dlerror()) |_| { // TODO: use capture
// return error.CouldNotLoadSymbolServe;
// }
// TODO: only close on reload
// if (std.c.dlclose(library.?) != 0) {
// return error.CouldNotUnloadLibrary;
// }
// library = null;
// if (inx > 4) {
// if (inx % 2 == 0)
// (try getExecutor(0))()
// else
// (try getExecutor(1))();
// }
}
fn getExecutor(key: usize) !serve_op {
fn getExecutor(key: usize) !serveFn {
var executor = &executors[key];
if (executor.serve) |s| return s;
executor.library = blk: {
if (executor.library) |l| {
if (executor.library) |l|
break :blk l;
}
while (executor.reload_lock) // system is reloading the library
std.time.sleep(1);
if (executor.library) |l| // check again to see where we are at
break :blk l;
log.info("library {s} requested but not loaded. Loading library", .{executor.path});
const l = try dlopen(executor.path);
errdefer if (std.c.dlclose(l) != 0)
@panic("System unstable: Error after library open and cannot close");
executor.watch = executor.watch orelse try watcher.addFileWatch(executor.path);
executor.watch = executor.watch orelse try watcher.addFileWatch(&executor.path);
break :blk l;
};
// std.c.dlerror();
const serve_function = std.c.dlsym(executor.library.?, "serve");
const serve_function = std.c.dlsym(executor.library.?, SERVE_FN_NAME);
if (serve_function == null) return error.CouldNotLoadSymbolServe;
executor.serve = @ptrCast(serve_op, serve_function.?);
executor.serve = @ptrCast(serveFn, serve_function.?);
return executor.serve.?;
}
// This works
// fn executorChanged(watch: usize) void {
// log.debug("executor changed event", .{});
// for (&executors) |*executor| {
// if (executor.watch) |w| {
// if (w == watch) {
// if (executor.library) |l| {
// log.info("library {s} changed. Unloading library", .{executor.path});
// // TODO: These two lines could introduce a race. Right now that would mean a panic
// executor.serve = null;
// if (std.c.dlclose(l) != 0)
// @panic("System unstable: Error after library open and cannot close");
// }
// executor.library = null;
// executor.serve = null;
// // NOTE: Would love to reload the library here, but that action
// // does not seem to be thread safe
// }
// }
// }
// }
// NOTE: this will be on a different thread. This code does not work, and I
// am fairly certain it is because we can't share a function pointer between
// threads
fn executorChanged(watch: usize) void {
// NOTE: This will be called off the main thread
log.debug("executor with watch {d} changed", .{watch});
for (&executors) |*executor| {
if (executor.watch) |w| {
if (w == watch) {
if (executor.library) |l| {
log.debug("reloading executor at path: {s}", .{executor.path});
const newlib = dlopen(executor.path) catch {
executor.reload_lock = true;
defer executor.reload_lock = false;
if (std.c.dlclose(l) != 0)
@panic("System unstable: Error after library open and cannot close");
log.debug("closed old library. reloading executor at: {s}", .{executor.path});
executor.library = dlopen(executor.path) catch {
log.warn("could not reload! error opening library", .{});
return;
};
errdefer if (std.c.dlclose(newlib) != 0)
@panic("System unstable: Error after library open and cannot close");
const serve_function = std.c.dlsym(newlib, "serve");
if (serve_function == null) {
executor.serve = @ptrCast(serveFn, std.c.dlsym(executor.library.?, SERVE_FN_NAME));
if (executor.serve == null) {
log.warn("could not reload! error finding symbol", .{});
if (std.c.dlclose(executor.library.?) != 0)
@panic("System unstable: Error after library open and cannot close");
return;
}
// new lib all loaded up - do the swap and close the old
log.debug("updating function and library", .{});
executor.serve = @ptrCast(serve_op, serve_function.?);
executor.library = newlib;
if (std.c.dlclose(l) != 0)
@panic("System unstable: Error after library open and cannot close");
log.debug("closed old library", .{});
}
}
}
@ -135,7 +103,7 @@ fn executorChanged(watch: usize) void {
fn dlopen(path: [:0]const u8) !*anyopaque {
// We need now (and local) because we're about to call it
const lib = std.c.dlopen(path, c.RTLD_NOW);
const lib = std.c.dlopen(path, std.c.RTLD.NOW);
if (lib) |l| return l;
return error.CouldNotOpenDynamicLibrary;
}
@ -180,7 +148,13 @@ pub fn main() !void {
shutdown = true;
watcher_thread.join();
}
test {
// To run nested container tests, either, call `refAllDecls` which will
// reference all declarations located in the given argument.
// `@This()` is a builtin function that returns the innermost container it is called from.
// In this example, the innermost container is this file (implicitly a struct).
std.testing.refAllDecls(@This());
}
test "simple test" {
var list = std.ArrayList(i32).init(std.testing.allocator);
defer list.deinit(); // try commenting this out and see if zig detects the memory leak!