2023-05-09 16:18:38 +00:00
const builtin = @import("builtin");
const std = @import("std");

/// Capacity of each fixed-size watch table (`wds` and `dir_wds`).
const MAX_FDS = 1024;
const Self = @This();

/// Logger for this file; every message is emitted under the `watch` scope.
const log = std.log.scoped(.watch);
2023-05-09 16:18:38 +00:00
2023-05-11 14:15:27 +00:00
/// One slot of a watch table: an inotify watch descriptor together with the
/// path it was registered for.
const Wd = struct {
    /// Watch descriptor stored by `addFileWatch` / `addDirWatch`; slots start at 0.
    wd: i32 = 0,
    /// Pointer to the caller's path slice. Only the pointer is stored, so the
    /// pointed-to slice has to outlive the watcher.
    path: ?*const []const u8 = null,
};
2023-05-09 16:18:38 +00:00
/// Callback invoked with the handle returned by `addFileWatch` whenever the
/// corresponding file is detected as changed.
fileChanged: *const fn (usize) void,

/// inotify descriptor shared by all file and directory watches. Created on
/// first use by `addFileWatch` or `addControlSocket`.
inotify_fd: ?std.posix.fd_t = null,

/// Number of populated entries in `wds`.
nfds_t: usize = 0,
/// File watches; the index of an entry is the handle given to `fileChanged`.
wds: [MAX_FDS]Wd = [_]Wd{.{}} ** MAX_FDS,
/// Number of populated entries in `dir_wds`.
dir_nfds_t: usize = 0,
/// Parent-directory watches used to notice files being moved into place.
dir_wds: [MAX_FDS]Wd = [_]Wd{.{}} ** MAX_FDS,

/// Listening unix socket used to send commands to the watch loop. Created by
/// `startWatch` when it is still null.
control_socket: ?std.posix.socket_t = null,

/// True while the loop in `startWatch` is running.
watch_started: bool = false,

/// File system path of the control socket, produced by `sockName` in `init`.
sock_name: [:0]const u8,
2023-05-09 16:18:38 +00:00
/// Builds a watcher that reports changes through `file_changed`.
///
/// No descriptors are opened here; the only work done is generating the
/// control socket name. Compilation fails on any OS other than Linux.
pub fn init(file_changed: *const fn (usize) void) Self {
    switch (builtin.os.tag) {
        .linux => {},
        else => @compileError("Unsupported OS"),
    }
    const socket_name = sockName();
    return Self{
        .fileChanged = file_changed,
        .sock_name = socket_name,
    };
}
/// Releases everything the watcher holds: the control socket, every inotify
/// watch together with the inotify descriptor, and the socket file on disk.
///
/// Descriptor fields and watch counters are reset afterwards, so calling
/// `deinit` a second time does not close descriptor numbers again.
pub fn deinit(self: *Self) void {
    const had_control_socket = self.control_socket != null;

    if (self.control_socket) |s| {
        // Sockets...where Unix still pretends everything is a file, but it's not...
        log.debug("closing control socket", .{});
        std.posix.close(s);
        self.control_socket = null;
    }

    if (self.inotify_fd) |fd| {
        // File watches first, then the hidden directory watches.
        for (self.wds[0..self.nfds_t]) |entry| std.posix.inotify_rm_watch(fd, entry.wd);
        for (self.dir_wds[0..self.dir_nfds_t]) |entry| std.posix.inotify_rm_watch(fd, entry.wd);
        std.posix.close(fd);
        self.inotify_fd = null;
    }
    self.nfds_t = 0;
    self.dir_nfds_t = 0;

    std.fs.cwd().deleteFileZ(self.sock_name) catch |e| {
        // When no control socket was ever created, a missing socket file is
        // the expected state rather than an error worth reporting.
        if (had_control_socket or e != error.FileNotFound)
            log.err("error removing socket file {s}: {any}", .{ self.sock_name, e });
    };
}
2023-05-10 22:05:44 +00:00
const SOCK_NAME = "S.watch-control";

/// Backing storage for the name returned by `sockName`. Sized for the prefix,
/// the '-' separator, the widest possible i64 (sign plus 19 digits) and the
/// terminating 0 that `bufPrintZ` writes.
var buf = [_]u8{0} ** (SOCK_NAME.len + "-".len + "-9223372036854775808".len + 1);

/// Formats "<SOCK_NAME>-<unix timestamp>" into the file-level buffer and
/// returns it as a 0-terminated slice.
///
/// The result points into that single shared buffer, so every call overwrites
/// the name returned by the previous call.
fn sockName() [:0]const u8 {
    // Cannot fail: `buf` holds the longest possible output plus the sentinel.
    return std.fmt.bufPrintZ(buf[0..], "{s}-{d}", .{ SOCK_NAME, std.time.timestamp() }) catch unreachable;
}
2023-05-09 17:08:23 +00:00
/// Runs the watch loop. The loop blocks in poll() on the control socket and
/// the inotify descriptor and dispatches whatever arrives. It only returns
/// when a quit command ('q', see `stopWatch`) is read from the control
/// socket, so it is best to put this function in its own thread:
///
/// const watcher_thread = try std.Thread.spawn(.{}, Watch.startWatch, .{&watcher});
///
/// The control socket is created here if it does not exist yet. When a file
/// is added while the loop is running, `addFileWatch` sends a continue
/// command ('c'), which makes the loop go around and poll again.
///
/// Panics if the control socket cannot be created, if poll() fails, if a
/// control connection cannot be accepted or read, or if reading inotify
/// events fails with anything other than "would block".
pub fn startWatch(self: *Self) void {
    if (self.control_socket == null)
        self.addControlSocket(self.sock_name) catch @panic("could not add control socket");
    std.debug.assert(self.control_socket != null);

    const control_fd_inx = 0;
    const inotify_fd_inx = 1;

    while (true) {
        self.watch_started = true;

        // The poll set is rebuilt on every pass. The inotify entry is only
        // included in the slice handed to poll() when the descriptor exists.
        var fds_storage = [_]std.posix.pollfd{
            .{ .fd = self.control_socket.?, .events = std.posix.POLL.IN, .revents = 0 },
            .{ .fd = self.inotify_fd orelse -1, .events = std.posix.POLL.IN, .revents = 0 },
        };
        const nfds: usize = if (self.inotify_fd == null) 1 else 2;
        const fds = fds_storage[0..nfds];

        // NOTE: There is a std.io.poll that provides a higher level abstraction.
        // However, this API is strictly related to the use case of an open stream
        // for which we are awaiting data. In this case, we are polling for
        // an inotify event, for which no abstraction currently exists
        //
        // std.fs.watch looks really good...but it requires event based I/O,
        // which is not yet ready to be (re)added.
        log.debug("tid={d} start poll with {d} fds", .{ std.Thread.getCurrentId(), fds.len });
        const ready = std.posix.poll(
            fds,
            -1, // Infinite timeout
        ) catch @panic("poll error");
        if (ready == 0) continue;

        // POLLIN means "there is data to read"
        if ((fds[control_fd_inx].revents & std.posix.POLL.IN) == std.posix.POLL.IN) {
            log.debug("tid={d} control event", .{std.Thread.getCurrentId()});
            // we only need one byte for what we're doing
            var control_buf: [1]u8 = undefined;
            const fd = acceptSocket(self.sock_name, self.control_socket.?);
            defer std.posix.close(fd);

            // recv is real I/O and can fail; report it instead of declaring
            // the failure unreachable.
            const readcount = std.posix.recv(fd, &control_buf, 0) catch |e| {
                log.err("tid={d} could not read from control socket: {any}", .{ std.Thread.getCurrentId(), e });
                @panic("control socket read error");
            };
            log.debug("read {d} bytes from socket: {s}", .{ readcount, std.fmt.fmtSliceHexLower(control_buf[0..readcount]) });
            if (readcount == 0) {
                // then what?
                log.err("tid={d} control socket with POLL.IN but no data?", .{std.Thread.getCurrentId()});
                @panic("control event received but no data available");
            }

            switch (control_buf[0]) {
                'c' => {
                    log.info("tid={d} continue command (reload) received on control socket", .{std.Thread.getCurrentId()});
                    continue;
                },
                'q' => {
                    log.info("tid={d} quit command received on control socket", .{std.Thread.getCurrentId()});
                    self.watch_started = false;
                    return;
                },
                else => {
                    log.err("tid={d} Unexpected command on control socket 0x{x}", .{ std.Thread.getCurrentId(), control_buf[0] });
                    if (control_buf[0] == 0xaa) // we are in a world of hurt - panic
                        @panic("seems like a buffer overrun!");
                },
            }
        }

        // fds[1] is inotify, so if we have data in that file descriptor,
        // we can force the data into an inotify_event structure and act on it.
        // The guard uses the slice that was actually polled rather than the
        // field, so the index is valid whenever it is used.
        if (fds.len > inotify_fd_inx and
            (fds[inotify_fd_inx].revents & std.posix.POLL.IN) == std.posix.POLL.IN)
        {
            log.debug("tid={d} inotify event", .{std.Thread.getCurrentId()});
            var event_buf: [4096]u8 align(@alignOf(std.os.linux.inotify_event)) = undefined;
            // "borrowed" from https://ziglang.org/documentation/master/std/src/std/fs/watch.zig.html#L588
            const bytes_read = std.posix.read(fds[inotify_fd_inx].fd, &event_buf) catch |e| switch (e) {
                // The descriptor is opened with IN.NONBLOCK; nothing to read
                // just means we go back to polling.
                error.WouldBlock => continue,
                else => {
                    log.err("tid={d} could not read inotify events: {any}", .{ std.Thread.getCurrentId(), e });
                    @panic("inotify read error");
                },
            };
            var ptr: [*]u8 = &event_buf;
            const end_ptr = ptr + bytes_read;
            while (@intFromPtr(ptr) < @intFromPtr(end_ptr)) {
                const ev = @as(*const std.os.linux.inotify_event, @ptrCast(@alignCast(ptr)));

                // Step over the fixed header and the variable-length name, then
                // hand the handler a pointer back to the start of that name.
                ptr = ptr + @sizeOf(std.os.linux.inotify_event) + ev.len;
                self.processInotifyEvent(ev, ptr - ev.len);
            }
        }
    }
}
2024-05-02 21:38:04 +00:00
/// Accepts one pending connection on the listening control `socket` and
/// returns the connected descriptor. The caller closes it.
///
/// `name` is kept for interface compatibility. The peer address is never
/// inspected, so no address buffer is built from it. The accepted descriptor
/// is opened close-on-exec, matching the listening socket.
///
/// Panics if the connection cannot be accepted.
fn acceptSocket(name: [:0]const u8, socket: std.posix.socket_t) std.posix.socket_t {
    _ = name;
    log.debug("tid={d} accepting on socket fd {d}", .{ std.Thread.getCurrentId(), socket });
    return std.posix.accept(
        socket,
        null,
        null,
        std.posix.SOCK.CLOEXEC,
    ) catch @panic("could not accept connection");
}
2023-05-09 17:08:23 +00:00
/// This will determine whether the inotify event indicates an actionable
/// change to the file, and if so, will call self.fileChanged
///
/// `ev` is the event header and `name_ptr` points at the `ev.len` name bytes
/// that follow it in the read buffer.
fn processInotifyEvent(self: Self, ev: *const std.os.linux.inotify_event, name_ptr: [*]u8) void {
    const IN = std.os.linux.IN;

    // There's a couple ways a file can be modified. The simplest
    // way is to write(), then close(). For a variety of reasons
    // due to safety, a lot of programs will write some temporary
    // file, then copy or move it in place. This will fail to
    // trigger IN_CLOSE_WRITE, so we need to detect it another
    // way. The best is to watch for events on the parent directory
    // to find move events. Note that using copy will trigger
    // a IN_CLOSE_WRITE.
    if ((ev.mask & IN.CLOSE_WRITE) == IN.CLOSE_WRITE) {
        // Only the populated part of the table can hold a matching watch.
        for (self.wds[0..self.nfds_t], 0..) |wd, inx| {
            if (ev.wd == wd.wd) {
                log.debug("CLOSE_WRITE: {d}", .{wd.wd});
                self.fileChanged(inx);
                break; // stop looking when we found the file
            }
        }
    }

    if ((ev.mask & IN.MOVED_TO) == IN.MOVED_TO) {
        // The name is 0-padded within ev.len bytes; bounding the scan by
        // ev.len keeps it inside this event's own bytes.
        const name = std.mem.sliceTo(name_ptr[0..ev.len], 0);
        log.debug("MOVED_TO({d}/{d}): {s}", .{ name.len, ev.len, name });
        for (self.dir_wds[0..self.dir_nfds_t]) |dir| {
            if (ev.wd != dir.wd) continue;

            // Directory entries store the path of the file that caused the
            // directory to be watched, so the directory is derived from it.
            const dirname = std.fs.path.dirname(dir.path.?.*).?;
            for (self.wds[0..self.nfds_t], 0..) |wd, inx| {
                log.debug(
                    "name '{s}', dir '{s}', basename '{s}'",
                    .{ name, dirname, wd.path.?.* },
                );
                if (nameMatch(wd.path.?.*, dirname, name)) {
                    self.fileChanged(inx);
                    break; // stop looking when we found the file
                }
            } else {
                // Loop ran to completion: no registered file has this name.
                log.info(
                    "file moved into watch directory but is not registered watch: {s}",
                    .{name},
                );
            }
            break; // once we found the directory we need to stop looking
        }
    }
}
/// Reports whether `name` is exactly `dirname`, a '/' separator, and
/// `basename` joined together (unix path separator assumed).
fn nameMatch(name: []const u8, dirname: []const u8, basename: []const u8) bool {
    // Cheapest rejection first; it also guarantees the index below is in range.
    if (name.len != dirname.len + basename.len + 1) return false;

    return std.mem.startsWith(u8, name, dirname) and
        std.mem.endsWith(u8, name, basename) and
        name[dirname.len] == '/';
}
test "nameMatch" {
    // A full path must be recognised as its directory plus its file name.
    const full_path = "zig-out/lib/libfaas-proxy-sample-lib.so";
    const directory = "zig-out/lib";
    const file_name = "libfaas-proxy-sample-lib.so";
    try std.testing.expect(nameMatch(full_path, directory, file_name));
}
/// Adds a file to watch. The returned handle is the value later passed to
/// the fileChanged callback by the loop in startWatch.
///
/// Only the pointer `path` is stored, so the slice it points to must stay
/// valid for as long as the watch exists. A watch on the parent directory is
/// added as well so that files moved into place are noticed.
///
/// Returns `error.TooManyWatches` when the watch table is full, or any error
/// from creating the inotify descriptor or adding the watches. On error the
/// watcher is left as it was before the call.
pub fn addFileWatch(self: *Self, path: *[:0]const u8) !usize {
    if (self.nfds_t >= MAX_FDS) return error.TooManyWatches;

    // Remember whether the descriptor is ours to clean up: an existing one
    // may already carry other watches and may be in use by the watch loop.
    const created_fd = self.inotify_fd == null;
    const fd = self.inotify_fd orelse try std.posix.inotify_init1(std.os.linux.IN.NONBLOCK);
    self.inotify_fd = fd;
    errdefer {
        if (created_fd) {
            std.posix.close(fd);
            self.inotify_fd = null;
        }
    }

    // zig build modification pattern: open 20, close_nowrite 10, MOVED_TO (on the directory), attrib 4
    // unix cp: OPEN, MODIFY, CLOSE_WRITE, ATTRIB
    // unix mv: MOVED_TO (on the directory)
    const inx = self.nfds_t;
    const wd = try std.posix.inotify_add_watchZ(fd, path.*, std.os.linux.IN.CLOSE_WRITE);
    self.wds[inx] = .{ .wd = wd, .path = path };
    self.nfds_t += 1;
    // Declared after the descriptor cleanup so it runs first: undo the file
    // watch if the directory watch below cannot be added.
    errdefer {
        std.posix.inotify_rm_watch(fd, wd);
        self.wds[inx] = .{};
        self.nfds_t = inx;
    }
    log.debug("watch added. fd {d}, wd {d}. Path {s}", .{ fd, wd, path.* });

    try self.addDirWatch(path);
    if (self.watch_started) self.reloadWatch() catch @panic("could not reload watch");
    return inx;
}
2023-05-10 22:05:44 +00:00
2023-05-11 14:15:27 +00:00
/// Adds a hidden watch on the parent directory of `path` to catch OS moves
/// into place. Does nothing when that directory is already watched.
///
/// Returns `error.NoParentDirectory` when `path` has no directory component,
/// `error.TooManyWatches` when the directory table is full, or any error from
/// adding the inotify watch. Expects `self.inotify_fd` to be set.
fn addDirWatch(self: *Self, path: *[]const u8) !void {
    const dirname = std.fs.path.dirname(path.*) orelse return error.NoParentDirectory;
    log.debug("addDirWatch: dir_nfds_t: {d}, dir: {s}", .{ self.dir_nfds_t, dirname });

    // Every populated entry is checked, including the case of exactly one,
    // so the same directory is never registered twice.
    for (self.dir_wds[0..self.dir_nfds_t]) |existing| {
        const p = existing.path orelse continue;
        if (std.mem.eql(u8, std.fs.path.dirname(p.*).?, dirname))
            return; // We are already watching this directory
    }

    // We do not have a directory watch
    if (self.dir_nfds_t >= MAX_FDS) return error.TooManyWatches;
    const wd = try std.posix.inotify_add_watch(self.inotify_fd.?, dirname, std.os.linux.IN.MOVED_TO);
    self.dir_wds[self.dir_nfds_t] = .{
        .wd = wd,
        .path = path, // we store path rather than directory because doing this without an allocator is...tough
    };
    self.dir_nfds_t += 1;
    log.debug("directory watch added. fd {d}, wd {d}, dir {s}", .{ self.inotify_fd.?, wd, dirname });
}
2023-05-10 22:05:44 +00:00
/// Sends the continue command ('c') on the control socket, which makes a
/// running watch loop go around and poll again.
fn reloadWatch(self: Self) !void {
    return self.sendControl('c');
}
/// Sends the quit command ('q') on the control socket, which makes a running
/// watch loop return.
pub fn stopWatch(self: Self) !void {
    return self.sendControl('q');
}
/// Connects to the control socket as a client and writes the single command
/// byte `control`. Does nothing when no control socket has been created.
fn sendControl(self: Self, control: u8) !void {
    // Sockets...where Unix still pretends everything is a file, but it's not...
    //
    // For client processing, there are a bunch of steps, but the zig stdlib
    // saves us a bunch of work. Once we do std.net.connectUnixSocket(), we
    // get a stream back that has reader() and writer() calls
    _ = self.control_socket orelse return; // nothing to do

    const stream = try std.net.connectUnixSocket(self.sock_name);
    defer stream.close();

    const tid = std.Thread.getCurrentId();
    log.debug("tid={d} sending control 0x{x} on socket fd={d}", .{ tid, control, stream.handle });
    return stream.writer().writeByte(control);
}
/// Creates the control socket at `path`. This allows for managing the
/// watcher: with it, you can gracefully terminate the watch loop and you can
/// add files after the fact.
///
/// Also creates the inotify descriptor when it does not exist yet. On error,
/// only resources created by this call are released: the socket, the socket
/// file if it was already bound, and the inotify descriptor if it was opened
/// here.
fn addControlSocket(self: *Self, path: [:0]const u8) !void {
    // TODO: I now believe std.net.StreamServer will handle a lot of these details for us.
    // something like: var server = std.net.StreamServer.init(.{ ... }); server.listen(.{...});
    // and use server.sockfd.? as my control socket
    //
    // This function theoretically should work without requiring linux...except this inotify call,
    // which is completely linux specific
    const created_inotify = self.inotify_fd == null;
    const inotify_fd = self.inotify_fd orelse try std.posix.inotify_init1(std.os.linux.IN.NONBLOCK);
    self.inotify_fd = inotify_fd;
    log.debug("Established inotify file descriptor {d}", .{inotify_fd});
    errdefer {
        // A descriptor that existed before this call may already carry file
        // watches, so it is left alone.
        if (created_inotify) {
            std.posix.close(inotify_fd);
            self.inotify_fd = null;
        }
    }

    // Sockets...where Unix still pretends everything is a file, but it's not...
    // We'll create a unix socket, which looks like a file on the file system
    //
    // For client processing, see comments in the sendControl function
    //
    // From the "server" perspective, we need to do this initially:
    // 1. std.posix.socket: create the socket. This file descriptor should be used in poll(2) calls
    // 2. std.posix.bind: tell the system where the socket is (here, it's the filesystem path)
    // 3. std.posix.listen: tell the system how many simultaneous connections we can have
    //
    // At this point, clients can write to the socket (but that's not typical fs ops either)
    // To read from the socket, we need to:
    //
    // 4. std.posix.accept: create a file descriptor from the socket descriptor
    // 5. std.posix.recv: works just like read(2). Call lots
    // 6. std.posix.close: close the fd
    //
    // On end of use, the listening socket is closed as well (see deinit)
    const sock = try std.posix.socket(
        std.posix.AF.UNIX,
        std.posix.SOCK.STREAM | std.posix.SOCK.CLOEXEC,
        0,
    );
    errdefer std.posix.close(sock);

    const sockaddr = try std.net.Address.initUnix(path);
    log.debug("binding to path: {s}", .{path});
    try std.posix.bind(sock, &sockaddr.any, sockaddr.getOsSockLen());
    // bind created the socket file; remove it again if listen fails.
    errdefer std.fs.cwd().deleteFileZ(path) catch |e|
        log.err("error removing socket file {s}: {any}", .{ path, e });

    try std.posix.listen(sock, 10);
    self.control_socket = sock;
    log.debug("added control socket with fd={d}", .{sock});
}