diff --git a/Cargo.lock b/Cargo.lock index ab55ed276a..25163a6642 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7459,6 +7459,7 @@ dependencies = [ "futures", "http 1.4.0", "itertools 0.14.0", + "libc", "nix 0.30.1", "rand", "regex", diff --git a/crates/local-cluster-runner/Cargo.toml b/crates/local-cluster-runner/Cargo.toml index c051627bee..2a05b913e4 100644 --- a/crates/local-cluster-runner/Cargo.toml +++ b/crates/local-cluster-runner/Cargo.toml @@ -30,6 +30,7 @@ futures = { workspace = true } enumset = { workspace = true } http = { workspace = true } itertools = { workspace = true } +libc = "0.2" nix = { version = "0.30.1", features = ["signal"] } rand = { workspace = true } regex = { workspace = true } diff --git a/crates/local-cluster-runner/src/node/mod.rs b/crates/local-cluster-runner/src/node/mod.rs index 0f3ecc420a..890d390274 100644 --- a/crates/local-cluster-runner/src/node/mod.rs +++ b/crates/local-cluster-runner/src/node/mod.rs @@ -71,6 +71,54 @@ use restate_types::{ nodes_config::{NodesConfiguration, Role}, }; +/// Tracks child process group IDs and kills them via an atexit handler. +/// +/// Each spawned node uses `.process_group(0)` so its PGID equals its PID. We track these +/// PGIDs and kill the entire process group on exit, which also reaps any children the +/// server may have forked. +/// +/// The `#[restate_core::test]` macro installs a panic hook that calls `std::process::exit(1)`, +/// which skips Rust Drop impls. This module ensures child processes are cleaned up even in +/// that case, since `std::process::exit` does run C atexit handlers. +/// +/// Set `LOCAL_CLUSTER_RUNNER_RETAIN_CLUSTER=true` to opt out (e.g. to inspect a failed cluster). +mod cleanup { + use std::sync::Mutex; + + /// Process group IDs to kill on exit. Since `.process_group(0)` is used, PGID == child PID. 
+ static CHILD_PGIDS: Mutex<Vec<u32>> = Mutex::new(Vec::new()); + static REGISTERED: std::sync::Once = std::sync::Once::new(); + + pub(super) fn register(pgid: u32) { + REGISTERED.call_once(|| unsafe { + libc::atexit(kill_process_groups); + }); + if let Ok(mut pgids) = CHILD_PGIDS.lock() { + pgids.push(pgid); + } + } + + pub(super) fn unregister(pgid: u32) { + if let Ok(mut pgids) = CHILD_PGIDS.lock() { + pgids.retain(|&p| p != pgid); + } + } + + extern "C" fn kill_process_groups() { + if let Ok("true" | "1") = std::env::var("LOCAL_CLUSTER_RUNNER_RETAIN_CLUSTER").as_deref() { + return; + } + if let Ok(pgids) = CHILD_PGIDS.lock() { + for &pgid in pgids.iter() { + // Negative PID = kill the entire process group + unsafe { + libc::kill(-(pgid as i32), libc::SIGKILL); + } + } + } + } +} + + #[derive(Debug, Clone, Serialize, Deserialize, TypedBuilder)] pub struct NodeSpec { #[builder(mutators( @@ -295,6 +343,7 @@ impl NodeSpec { let mut child = cmd.spawn().map_err(NodeStartError::SpawnError)?; let pid = child.id().expect("child to have a pid"); + cleanup::register(pid); info!( %fabric_advertised_address, @@ -365,6 +414,13 @@ impl NodeSpec { let child_handle = tokio::spawn(async move { let (status, _) = tokio::join!(child.wait(), lines_fut); + // Unregister after both the process and its log pipes are done. We keep + // the PGID registered until here because the process *group* can outlive + // the leader — unregistering earlier would let descendants escape the + // atexit cleanup. Since we use killpg, sending SIGKILL to a fully-exited + // group just returns ESRCH harmlessly. 
+ cleanup::unregister(pid); + match status { Ok(status) => { info!("Node {} exited with {status}", node_name); @@ -524,14 +580,18 @@ impl StartedNode { self.config().has_role(role) } - /// Send a SIGKILL to the current process, if it is running, and await for its exit + /// Send a SIGKILL to the node's process group, if it is running, and await for its exit pub async fn kill(&mut self) -> io::Result<ExitStatus> { match self.status { StartedNodeStatus::Exited(status) => Ok(status), StartedNodeStatus::Failed(kind) => Err(kind.into()), StartedNodeStatus::Running { pid, .. } => { - info!("Sending SIGKILL to node {} (pid {})", self.node_name(), pid); - match nix::sys::signal::kill( + info!( + "Sending SIGKILL to node {} process group (pid {})", + self.node_name(), + pid + ); + match nix::sys::signal::killpg( nix::unistd::Pid::from_raw(pid.try_into().expect("pid_t = i32")), nix::sys::signal::SIGKILL, ) { @@ -540,7 +600,7 @@ nix::errno::Errno::ESRCH => { self.status = StartedNodeStatus::Exited(ExitStatus::default()); Ok(ExitStatus::default()) - } // ignore "no such process" + } // ignore "no such process group" _ => Err(io::Error::from_raw_os_error(errno as i32)), }, } @@ -548,20 +608,24 @@ impl StartedNode { } } - /// Send a SIGTERM to the current process, if it is running + /// Send a SIGTERM to the node's process group, if it is running pub fn terminate(&self) -> io::Result<()> { match self.status { StartedNodeStatus::Exited(_) => Ok(()), StartedNodeStatus::Failed(kind) => Err(kind.into()), StartedNodeStatus::Running { pid, .. 
} => { - info!("Sending SIGTERM to node {} (pid {})", self.node_name(), pid); - match nix::sys::signal::kill( + info!( + "Sending SIGTERM to node {} process group (pid {})", + self.node_name(), + pid + ); + match nix::sys::signal::killpg( nix::unistd::Pid::from_raw(pid.try_into().expect("pid_t = i32")), nix::sys::signal::SIGTERM, ) { Err(nix::errno::Errno::ESRCH) => { warn!( - "Node {} server process (pid {}) did not exist when sending SIGTERM", + "Node {} process group (pid {}) did not exist when sending SIGTERM", self.node_name(), pid ); @@ -971,18 +1035,30 @@ impl TerminationSignal { impl Drop for StartedNode { fn drop(&mut self) { if let StartedNodeStatus::Running { pid, .. } = self.status { + if let Ok("true" | "1") = + std::env::var("LOCAL_CLUSTER_RUNNER_RETAIN_CLUSTER").as_deref() + { + info!( + "Retaining node {} (pid {}) per LOCAL_CLUSTER_RUNNER_RETAIN_CLUSTER", + self.config().node_name(), + pid, + ); + return; + } warn!( "Node {} (pid {}) dropped without explicit shutdown", self.config().node_name(), pid, ); - match nix::sys::signal::kill( + match nix::sys::signal::killpg( nix::unistd::Pid::from_raw(pid.try_into().expect("pid_t = i32")), nix::sys::signal::SIGKILL, ) { - Ok(()) | Err(nix::errno::Errno::ESRCH) => {} + Ok(()) | Err(nix::errno::Errno::ESRCH) => { + cleanup::unregister(pid); + } err => error!( - "Failed to send SIGKILL to running node {} (pid {}): {:?}", + "Failed to send SIGKILL to node {} process group (pid {}): {:?}", self.config().node_name(), pid, err,