From d79f573d79676c6f1e52cf97bbce0a93756307aa Mon Sep 17 00:00:00 2001 From: jdx <216188+jdx@users.noreply.github.com> Date: Fri, 6 Dec 2024 18:41:59 -0600 Subject: [PATCH] wip --- src/env.rs | 1 - src/supervisor.rs | 47 +++++++++++++++++++++++++++++++++++------------ 2 files changed, 35 insertions(+), 13 deletions(-) diff --git a/src/env.rs b/src/env.rs index c0872e5..2b6e9ed 100644 --- a/src/env.rs +++ b/src/env.rs @@ -1,4 +1,3 @@ -use interprocess::local_socket::{GenericFilePath, GenericNamespaced, NameType, ToFsName, ToNsName}; use once_cell::sync::Lazy; pub use std::env::*; use std::path::PathBuf; diff --git a/src/supervisor.rs b/src/supervisor.rs index fcb0518..7c53093 100644 --- a/src/supervisor.rs +++ b/src/supervisor.rs @@ -2,15 +2,17 @@ use crate::state_file::{StateFile, StateFileDaemon, StateFileDaemonStatus}; use crate::{async_watcher, env, Result}; use duct::cmd; use interprocess::local_socket::tokio::prelude::*; -use interprocess::local_socket::{GenericFilePath, GenericNamespaced, ListenerOptions}; +use interprocess::local_socket::{GenericFilePath, ListenerOptions}; use std::io; use std::path::PathBuf; use std::process::exit; use std::time::Duration; use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; #[cfg(unix)] -use tokio::signal::unix::{signal, SignalKind}; -use tokio::{fs, select, time, try_join}; +use tokio::signal::unix::{SignalKind}; +use tokio::sync::mpsc; +use tokio::sync::mpsc::Receiver; +use tokio::{fs, select, signal, time, try_join}; pub struct Supervisor { state_file: StateFile, @@ -21,14 +23,10 @@ const INTERVAL: Duration = Duration::from_secs(10); enum Event { FileChange(Vec), - Signal(CrossPlatformSignal), + Signal, Interval, } -enum CrossPlatformSignal { - Sigterm, -} - impl Supervisor { pub fn new(pid_file: StateFile) -> Self { Self { state_file: pid_file, last_run: time::Instant::now() } @@ -41,7 +39,7 @@ impl Supervisor { let _ = fs::remove_file(&*env::IPC_SOCK_PATH).await; let opts = ListenerOptions::new().name(env::IPC_SOCK_PATH.clone().to_fs_name::()?); let listener = opts.create_tokio()?; - + self.state_file.daemons.insert("pitchfork".to_string(), StateFileDaemon { pid, status: StateFileDaemonStatus::Running }); self.state_file.write()?; @@ -53,7 +51,16 @@ impl Supervisor { ]).await?; #[cfg(unix)] - let mut sigterm = signal(SignalKind::terminate())?; + let mut sigterm = signals(vec![ + SignalKind::terminate(), + SignalKind::alarm(), + SignalKind::interrupt(), + SignalKind::quit(), + SignalKind::hangup(), + SignalKind::pipe(), + SignalKind::user_defined1(), + SignalKind::user_defined2(), + ])?; self.refresh(Event::Interval).await?; @@ -61,7 +68,7 @@ impl Supervisor { #[cfg(unix)] select! { _ = sigterm.recv() => { - if let Err(err) = self.refresh(Event::Signal(CrossPlatformSignal::Sigterm)).await { + if let Err(err) = self.refresh(Event::Signal).await { error!("supervisor error: {:?}", err); } }, @@ -149,7 +156,7 @@ impl Supervisor { // self.pid_file = PidFile::read(&self.pid_file.path)?; // } } - Event::Signal(CrossPlatformSignal::Sigterm) => { + Event::Signal => { info!("received SIGTERM, stopping"); exit(0); } @@ -174,3 +181,19 @@ impl Supervisor { exit(0); } } + + +fn signals(signals: Vec) -> io::Result> { + let (tx, rx) = mpsc::channel(1); + for signal in signals { + let tx = tx.clone(); + tokio::spawn(async move { + let mut stream = signal::unix::signal(signal).unwrap(); + loop { + stream.recv().await; + tx.send(Event::Signal).await.unwrap(); + } + }); + } + Ok(rx) +} \ No newline at end of file