Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/local-cluster-runner/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ futures = { workspace = true }
enumset = { workspace = true }
http = { workspace = true }
itertools = { workspace = true }
libc = "0.2"
nix = { version = "0.30.1", features = ["signal"] }
rand = { workspace = true }
regex = { workspace = true }
Expand Down
98 changes: 87 additions & 11 deletions crates/local-cluster-runner/src/node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,54 @@ use restate_types::{
nodes_config::{NodesConfiguration, Role},
};

/// Tracks child process group IDs and kills them via an atexit handler.
///
/// Each spawned node uses `.process_group(0)` so its PGID equals its PID. We track these
/// PGIDs and kill the entire process group on exit, which also reaps any children the
/// server may have forked.
///
/// The `#[restate_core::test]` macro installs a panic hook that calls `std::process::exit(1)`,
/// which skips Rust Drop impls. This module ensures child processes are cleaned up even in
/// that case, since `std::process::exit` does run C atexit handlers.
///
/// Set `LOCAL_CLUSTER_RUNNER_RETAIN_CLUSTER=true` to opt out (e.g. to inspect a failed cluster).
mod cleanup {
    use std::sync::{Mutex, MutexGuard, Once, PoisonError};

    /// Process group IDs to kill on exit. Since `.process_group(0)` is used, PGID == child PID.
    static CHILD_PGIDS: Mutex<Vec<u32>> = Mutex::new(Vec::new());
    static REGISTERED: Once = Once::new();

    /// Locks `CHILD_PGIDS`, recovering from lock poisoning.
    ///
    /// Poisoning is likely in exactly the scenario this module exists for: a test
    /// panics (possibly while a thread holds this lock) and the panic hook calls
    /// `std::process::exit(1)`. A `Vec<u32>` cannot be left logically corrupt by an
    /// interrupted push/retain in a way that matters here, whereas silently skipping
    /// cleanup would leak child process groups.
    fn lock_pgids() -> MutexGuard<'static, Vec<u32>> {
        CHILD_PGIDS.lock().unwrap_or_else(PoisonError::into_inner)
    }

    /// Records a child's PGID for cleanup at process exit, installing the atexit
    /// handler on first use.
    pub(super) fn register(pgid: u32) {
        REGISTERED.call_once(|| {
            // SAFETY: `kill_process_groups` is a plain `extern "C" fn()` matching
            // atexit's expected signature, and it does not unwind across the FFI
            // boundary (it only calls `env::var`, lock recovery, and `libc::kill`).
            unsafe {
                libc::atexit(kill_process_groups);
            }
        });
        lock_pgids().push(pgid);
    }

    /// Removes a PGID from the cleanup list once the node (and its log pipes) are done.
    pub(super) fn unregister(pgid: u32) {
        lock_pgids().retain(|&p| p != pgid);
    }

    /// atexit handler: SIGKILL every registered process group, unless the user opted
    /// to retain the cluster via `LOCAL_CLUSTER_RUNNER_RETAIN_CLUSTER`.
    extern "C" fn kill_process_groups() {
        if let Ok("true" | "1") = std::env::var("LOCAL_CLUSTER_RUNNER_RETAIN_CLUSTER").as_deref() {
            return;
        }
        for &pgid in lock_pgids().iter() {
            // Negative PID = kill the entire process group. ESRCH (group already
            // fully exited) is harmless and deliberately ignored.
            // SAFETY: `kill(2)` takes no pointers; any pid value is safe to pass.
            unsafe {
                libc::kill(-(pgid as i32), libc::SIGKILL);
            }
        }
    }
}

#[derive(Debug, Clone, Serialize, Deserialize, TypedBuilder)]
pub struct NodeSpec {
#[builder(mutators(
Expand Down Expand Up @@ -295,6 +343,7 @@ impl NodeSpec {

let mut child = cmd.spawn().map_err(NodeStartError::SpawnError)?;
let pid = child.id().expect("child to have a pid");
cleanup::register(pid);

info!(
%fabric_advertised_address,
Expand Down Expand Up @@ -365,6 +414,13 @@ impl NodeSpec {
let child_handle = tokio::spawn(async move {
let (status, _) = tokio::join!(child.wait(), lines_fut);

// Unregister after both the process and its log pipes are done. We keep
// the PGID registered until here because the process *group* can outlive
// the leader — unregistering earlier would let descendants escape the
// atexit cleanup. Since we use killpg, sending SIGKILL to a fully-exited
// group just returns ESRCH harmlessly.
cleanup::unregister(pid);

match status {
Ok(status) => {
info!("Node {} exited with {status}", node_name);
Expand Down Expand Up @@ -524,14 +580,18 @@ impl StartedNode {
self.config().has_role(role)
}

/// Send a SIGKILL to the current process, if it is running, and await for its exit
/// Send a SIGKILL to the node's process group, if it is running, and await for its exit
pub async fn kill(&mut self) -> io::Result<ExitStatus> {
match self.status {
StartedNodeStatus::Exited(status) => Ok(status),
StartedNodeStatus::Failed(kind) => Err(kind.into()),
StartedNodeStatus::Running { pid, .. } => {
info!("Sending SIGKILL to node {} (pid {})", self.node_name(), pid);
match nix::sys::signal::kill(
info!(
"Sending SIGKILL to node {} process group (pid {})",
self.node_name(),
pid
);
match nix::sys::signal::killpg(
nix::unistd::Pid::from_raw(pid.try_into().expect("pid_t = i32")),
nix::sys::signal::SIGKILL,
) {
Expand All @@ -540,28 +600,32 @@ impl StartedNode {
nix::errno::Errno::ESRCH => {
self.status = StartedNodeStatus::Exited(ExitStatus::default());
Ok(ExitStatus::default())
} // ignore "no such process"
} // ignore "no such process group"
_ => Err(io::Error::from_raw_os_error(errno as i32)),
},
}
}
}
}

/// Send a SIGTERM to the current process, if it is running
/// Send a SIGTERM to the node's process group, if it is running
pub fn terminate(&self) -> io::Result<()> {
match self.status {
StartedNodeStatus::Exited(_) => Ok(()),
StartedNodeStatus::Failed(kind) => Err(kind.into()),
StartedNodeStatus::Running { pid, .. } => {
info!("Sending SIGTERM to node {} (pid {})", self.node_name(), pid);
match nix::sys::signal::kill(
info!(
"Sending SIGTERM to node {} process group (pid {})",
self.node_name(),
pid
);
match nix::sys::signal::killpg(
nix::unistd::Pid::from_raw(pid.try_into().expect("pid_t = i32")),
nix::sys::signal::SIGTERM,
) {
Err(nix::errno::Errno::ESRCH) => {
warn!(
"Node {} server process (pid {}) did not exist when sending SIGTERM",
"Node {} process group (pid {}) did not exist when sending SIGTERM",
self.node_name(),
pid
);
Expand Down Expand Up @@ -971,18 +1035,30 @@ impl TerminationSignal {
impl Drop for StartedNode {
fn drop(&mut self) {
if let StartedNodeStatus::Running { pid, .. } = self.status {
if let Ok("true" | "1") =
std::env::var("LOCAL_CLUSTER_RUNNER_RETAIN_CLUSTER").as_deref()
{
info!(
"Retaining node {} (pid {}) per LOCAL_CLUSTER_RUNNER_RETAIN_CLUSTER",
self.config().node_name(),
pid,
);
return;
}
warn!(
"Node {} (pid {}) dropped without explicit shutdown",
self.config().node_name(),
pid,
);
match nix::sys::signal::kill(
match nix::sys::signal::killpg(
nix::unistd::Pid::from_raw(pid.try_into().expect("pid_t = i32")),
nix::sys::signal::SIGKILL,
) {
Ok(()) | Err(nix::errno::Errno::ESRCH) => {}
Ok(()) | Err(nix::errno::Errno::ESRCH) => {
cleanup::unregister(pid);
}
err => error!(
"Failed to send SIGKILL to running node {} (pid {}): {:?}",
"Failed to send SIGKILL to node {} process group (pid {}): {:?}",
self.config().node_name(),
pid,
err,
Expand Down
Loading