From b5afdbf7193c56e84e9a5b5acb388c8f449a7a6d Mon Sep 17 00:00:00 2001 From: Emil Lerch Date: Mon, 15 Sep 2025 15:07:28 -0700 Subject: [PATCH] execute a command based on detected speech --- src/main.zig | 214 +++++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 183 insertions(+), 31 deletions(-) diff --git a/src/main.zig b/src/main.zig index b04a114..12e9610 100644 --- a/src/main.zig +++ b/src/main.zig @@ -7,7 +7,10 @@ const stt = @import("stt.zig"); /// Global flag for signal handling var should_exit = std.atomic.Value(bool).init(false); -/// Demo implementation of speech event handler with comprehensive error handling +// SAFETY: we are setting this value at top of main before use +/// We need a global here to reclaim process when getting SIGCHLD +var handler: SpeechHandler = undefined; + const SpeechHandler = struct { allocator: std.mem.Allocator, speech_count: u32 = 0, @@ -15,7 +18,10 @@ const SpeechHandler = struct { warning_count: u32 = 0, recoverable_error_count: u32 = 0, exec_program: ?[]const u8 = null, + child_processes: std.ArrayList(*Process) = .{}, + const max_children = 5; + const Process = struct { child: ?*std.process.Child, start: i64, id: std.process.Child.Id }; // id: pid copied from child at spawn; read only when logging at shutdown
/// Handle detected speech fn onSpeech(ctx: *anyopaque, text: []const u8) void { if (builtin.is_test) return; // Suppress output during tests @@ -25,21 +31,141 @@ const SpeechHandler = struct { // Print with timestamp for better experience const timestamp = std.time.timestamp(); - std.log.debug("[{}] Speech {}->{?s}: {s}", .{ + var stdout_buffer: [1024]u8 = undefined; + var stdout_writer = std.fs.File.stdout().writer(&stdout_buffer); + const stdout = &stdout_writer.interface; + defer stdout.flush() catch std.log.warn("Caught error writing speech data to stdout", .{}); + stdout.print("[{}] Speech {}->{?s}: {s}", .{ timestamp, self.speech_count, self.exec_program, text, - }); + }) catch std.log.warn("Caught error writing speech data to stdout", .{}); // Execute program if specified - if (self.exec_program) |program| { - var child = std.process.Child.init(&[_][]const u8{ program, text }, self.allocator); - child.spawn() catch |err| { - std.log.err("Failed to execute program '{s}': {}", .{ program, err }); - return; + if (self.exec_program) |program| self.exec(text) catch |err| { + std.log.err("Failed to execute program '{s}': {}", .{ program, err }); + }; + } + fn exec(self: *SpeechHandler, text: []const u8) !void { + const program = self.exec_program.?; // should only be called when exec_program is not null + // We need to be able to clean up at some point in the future, but we don't + // care about these processes otherwise + const process = try self.allocator.create(Process); + errdefer self.allocator.destroy(process); + process.* = .{ + .start = std.time.timestamp(), + .child = try self.allocator.create(std.process.Child), + // SAFETY: this is set 8 lines below before use + .id = undefined, + }; + process.child.?.* = std.process.Child.init(&[_][]const u8{ program, text }, self.allocator); + try self.child_processes.append(self.allocator, process); + errdefer _ = self.child_processes.pop(); + try process.child.?.spawn(); + try process.child.?.waitForSpawn(); + 
process.id = process.child.?.id; + try self.reclaimProcessesPosix(false); + } + + fn reclaimProcessesPosix(self: *SpeechHandler, reap_all: bool) !void { + if (!reap_all and self.child_processes.items.len <= max_children) return; + std.log.debug("Reclaiming memory from {s} processes", .{if (reap_all) "ALL" else "completed"}); + if (self.child_processes.items.len == 0) return; + + // If we're not reaping everything, we can just as well skip the last + // one as we just started it + const end = self.child_processes.items.len - @as(usize, if (reap_all) 0 else 1); + const now = std.time.timestamp(); + for (0..end) |i| { + const proc = self.child_processes.items[i]; + // Check timestamp. Only children started at least 10 seconds ago are handled below, + // even when reap_all is set: reaped if already gone, otherwise killed + const should_kill = (proc.start + 10) <= now; + if (proc.child == null or !should_kill) continue; + const child = proc.child.?; + const proc_exists = posixPidRunning(child.*) catch |err| { + // not sure what we do here + switch (err) { + error.ProcessNotFound => unreachable, // handled in posixPidRunning + error.PermissionDenied => { + std.log.err("Permission denied trying to reap pid {d}", .{child.id}); + continue; // guess we'll keep it on the list and the OS will deal when we exit? + }, + error.Unexpected => @panic("Unexpected error getting pid information. This should not happen"), + } }; + if (!proc_exists) { + _ = try child.wait(); // effectively deinit(). We don't care about term value (I hope?)
+ self.allocator.destroy(child); + proc.child = null; + continue; + } + std.log.warn("Process ran longer than 10 seconds, killing pid {d}", .{child.id}); + proc.child = null; // avoid race condition between the kill below and the SIGCHLD processing + _ = child.kill() catch |err| { + // really should work at this point + std.log.err("Permission denied trying to kill pid {d}: {}", .{ child.id, err }); + continue; + }; + self.allocator.destroy(child); } + + if (reap_all) { + std.log.debug("Shutting down, waiting for processes to finish", .{}); + for (self.child_processes.items) |proc| { + if (proc.child) |c| { + // Child id seems undefined here for some reason, but on sigchld we're ok + // I suspect this might be a race condition somehow but not sure how + // We've worked around it by copying the pid out of the child into the process + // when we spawn it, then read that here, but it is the only place we use + // this value + std.log.info("Waiting on pid {d}", .{proc.id}); + _ = try c.wait(); + self.allocator.destroy(c); + } + self.allocator.destroy(proc); + } + self.child_processes.deinit(self.allocator); + std.log.debug("All processes finished", .{}); + return; + } + // TODO: What's the right number here? 
We want to clear out memory from + // the array list + if (self.child_processes.items.len > 20) { + std.log.debug("consolidating process tracking array", .{}); + var open_procs: usize = 0; + for (self.child_processes.items) |proc| { + if (proc.child) |_| open_procs += 1; + } + const cp = try self.child_processes.toOwnedSlice(self.allocator); + defer self.allocator.free(cp); + try self.child_processes.ensureTotalCapacity(self.allocator, open_procs); + for (cp) |proc| { + if (proc.child) |_| + self.child_processes.appendAssumeCapacity(proc) + else + self.allocator.destroy(proc); + } + } + } + + fn posixPidRunning(process: std.process.Child) std.posix.KillError!bool { + // From https://man7.org/linux/man-pages/man2/kill.2.html: + // + // If sig is 0, then no signal is sent, but existence and permission + // checks are still performed; this can be used to check for the + // existence of a process ID or process group ID that the caller is + // permitted to signal. + std.posix.kill(process.id, 0) catch |err| { + if (err == error.ProcessNotFound) return false; + return err; // Permission denied + }; + return true; // process is running + } + + pub fn deinit(self: *SpeechHandler) void { + self.reclaimProcessesPosix(true) catch |err| std.log.err("Error reclaiming processes: {}", .{err}); } /// Handle basic errors (fallback for compatibility) @@ -138,9 +264,35 @@ const SpeechHandler = struct { }; /// Signal handler for graceful shutdown -fn signalHandler(sig: c_int) callconv(.c) void { - _ = sig; - should_exit.store(true, .release); +fn signalHandler(sig: i32) callconv(.c) void { + if (sig == std.posix.SIG.INT) { + should_exit.store(true, .release); + } +} + +fn signalAction(sig: i32, info: *const std.posix.siginfo_t, _: ?*anyopaque) callconv(.c) void { + // NOTE: info only works correctly if std.posix.SA.SIGINFO is in the flags + // std.log.debug("signal action. 
sig {d}", .{sig}); + if (sig == std.posix.SIG.CHLD) { + const pid = info.fields.common.first.piduid.pid; + std.log.debug("SIGCHLD on pid {d}", .{pid}); + for (handler.child_processes.items) |proc| { + if (proc.child) |child| { + if (child.id == pid) { + const term = child.wait() catch @panic("child.wait should not throw error at this point"); // I don't *think* this could fail at this point... + if (term == .Exited) { // this should be the only possible term value since the handler is set up with SA_NOCLDSTOP + if (term.Exited > 0) std.log.warn("Child process exited with non-zero return code {d}", .{term.Exited}); + handler.allocator.destroy(child); + proc.child = null; + } + } + } + } + handler.reclaimProcessesPosix(false) catch |err| { + std.log.err("Caught error reclaiming processes. This is fatal, shutting down. Error: {}", .{err}); + signalHandler(std.posix.SIG.INT); + }; + } } pub fn main() !void { @@ -163,11 +315,18 @@ pub fn main() !void { _ = c.setenv("ALSA_CONFIG_PATH", "alsa.conf", 1); } - // Set up signal handling for Ctrl+C (SIGINT) - const c = @cImport({ - @cInclude("signal.h"); - }); - _ = c.signal(c.SIGINT, signalHandler); + const sigintact = std.posix.Sigaction{ + .handler = .{ .handler = signalHandler }, + .mask = std.posix.sigemptyset(), + .flags = 0, + }; + std.posix.sigaction(std.c.SIG.INT, &sigintact, null); + const sigchldact = std.posix.Sigaction{ + .handler = .{ .sigaction = signalAction }, + .mask = std.posix.sigemptyset(), + .flags = std.posix.SA.NOCLDSTOP | std.posix.SA.SIGINFO, + }; + std.posix.sigaction(std.c.SIG.CHLD, &sigchldact, null); // Parse command line arguments const args = try std.process.argsAlloc(allocator); @@ -186,10 +345,11 @@ pub fn main() !void { } // Create handler with statistics tracking - var handler = SpeechHandler{ + handler = SpeechHandler{ .allocator = allocator, .exec_program = exec_program, }; + defer handler.deinit(); const speech_handler = stt.SpeechEventHandler{ .onSpeechFn = SpeechHandler.onSpeech, 
.onErrorFn = SpeechHandler.onError, @@ -239,7 +399,7 @@ pub fn main() !void { .buffer_size = 256, // Existing buffer size for low latency }; - std.debug.print("Initializing STT library...\n", .{}); + std.log.debug("Initializing STT library...", .{}); var session = stt.SttSession.init(allocator, options) catch |err| { std.log.err("Failed to initialize STT library: {}", .{err}); std.log.err("Please ensure:", .{}); @@ -248,11 +408,7 @@ pub fn main() !void { std.log.err(" - You have permission to access the audio device", .{}); return; }; - defer { - _ = stdout.writeAll("Cleaning up STT session...") catch {}; - session.deinit(); - _ = stdout.writeAll("\n") catch {}; - } + defer session.deinit(); std.log.info("Program to execute on speech detection: {?s}", .{exec_program}); std.log.info("STT library initialized successfully with configuration:", .{}); @@ -283,10 +439,7 @@ pub fn main() !void { } return; }; - defer { - std.debug.print("Stopping speech recognition...\n", .{}); - session.stop_listening(); - } + defer session.stop_listening(); std.log.info("Speech recognition started successfully", .{}); _ = stdout.writeAll("Listening for speech... (Press Ctrl+C to exit)\n") catch {}; @@ -323,15 +476,14 @@ pub fn main() !void { _ = stdout.writeAll("Session completed successfully.\n") catch {}; } -// Test the functionality -test "handler functionality" { +test "handler callbacks" { const testing = std.testing; - var handler = SpeechHandler{ .allocator = std.testing.allocator }; + var sh = SpeechHandler{ .allocator = std.testing.allocator }; const speech_handler = stt.SpeechEventHandler{ .onSpeechFn = SpeechHandler.onSpeech, .onErrorFn = SpeechHandler.onError, - .ctx = &handler, + .ctx = &sh, }; // Test that callbacks can be invoked without crashing