add file watcher
This commit is contained in:
		
							parent
							
								
									08dec6a978
								
							
						
					
					
						commit
						5e09b735c6
					
				
					 4 changed files with 277 additions and 2 deletions
				
			
		|  | @ -24,6 +24,8 @@ pub fn build(b: *std.Build) void { | |||
|         .optimize = optimize, | ||||
|     }); | ||||
| 
 | ||||
|     exe.linkLibC(); | ||||
| 
 | ||||
|     const lib = b.addSharedLibrary(.{ | ||||
|         .name = "faas-proxy-sample-lib", | ||||
|         // In this case the main source file is merely a path, however, in more | ||||
|  | @ -60,6 +62,9 @@ pub fn build(b: *std.Build) void { | |||
|         run_cmd.addArgs(args); | ||||
|     } | ||||
| 
 | ||||
|     const run_step = b.step("run", "Run the app"); | ||||
|     run_step.dependOn(&run_cmd.step); | ||||
| 
 | ||||
|     // Creates a step for unit testing. This only builds the test executable | ||||
|     // but does not run it. | ||||
|     const main_tests = b.addTest(.{ | ||||
|  |  | |||
							
								
								
									
										126
									
								
								src/Watch.zig
									
										
									
									
									
										Normal file
									
								
							
							
						
						
									
										126
									
								
								src/Watch.zig
									
										
									
									
									
										Normal file
									
								
							|  | @ -0,0 +1,126 @@ | |||
| const builtin = @import("builtin"); | ||||
| const std = @import("std"); | ||||
| const c = @cImport({ | ||||
|     @cInclude("poll.h"); | ||||
| }); | ||||
| 
 | ||||
| const MAX_FDS = 1024; | ||||
| 
 | ||||
| const Self = @This(); | ||||
| 
 | ||||
| fileChanged: *const fn (usize) void, | ||||
| inotify_fd: ?std.os.fd_t = null, | ||||
| // sizeof(std.os.pollfd) == 8, so this is 8k | ||||
| // fds: [MAX_FDS]std.os.pollfd = [_]std.os.pollfd{.{ .fd = 0, .events = 0, .revents = 0 }} ** MAX_FDS, | ||||
| nfds_t: usize = 0, | ||||
| wds: [MAX_FDS]i32 = [_]i32{0} ** MAX_FDS, | ||||
| modified: [MAX_FDS]bool = [_]bool{false} ** MAX_FDS, | ||||
| 
 | ||||
| pub fn init(file_changed: *const fn (usize) void) Self { | ||||
|     if (builtin.os.tag != .linux) | ||||
|         @compileError("Unsupported OS"); | ||||
|     return .{ | ||||
|         .fileChanged = file_changed, | ||||
|     }; | ||||
| } | ||||
| 
 | ||||
| pub fn deinit(self: *Self) void { | ||||
|     if (self.inotify_fd) |fd| { | ||||
|         for (self.wds) |wd| { | ||||
|             const rc = std.os.linux.inotify_rm_watch(fd, wd); | ||||
|             // Errno can only be EBADF, EINVAL if either the inotify fs or the wd are invalid | ||||
|             std.debug.assert(rc == 0); | ||||
|         } | ||||
|         std.os.close(fd); | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| pub fn watchFds(self: *Self) void { | ||||
|     while (true) { | ||||
|         if (self.nfds_t == 0) { | ||||
|             std.time.sleep(250); | ||||
|             continue; | ||||
|         } | ||||
|         var fds = &[_]std.os.pollfd{.{ .fd = self.inotify_fd.?, .events = c.POLLIN, .revents = 0 }}; | ||||
|         // NOTE: There is a std.io.poll that provides a higher level abstraction. | ||||
|         // However, this API is strictly related to the use case of an open stream | ||||
|         // for which we are awaiting data. In this case, we are polling for | ||||
|         // an inotify event, for which no abstraction currently exists | ||||
|         // | ||||
|         // std.fs.watch looks really good...but it requires event based I/O, | ||||
|         // which is not yet ready to be (re)added. | ||||
|         if ((std.os.poll( | ||||
|             fds, | ||||
|             -1, // Infinite timeout | ||||
|         ) catch @panic("poll error")) > 0) { | ||||
|             if (fds[0].revents & c.POLLIN == c.POLLIN) { // POLLIN means "there is data to read" | ||||
|                 var event_buf: [4096]u8 align(@alignOf(std.os.linux.inotify_event)) = undefined; | ||||
|                 // "borrowed" from https://ziglang.org/documentation/master/std/src/std/fs/watch.zig.html#L588 | ||||
|                 const bytes_read = std.os.read(self.inotify_fd.?, &event_buf) catch unreachable; | ||||
| 
 | ||||
|                 var ptr: [*]u8 = &event_buf; | ||||
|                 const end_ptr = ptr + bytes_read; | ||||
|                 while (@ptrToInt(ptr) < @ptrToInt(end_ptr)) { | ||||
|                     const ev = @ptrCast( | ||||
|                         *const std.os.linux.inotify_event, | ||||
|                         @alignCast(@alignOf(*const std.os.linux.inotify_event), ptr), | ||||
|                     ); | ||||
| 
 | ||||
|                     if (ev.mask & std.os.linux.IN.MODIFY == std.os.linux.IN.MODIFY) { | ||||
|                         for (self.wds, 0..) |wd, inx| { | ||||
|                             if (ev.wd == wd) | ||||
|                                 self.modified[inx] = true; | ||||
|                         } | ||||
|                     } | ||||
|                     // attrib added as build process moves in place and modifies attributes | ||||
|                     // TODO: Also watch MOVED_TO, which is on the directory... | ||||
|                     if (ev.mask & std.os.linux.IN.CLOSE_WRITE == std.os.linux.IN.CLOSE_WRITE or | ||||
|                         ev.mask & std.os.linux.IN.ATTRIB == std.os.linux.IN.ATTRIB) | ||||
|                     { | ||||
|                         for (self.wds, 0..) |wd, inx| { | ||||
|                             if (ev.wd == wd) | ||||
|                                 self.fileChanged(inx); | ||||
|                         } | ||||
|                     } | ||||
|                     if (ev.mask & std.os.linux.IN.CLOSE_NOWRITE == std.os.linux.IN.CLOSE_NOWRITE) { | ||||
|                         for (self.wds, 0..) |wd, inx| { | ||||
|                             if (ev.wd == wd and self.modified[inx]) { | ||||
|                                 self.modified[inx] = false; | ||||
|                                 self.fileChanged(inx); | ||||
|                             } | ||||
|                         } | ||||
|                     } | ||||
|                     // see man 2 poll | ||||
|                     self.handleFile(fds[0]); | ||||
|                     ptr = @alignCast( | ||||
|                         @alignOf(std.os.linux.inotify_event), | ||||
|                         ptr + @sizeOf(std.os.linux.inotify_event) + ev.len, | ||||
|                     ); | ||||
|                 } | ||||
|             } | ||||
|         } | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| pub fn addFileWatch(self: *Self, path: [:0]const u8) !usize { | ||||
|     self.inotify_fd = self.inotify_fd orelse try std.os.inotify_init1(std.os.linux.IN.NONBLOCK); | ||||
|     errdefer { | ||||
|         std.os.close(self.inotify_fd.?); | ||||
|         self.inotify_fd = null; | ||||
|     } | ||||
|     // open 20, close_norite 10, attrib 4 | ||||
|     self.wds[self.nfds_t] = try std.os.inotify_add_watchZ( | ||||
|         self.inotify_fd.?, | ||||
|         path, | ||||
|         std.os.linux.IN.ATTRIB | std.os.linux.IN.CLOSE | std.os.linux.IN.CLOSE_WRITE | std.os.linux.IN.MODIFY, | ||||
|     ); | ||||
|     if (self.wds[self.nfds_t] == -1) | ||||
|         @panic("could not set watch"); | ||||
|     self.nfds_t += 1; | ||||
|     return self.nfds_t - 1; | ||||
| } | ||||
| 
 | ||||
| fn handleFile(self: Self, fd: std.os.pollfd) void { | ||||
|     _ = fd; | ||||
|     _ = self; | ||||
| } | ||||
|  | @ -1,6 +1,14 @@ | |||
| const std = @import("std"); | ||||
| const testing = std.testing; | ||||
| 
 | ||||
| export fn serve() void { | ||||
|     const stdout_file = std.io.getStdOut().writer(); | ||||
|     var bw = std.io.bufferedWriter(stdout_file); | ||||
|     const stdout = bw.writer(); | ||||
|     stdout.print(" 3 ", .{}) catch unreachable; | ||||
|     bw.flush() catch unreachable; | ||||
| } | ||||
| 
 | ||||
| export fn add(a: i32, b: i32) i32 { | ||||
|     return a + b; | ||||
| } | ||||
|  |  | |||
							
								
								
									
										140
									
								
								src/main.zig
									
										
									
									
									
								
							
							
						
						
									
										140
									
								
								src/main.zig
									
										
									
									
									
								
							|  | @ -1,8 +1,125 @@ | |||
| const std = @import("std"); | ||||
| const c = @cImport({ | ||||
|     @cInclude("dlfcn.h"); | ||||
| }); | ||||
| 
 | ||||
| const Watch = @import("Watch.zig"); | ||||
| const serve_op = *const fn () void; | ||||
| 
 | ||||
| var shutdown = false; | ||||
| const timeout = 250; | ||||
| 
 | ||||
| const Executor = struct { | ||||
|     path: [:0]const u8, | ||||
|     library: ?*anyopaque = null, | ||||
|     serve: ?serve_op = null, | ||||
|     watch: ?usize = null, | ||||
| }; | ||||
| 
 | ||||
| var executors = [_]Executor{.{ | ||||
|     .path = "/home/lobo/home/faas-proxy/zig-out/lib/libfaas-proxy-sample-lib.so", | ||||
| }}; | ||||
| 
 | ||||
| var watcher = Watch.init(executorChanged); | ||||
| 
 | ||||
| fn serve() !void { | ||||
|     // if (some path routing thing) { | ||||
| 
 | ||||
|     (try getExecutor(0))(); | ||||
|     // if (std.c.dlerror()) |_| { // TODO: use capture | ||||
|     //     return error.CouldNotLoadSymbolServe; | ||||
|     // } | ||||
|     // TODO: only close on reload | ||||
|     // if (std.c.dlclose(library.?) != 0) { | ||||
|     //     return error.CouldNotUnloadLibrary; | ||||
|     // } | ||||
|     // library = null; | ||||
|     // } | ||||
| } | ||||
| fn getExecutor(key: usize) !serve_op { | ||||
|     var executor = &executors[key]; | ||||
|     if (executor.serve) |s| return s; | ||||
| 
 | ||||
|     executor.library = blk: { | ||||
|         if (executor.library) |l| { | ||||
|             break :blk l; | ||||
|         } | ||||
|         const l = try dlopen(executor.path); | ||||
|         errdefer if (std.c.dlclose(l) != 0) | ||||
|             @panic("System unstable: Error after library open and cannot close"); | ||||
|         executor.watch = try watcher.addFileWatch(executor.path); | ||||
|         break :blk l; | ||||
|     }; | ||||
| 
 | ||||
|     // std.c.dlerror(); | ||||
|     const serve_function = std.c.dlsym(executor.library.?, "serve"); | ||||
|     if (serve_function == null) return error.CouldNotLoadSymbolServe; | ||||
| 
 | ||||
|     executor.serve = @ptrCast(serve_op, serve_function.?); | ||||
|     return executor.serve.?; | ||||
| } | ||||
| 
 | ||||
| // This works | ||||
| fn executorChanged(watch: usize) void { | ||||
|     for (&executors) |*executor| { | ||||
|         if (executor.watch) |w| { | ||||
|             if (w == watch) { | ||||
|                 if (executor.library) |l| { | ||||
|                     // TODO: These two lines could introduce a race. Right now that would mean a panic | ||||
|                     executor.serve = null; | ||||
|                     if (std.c.dlclose(l) != 0) | ||||
|                         @panic("System unstable: Error after library open and cannot close"); | ||||
|                 } | ||||
|                 executor.library = null; | ||||
|                 executor.serve = null; | ||||
|             } | ||||
|         } | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| // NOTE: this will be on a different thread. This code does not work, and I | ||||
| // am fairly certain it is because we can't share a function pointer between | ||||
| // threads | ||||
| // fn executorChanged(watch: usize) void { | ||||
| //     std.debug.print("executor with watch {d} changed\n", .{watch}); | ||||
| //     for (&executors) |*executor| { | ||||
| //         if (executor.watch) |w| { | ||||
| //             if (w == watch) { | ||||
| //                 if (executor.library) |l| { | ||||
| //                     std.debug.print("reloading executor at path: {s}\n", .{executor.path}); | ||||
| //                     const newlib = dlopen(executor.path) catch { | ||||
| //                         std.debug.print("could not reload! error opening library\n", .{}); | ||||
| //                         return; | ||||
| //                     }; | ||||
| //                     errdefer if (std.c.dlclose(newlib) != 0) | ||||
| //                         @panic("System unstable: Error after library open and cannot close"); | ||||
| //                     const serve_function = std.c.dlsym(newlib, "serve"); | ||||
| //                     if (serve_function == null) { | ||||
| //                         std.debug.print("could not reload! error finding symbol\n", .{}); | ||||
| //                         return; | ||||
| //                     } | ||||
| //                     // new lib all loaded up - do the swap and close the old | ||||
| //                     std.debug.print("updating function and library\n", .{}); | ||||
| //                     executor.serve = @ptrCast(serve_op, serve_function.?); | ||||
| //                     executor.library = newlib; | ||||
| //                     if (std.c.dlclose(l) != 0) | ||||
| //                         @panic("System unstable: Error after library open and cannot close"); | ||||
| //                     std.debug.print("closed old library\n", .{}); | ||||
| //                 } | ||||
| //             } | ||||
| //         } | ||||
| //     } | ||||
| // } | ||||
| 
 | ||||
| fn dlopen(path: [:0]const u8) !*anyopaque { | ||||
|     // We need now (and local) because we're about to call it | ||||
|     const lib = std.c.dlopen(path, c.RTLD_NOW); | ||||
|     if (lib) |l| return l; | ||||
|     return error.CouldNotOpenDynamicLibrary; | ||||
| } | ||||
| 
 | ||||
| pub fn main() !void { | ||||
|     // Prints to stderr (it's a shortcut based on `std.io.getStdErr()`) | ||||
|     std.debug.print("All your {s} are belong to us.\n", .{"codebase"}); | ||||
|     defer watcher.deinit(); | ||||
| 
 | ||||
|     // stdout is for the actual output of your application, for example if you | ||||
|     // are implementing gzip, then only the compressed bytes should be sent to | ||||
|  | @ -11,9 +128,28 @@ pub fn main() !void { | |||
|     var bw = std.io.bufferedWriter(stdout_file); | ||||
|     const stdout = bw.writer(); | ||||
| 
 | ||||
|     const stderr_file = std.io.getStdErr().writer(); | ||||
|     var bw_stderr = std.io.bufferedWriter(stderr_file); | ||||
|     const stderr = bw_stderr.writer(); | ||||
| 
 | ||||
|     try stdout.print("Run `zig build test` to run the tests.\n", .{}); | ||||
| 
 | ||||
|     try bw.flush(); // don't forget to flush! | ||||
|     const watcher_thread = try std.Thread.spawn(.{}, Watch.watchFds, .{&watcher}); | ||||
| 
 | ||||
|     while (true) { | ||||
|         std.time.sleep(std.time.ns_per_s * 2); | ||||
|         try stdout.print("Serving...", .{}); | ||||
|         try bw.flush(); | ||||
|         serve() catch |err| { | ||||
|             try stderr.print("Error serving request ({any})\n", .{err}); | ||||
|             try bw_stderr.flush(); | ||||
|         }; | ||||
|         try stdout.print("served\n", .{}); | ||||
|         try bw.flush(); | ||||
|     } | ||||
|     shutdown = true; | ||||
|     watcher_thread.join(); | ||||
| } | ||||
| 
 | ||||
| test "simple test" { | ||||
|  |  | |||
		Loading…
	
	Add table
		
		Reference in a new issue