380 lines
17 KiB
Zig
380 lines
17 KiB
Zig
const builtin = @import("builtin");
|
|
const std = @import("std");
|
|
|
|
const MAX_FDS = 1024;
|
|
|
|
const Self = @This();
|
|
const log = std.log.scoped(.watch);
|
|
|
|
const Wd = struct {
|
|
wd: i32 = 0,
|
|
path: ?*const []const u8 = null,
|
|
};
|
|
|
|
fileChanged: *const fn (usize) void,
|
|
inotify_fd: ?std.posix.fd_t = null,
|
|
|
|
nfds_t: usize = 0,
|
|
wds: [MAX_FDS]Wd = [_]Wd{.{}} ** MAX_FDS,
|
|
dir_nfds_t: usize = 0,
|
|
dir_wds: [MAX_FDS]Wd = [_]Wd{.{}} ** MAX_FDS,
|
|
control_socket: ?std.posix.socket_t = null,
|
|
watch_started: bool = false,
|
|
sock_name: [:0]const u8,
|
|
|
|
pub fn init(file_changed: *const fn (usize) void) Self {
|
|
if (builtin.os.tag != .linux)
|
|
@compileError("Unsupported OS");
|
|
return .{
|
|
.fileChanged = file_changed,
|
|
.sock_name = sockName(),
|
|
};
|
|
}
|
|
|
|
pub fn deinit(self: *Self) void {
|
|
if (self.control_socket) |s| {
|
|
// Sockets...where Unix still pretends everything is a file, but it's not...
|
|
log.debug("closing control socket", .{});
|
|
std.posix.close(s);
|
|
}
|
|
if (self.inotify_fd) |fd| {
|
|
for (0..self.nfds_t + self.dir_nfds_t) |inx| {
|
|
const wd = if (inx < self.nfds_t) self.wds[inx].wd else self.dir_wds[inx - self.nfds_t].wd;
|
|
std.posix.inotify_rm_watch(fd, wd);
|
|
}
|
|
std.posix.close(fd);
|
|
}
|
|
const cwd = std.fs.cwd();
|
|
cwd.deleteFileZ(self.sock_name) catch |e|
|
|
log.err("error removing socket file {s}: {any}", .{ self.sock_name, e });
|
|
}
|
|
|
|
const SOCK_NAME = "S.watch-control";
|
|
var buf = [_]u8{0} ** (SOCK_NAME.len + "-9223372036854775807 ".len);
|
|
fn sockName() [:0]const u8 {
|
|
return std.fmt.bufPrintZ(buf[0..], "{s}-{d}", .{ SOCK_NAME, std.time.timestamp() }) catch unreachable; // buffer designed for Max(i64) with sock name and a trailing \0
|
|
}
|
|
/// starts the file watch. This function will not return, so it is best
|
|
/// to put this function in its own thread:
|
|
///
|
|
/// const watcher_thread = try std.Thread.spawn(.{}, Watch.startWatch, .{&watcher});
|
|
///
|
|
/// Due to the nature of the poll(), behavior will almost definitely not work
|
|
/// well if files are added after the watch begins. A method for doing this
|
|
/// is intended later
|
|
pub fn startWatch(self: *Self) void {
|
|
if (self.control_socket == null)
|
|
self.addControlSocket(self.sock_name) catch @panic("could not add control socket");
|
|
std.debug.assert(self.control_socket != null);
|
|
|
|
while (true) {
|
|
self.watch_started = true;
|
|
|
|
const fds = if (self.inotify_fd == null)
|
|
@constCast(&[_]std.posix.pollfd{.{ .fd = self.control_socket.?, .events = std.posix.POLL.IN, .revents = undefined }})
|
|
else
|
|
@constCast(&[_]std.posix.pollfd{
|
|
.{ .fd = self.control_socket.?, .events = std.posix.POLL.IN, .revents = undefined },
|
|
.{ .fd = self.inotify_fd.?, .events = std.posix.POLL.IN, .revents = undefined },
|
|
});
|
|
|
|
const control_fd_inx = 0;
|
|
const inotify_fd_inx = 1;
|
|
|
|
//
|
|
// NOTE: There is a std.io.poll that provides a higher level abstraction.
|
|
// However, this API is strictly related to the use case of an open stream
|
|
// for which we are awaiting data. In this case, we are polling for
|
|
// an inotify event, for which no abstraction currently exists
|
|
//
|
|
// std.fs.watch looks really good...but it requires event based I/O,
|
|
// which is not yet ready to be (re)added.
|
|
log.debug("tid={d} start poll with {d} fds", .{ std.Thread.getCurrentId(), fds.len });
|
|
if ((std.posix.poll(
|
|
fds,
|
|
-1, // Infinite timeout
|
|
) catch @panic("poll error")) > 0) {
|
|
if (fds[control_fd_inx].revents & std.posix.POLL.IN == std.posix.POLL.IN) { // POLLIN means "there is data to read"
|
|
log.debug("tid={d} control event", .{std.Thread.getCurrentId()});
|
|
// we only need one byte for what we're doing
|
|
var control_buf: [1]u8 = undefined;
|
|
|
|
// self.control_socket_accepted_fd = self.control_socket_accepted_fd orelse acceptSocket(self.control_socket.?);
|
|
// const fd = self.control_socket_accepted_fd.?; // let's save some typing
|
|
const fd = acceptSocket(self.sock_name, self.control_socket.?);
|
|
defer std.posix.close(fd);
|
|
|
|
const readcount = std.posix.recv(fd, &control_buf, 0) catch unreachable;
|
|
// var other_buf: [1]u8 = undefined;
|
|
// if (std.os.recv(fd, &other_buf, 0) catch unreachable != 0)
|
|
// @panic("socket contains more data than expected");
|
|
log.debug("read {d} bytes from socket: {s}", .{ readcount, std.fmt.fmtSliceHexLower(control_buf[0..readcount]) });
|
|
if (readcount == 0) {
|
|
// then what?
|
|
log.err("tid={d} control socket with POLL.IN but no data?", .{std.Thread.getCurrentId()});
|
|
@panic("control event received but no data available");
|
|
}
|
|
|
|
switch (control_buf[0]) {
|
|
'c' => {
|
|
log.info("tid={d} continue command (reload) received on control socket", .{std.Thread.getCurrentId()});
|
|
continue;
|
|
},
|
|
'q' => {
|
|
log.info("tid={d} quit command received on control socket", .{std.Thread.getCurrentId()});
|
|
self.watch_started = false;
|
|
return;
|
|
},
|
|
else => {
|
|
log.err("tid={d} Unexpected command on control socket 0x{x}", .{ std.Thread.getCurrentId(), control_buf[0] });
|
|
if (control_buf[0] == 0xaa) // we are in a world of hurt - panic
|
|
@panic("seems like a buffer overrun!");
|
|
},
|
|
}
|
|
}
|
|
|
|
// fds[1] is inotify, so if we have data in that file descriptor,
|
|
// we can force the data into an inotify_event structure and act on it
|
|
if (self.inotify_fd != null and fds[inotify_fd_inx].revents & std.posix.POLL.IN == std.posix.POLL.IN) {
|
|
log.debug("tid={d} inotify event", .{std.Thread.getCurrentId()});
|
|
var event_buf: [4096]u8 align(@alignOf(std.os.linux.inotify_event)) = undefined;
|
|
// "borrowed" from https://ziglang.org/documentation/master/std/src/std/fs/watch.zig.html#L588
|
|
const bytes_read = std.posix.read(self.inotify_fd.?, &event_buf) catch unreachable;
|
|
|
|
var ptr: [*]u8 = &event_buf;
|
|
const end_ptr = ptr + bytes_read;
|
|
while (@intFromPtr(ptr) < @intFromPtr(end_ptr)) {
|
|
const ev = @as(*const std.os.linux.inotify_event, @ptrCast(@alignCast(ptr)));
|
|
|
|
// Read next event from inotify
|
|
ptr = ptr + @sizeOf(std.os.linux.inotify_event) + ev.len;
|
|
self.processInotifyEvent(ev, ptr - ev.len);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
fn acceptSocket(name: [:0]const u8, socket: std.posix.socket_t) std.posix.socket_t {
|
|
var sockaddr = std.net.Address.initUnix(name) catch @panic("could not get sockaddr");
|
|
var sockaddr_len: std.posix.socklen_t = sockaddr.getOsSockLen();
|
|
log.debug("tid={d} accepting on socket fd {d}", .{ std.Thread.getCurrentId(), socket });
|
|
return std.posix.accept(
|
|
socket,
|
|
&sockaddr.any,
|
|
&sockaddr_len,
|
|
0,
|
|
) catch @panic("could not accept connection");
|
|
}
|
|
|
|
/// This will determine whether the inotify event indicates an actionable
|
|
/// change to the file, and if so, will call self.fileChanged
|
|
fn processInotifyEvent(self: Self, ev: *const std.os.linux.inotify_event, name_ptr: [*]u8) void {
|
|
// If the file was modified, it is good to know, but not
|
|
// actionable at this time. We can set a modification flag
|
|
// for later use. This flag and process may be unnecessary...
|
|
// how can we have a modify followed by CLOSE_NOWRITE?
|
|
|
|
// There's a couple ways a file can be modified. The simplest
|
|
// way is to write(), then close(). For a variety of reasons
|
|
// due to safety, a lot of programs will write some temporary
|
|
// file, then copy or move it in place. This will fail to
|
|
// trigger IN_CLOSE_WRITE, so we need to detect it another
|
|
// way. The best is to watch for events on the parent directory
|
|
// to find move events. Note that using copy will trigger
|
|
// a IN_CLOSE_WRITE. Without building directory watching in,
|
|
// we can use IN_ATTRIB to satisfy the `zig build` use case,
|
|
// which modifies attributes after moving the file.
|
|
//
|
|
// THIS WILL NOT WORK in the generic sense, and ultimately
|
|
// we're going to have to watch the directory as well
|
|
// attrib added as build process moves in place and modifies attributes
|
|
if (ev.mask & std.os.linux.IN.CLOSE_WRITE == std.os.linux.IN.CLOSE_WRITE)
|
|
// ev.mask & std.os.linux.IN.ATTRIB == std.os.linux.IN.ATTRIB)
|
|
{
|
|
for (self.wds, 0..) |wd, inx| {
|
|
if (ev.wd == wd.wd) {
|
|
log.debug("CLOSE_WRITE: {d}", .{wd.wd});
|
|
self.fileChanged(inx);
|
|
break; // stop looking when we found the file
|
|
}
|
|
}
|
|
}
|
|
if (ev.mask & std.os.linux.IN.MOVED_TO == std.os.linux.IN.MOVED_TO) {
|
|
// This mem.span makes me deeply uncomfortable, but is how fs.watch does it
|
|
// TODO: This should be a std.mem.sliceTo(@ptrCast([*:0]u8, name_ptr), ev.len);
|
|
// and returning from C without a sentinal, we can use the same call, like this:
|
|
// std.mem.sliceTo(@ptrCast([*]u8, name_ptr), len);
|
|
const name = std.mem.span(@as([*:0]u8, @ptrCast(name_ptr)));
|
|
log.debug("MOVED_TO({d}/{d}): {s}", .{ name.len, ev.len, name });
|
|
for (self.dir_wds) |dir| {
|
|
if (ev.wd == dir.wd) {
|
|
for (self.wds, 0..) |wd, inx| {
|
|
if (inx >= self.nfds_t) {
|
|
log.info(
|
|
"file moved into watch directory but is not registered watch: {s}",
|
|
.{name},
|
|
);
|
|
break;
|
|
}
|
|
|
|
log.debug(
|
|
"name '{s}', dir '{s}', basename '{s}'",
|
|
.{ name, std.fs.path.dirname(dir.path.?.*).?, wd.path.?.* },
|
|
);
|
|
if (nameMatch(
|
|
wd.path.?.*,
|
|
std.fs.path.dirname(dir.path.?.*).?,
|
|
name,
|
|
)) {
|
|
self.fileChanged(inx);
|
|
break; // stop looking when we found the file
|
|
}
|
|
}
|
|
break; // once we found the directory we need to stop looking
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
fn nameMatch(name: []const u8, dirname: []const u8, basename: []const u8) bool {
|
|
// check total length - should be fastest fail
|
|
if (dirname.len + basename.len + 1 != name.len) return false;
|
|
// check beginning
|
|
if (!std.mem.eql(u8, dirname, name[0..dirname.len])) return false;
|
|
// check end
|
|
if (!std.mem.eql(u8, basename, name[dirname.len + 1 ..])) return false;
|
|
// check path seperator (assuming unix)
|
|
return name[dirname.len] == '/';
|
|
}
|
|
|
|
test "nameMatch" {
|
|
try std.testing.expect(nameMatch(
|
|
"zig-out/lib/libfaas-proxy-sample-lib.so",
|
|
"zig-out/lib",
|
|
"libfaas-proxy-sample-lib.so",
|
|
));
|
|
}
|
|
|
|
/// adds a file to watch. The return will be a handle that will be returned
|
|
/// in the fileChanged event triffered from startWatch
|
|
pub fn addFileWatch(self: *Self, path: *[:0]const u8) !usize {
|
|
self.inotify_fd = self.inotify_fd orelse try std.posix.inotify_init1(std.os.linux.IN.NONBLOCK);
|
|
errdefer {
|
|
std.posix.close(self.inotify_fd.?);
|
|
self.inotify_fd = null;
|
|
}
|
|
// zig build modification pattern: open 20, close_nowrite 10, MOVED_TO (on the directory), attrib 4
|
|
// unix cp: OPEN, MODIFY, CLOSE_WRITE, ATTRIB
|
|
// unix mv: MOVED_TO (on the directory)
|
|
self.wds[self.nfds_t] = .{
|
|
.wd = try std.posix.inotify_add_watchZ(
|
|
self.inotify_fd.?,
|
|
path.*,
|
|
std.os.linux.IN.CLOSE_WRITE,
|
|
),
|
|
.path = path,
|
|
};
|
|
if (self.wds[self.nfds_t].wd == -1)
|
|
@panic("could not set watch");
|
|
log.debug("watch added. fd {d}, wd {d}. Path {s}", .{ self.inotify_fd.?, self.wds[self.nfds_t].wd, path });
|
|
self.nfds_t += 1;
|
|
try self.addDirWatch(path);
|
|
if (self.watch_started) self.reloadWatch() catch @panic("could not reload watch");
|
|
return self.nfds_t - 1;
|
|
}
|
|
|
|
// This will add a hidden directory watch to catch OS moves into place
|
|
fn addDirWatch(self: *Self, path: *[]const u8) !void {
|
|
const dirname = std.fs.path.dirname(path.*).?; // TODO: reimplement std.fs.path.dirname as we're getting a local in here
|
|
log.debug("addDirWatch: dir_nfds_t: {d}, dir: {s}", .{ self.dir_nfds_t, dirname });
|
|
if (self.dir_nfds_t > 1)
|
|
for (0..self.dir_nfds_t) |inx|
|
|
if (self.dir_wds[inx].path) |p|
|
|
if (std.mem.eql(u8, std.fs.path.dirname(p.*).?, dirname))
|
|
return; // We are already watching this directory
|
|
// We do not have a directory watch
|
|
self.dir_wds[self.dir_nfds_t] = .{
|
|
.wd = try std.posix.inotify_add_watch(self.inotify_fd.?, dirname, std.os.linux.IN.MOVED_TO),
|
|
.path = path, // we store path rather than directory because doing this without an allocator is...tough
|
|
};
|
|
self.dir_nfds_t += 1;
|
|
log.debug("directory watch added. fd {d}, wd {d}, dir {s}", .{ self.inotify_fd.?, self.wds[self.nfds_t].wd, dirname });
|
|
}
|
|
|
|
fn reloadWatch(self: Self) !void {
|
|
try self.sendControl('c');
|
|
}
|
|
|
|
pub fn stopWatch(self: Self) !void {
|
|
try self.sendControl('q');
|
|
}
|
|
|
|
fn sendControl(self: Self, control: u8) !void {
|
|
// Sockets...where Unix still pretends everything is a file, but it's not...
|
|
//
|
|
// For client processing, there are a bunch of steps, but the zig stdlib
|
|
// saves us a bunch of work. Once we do std.net.connectUnixSocket(), we
|
|
// get a stream back that has reader() and writer() calls
|
|
//
|
|
// log.debug("request to send control 0x{x}", .{control});
|
|
if (self.control_socket == null) return; // nothing to do
|
|
// log.debug("tid={d} opening stream", .{std.Thread.getCurrentId()});
|
|
var stream = try std.net.connectUnixSocket(self.sock_name);
|
|
defer stream.close();
|
|
log.debug("tid={d} sending control 0x{x} on socket fd={d}", .{ std.Thread.getCurrentId(), control, stream.handle });
|
|
try stream.writer().writeByte(control);
|
|
}
|
|
|
|
/// creates a control socket. This allows for managing the watcher. With it,
|
|
/// you can gracefully terminate the process and you can add files after the fact
|
|
fn addControlSocket(self: *Self, path: [:0]const u8) !void {
|
|
// TODO: I now believe std.net.StreamServer will handle a lot of these details for us.
|
|
// something like: var server = std.net.StreamServer.init(.{ ... }); server.listen(.{...});
|
|
// and use server.sockfd.? as my control socket
|
|
//
|
|
// This function theoretically should work without requiring linux...except this inotify call,
|
|
// which is completely linux specific
|
|
self.inotify_fd = self.inotify_fd orelse try std.posix.inotify_init1(std.os.linux.IN.NONBLOCK);
|
|
log.debug("Established inotify file descriptor {d}", .{self.inotify_fd.?});
|
|
errdefer {
|
|
std.posix.close(self.inotify_fd.?);
|
|
self.inotify_fd = null;
|
|
}
|
|
// this should work on all systems theoretically, but I believe would work only
|
|
// on *nix systems
|
|
//
|
|
// Sockets...where Unix still pretends everything is a file, but it's not...
|
|
// We'll create a unix socket, which looks like a file on the file system
|
|
//
|
|
// For client processing, see comments in the sendControl function
|
|
//
|
|
// From the "server" perspective, we need to to this initially:
|
|
// 1. std.os.socket: create the socket. This file descriptor should be used in poll(2) calls
|
|
// 2. std.os.bind: tell the system where the socket is (here, it's the filesystem path)
|
|
// 3. std.os.listen: tell the system how many simultaneous connections we can have
|
|
//
|
|
// At this point, clients can write to the socket (but that's not typical fs ops either)
|
|
// To read from the socket, we need to:
|
|
//
|
|
// 4. std.os.accept: create a file descriptor from the socket descriptor
|
|
// 5. std.os.recv: works just like read(2). Call lots
|
|
// 6. std.os.close: close the fd
|
|
//
|
|
// On end of use, we need to std.os.closeSocket()
|
|
const sock = try std.posix.socket(
|
|
std.os.linux.AF.LOCAL,
|
|
std.os.linux.SOCK.STREAM | std.posix.SOCK.CLOEXEC,
|
|
0,
|
|
);
|
|
errdefer std.posix.close(sock);
|
|
|
|
const sockaddr = try std.net.Address.initUnix(path);
|
|
|
|
log.debug("binding to path: {s}", .{path});
|
|
try std.posix.bind(sock, &sockaddr.any, sockaddr.getOsSockLen());
|
|
try std.posix.listen(sock, 10);
|
|
self.control_socket = sock;
|
|
log.debug("added control socket with fd={d}", .{sock});
|
|
}
|