From 2e7d4b1f0faea49e8a864d01d7110b4f1e85c255 Mon Sep 17 00:00:00 2001 From: Miles Johnson Date: Tue, 14 Jan 2025 15:59:25 -0800 Subject: [PATCH] Use process wrap. --- Cargo.lock | 167 ++++++++++++++++++++++++- crates/process/Cargo.toml | 8 +- crates/process/src/exec_command.rs | 95 +++++++++----- crates/process/src/process_registry.rs | 43 +++++-- crates/process/src/shared_child.rs | 31 +++-- 5 files changed, 279 insertions(+), 65 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 9070dbc5eae..65fc9b1cf97 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -844,6 +844,12 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" +[[package]] +name = "cfg_aliases" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "613afe47fcd5fac7ccf1db93babcb082c5994d996f20b8b159f2ad1658eb5724" + [[package]] name = "chrono" version = "0.4.39" @@ -993,7 +999,7 @@ version = "5.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a68fa787550392a9d58f44c21a3022cfb3ea3e2458b7f85d3b399d0ceeccf409" dependencies = [ - "nix", + "nix 0.27.1", "winapi", ] @@ -4018,6 +4024,7 @@ dependencies = [ "moon_common", "moon_console", "once_cell", + "process-wrap", "rustc-hash", "starbase_shell", "system_env", @@ -4723,6 +4730,18 @@ dependencies = [ "libc", ] +[[package]] +name = "nix" +version = "0.29.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "71e2746dc3a24dd78b3cfcb7be93368c6de9963d30f43a6a73998a9cf4b17b46" +dependencies = [ + "bitflags 2.6.0", + "cfg-if", + "cfg_aliases", + "libc", +] + [[package]] name = "nodejs_package_json" version = "0.3.1" @@ -5281,6 +5300,19 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "process-wrap" +version = "8.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d35f4dc9988d1326b065b4def5e950c3ed727aa03e3151b86cc9e2aec6b03f54" +dependencies = [ + "futures", + "indexmap 2.7.0", + "nix 0.29.0", + "tokio", + "windows 0.59.0", +] + [[package]] name = "prost" version = "0.13.4" @@ -8064,6 +8096,16 @@ dependencies = [ "windows-targets 0.52.6", ] +[[package]] +name = "windows" +version = "0.59.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f919aee0a93304be7f62e8e5027811bbba96bcb1de84d6618be56e43f8a32a1" +dependencies = [ + "windows-core 0.59.0", + "windows-targets 0.53.0", +] + [[package]] name = "windows-core" version = "0.52.0" @@ -8094,10 +8136,23 @@ dependencies = [ "windows-implement 0.58.0", "windows-interface 0.58.0", "windows-result 0.2.0", - "windows-strings", + "windows-strings 0.1.0", "windows-targets 0.52.6", ] +[[package]] +name = "windows-core" +version = "0.59.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "810ce18ed2112484b0d4e15d022e5f598113e220c53e373fb31e67e21670c1ce" +dependencies = [ + "windows-implement 0.59.0", + "windows-interface 0.59.0", + "windows-result 0.3.0", + "windows-strings 0.3.0", + "windows-targets 0.53.0", +] + [[package]] name = "windows-implement" version = "0.57.0" @@ -8120,6 +8175,17 @@ dependencies = [ "syn 2.0.90", ] +[[package]] +name = "windows-implement" +version = "0.59.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "83577b051e2f49a058c308f17f273b570a6a758386fc291b5f6a934dd84e48c1" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.90", +] + [[package]] name = "windows-interface" version = "0.57.0" @@ -8142,6 +8208,17 @@ dependencies = [ "syn 2.0.90", ] +[[package]] +name = "windows-interface" +version = "0.59.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cb26fd936d991781ea39e87c3a27285081e3c0da5ca0fcbc02d368cc6f52ff01" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.90", +] + [[package]] name = "windows-registry" version = "0.2.0" @@ -8149,7 +8226,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e400001bb720a623c1c69032f8e3e4cf09984deec740f007dd2b03ec864804b0" dependencies = [ "windows-result 0.2.0", - "windows-strings", + "windows-strings 0.1.0", "windows-targets 0.52.6", ] @@ -8171,6 +8248,15 @@ dependencies = [ "windows-targets 0.52.6", ] +[[package]] +name = "windows-result" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d08106ce80268c4067c0571ca55a9b4e9516518eaa1a1fe9b37ca403ae1d1a34" +dependencies = [ + "windows-targets 0.53.0", +] + [[package]] name = "windows-strings" version = "0.1.0" @@ -8181,6 +8267,15 @@ dependencies = [ "windows-targets 0.52.6", ] +[[package]] +name = "windows-strings" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b888f919960b42ea4e11c2f408fadb55f78a9f236d5eef084103c8ce52893491" +dependencies = [ + "windows-targets 0.53.0", +] + [[package]] name = "windows-sys" version = "0.48.0" @@ -8232,13 +8327,29 @@ dependencies = [ "windows_aarch64_gnullvm 0.52.6", "windows_aarch64_msvc 0.52.6", "windows_i686_gnu 0.52.6", - "windows_i686_gnullvm", + "windows_i686_gnullvm 0.52.6", "windows_i686_msvc 0.52.6", "windows_x86_64_gnu 0.52.6", "windows_x86_64_gnullvm 0.52.6", "windows_x86_64_msvc 0.52.6", ] +[[package]] +name = "windows-targets" +version = "0.53.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b1e4c7e8ceaaf9cb7d7507c974735728ab453b67ef8f18febdd7c11fe59dca8b" +dependencies = [ + "windows_aarch64_gnullvm 0.53.0", + "windows_aarch64_msvc 0.53.0", + "windows_i686_gnu 0.53.0", + "windows_i686_gnullvm 0.53.0", + "windows_i686_msvc 0.53.0", + "windows_x86_64_gnu 0.53.0", + "windows_x86_64_gnullvm 0.53.0", + "windows_x86_64_msvc 0.53.0", +] + [[package]] name = "windows_aarch64_gnullvm" version = "0.48.5" @@ -8251,6 +8362,12 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "32a4622180e7a0ec044bb555404c800bc9fd9ec262ec147edd5989ccd0c02cd3" +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.53.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "86b8d5f90ddd19cb4a147a5fa63ca848db3df085e25fee3cc10b39b6eebae764" + [[package]] name = "windows_aarch64_msvc" version = "0.48.5" @@ -8263,6 +8380,12 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "09ec2a7bb152e2252b53fa7803150007879548bc709c039df7627cabbd05d469" +[[package]] +name = "windows_aarch64_msvc" +version = "0.53.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c7651a1f62a11b8cbd5e0d42526e55f2c99886c77e007179efff86c2b137e66c" + [[package]] name = "windows_i686_gnu" version = "0.48.5" @@ -8275,12 +8398,24 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8e9b5ad5ab802e97eb8e295ac6720e509ee4c243f69d781394014ebfe8bbfa0b" +[[package]] +name = "windows_i686_gnu" +version = "0.53.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c1dc67659d35f387f5f6c479dc4e28f1d4bb90ddd1a5d3da2e5d97b42d6272c3" + [[package]] name = "windows_i686_gnullvm" version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0eee52d38c090b3caa76c563b86c3a4bd71ef1a819287c19d586d7334ae8ed66" +[[package]] +name = "windows_i686_gnullvm" +version = "0.53.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ce6ccbdedbf6d6354471319e781c0dfef054c81fbc7cf83f338a4296c0cae11" + [[package]] name = "windows_i686_msvc" version = "0.48.5" @@ -8293,6 +8428,12 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "240948bc05c5e7c6dabba28bf89d89ffce3e303022809e73deaefe4f6ec56c66" +[[package]] +name = "windows_i686_msvc" +version = "0.53.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "581fee95406bb13382d2f65cd4a908ca7b1e4c2f1917f143ba16efe98a589b5d" + [[package]] name = "windows_x86_64_gnu" version = "0.48.5" @@ -8305,6 +8446,12 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "147a5c80aabfbf0c7d901cb5895d1de30ef2907eb21fbbab29ca94c5b08b1a78" +[[package]] +name = "windows_x86_64_gnu" +version = "0.53.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2e55b5ac9ea33f2fc1716d1742db15574fd6fc8dadc51caab1c16a3d3b4190ba" + [[package]] name = "windows_x86_64_gnullvm" version = "0.48.5" @@ -8317,6 +8464,12 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "24d5b23dc417412679681396f2b49f3de8c1473deb516bd34410872eff51ed0d" +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.53.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0a6e035dd0599267ce1ee132e51c27dd29437f63325753051e71dd9e42406c57" + [[package]] name = "windows_x86_64_msvc" version = "0.48.5" @@ -8329,6 +8482,12 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec" +[[package]] +name = "windows_x86_64_msvc" +version = "0.53.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "271414315aff87387382ec3d271b52d7ae78726f5d44ac98b4f4030c91880486" + [[package]] name = "winnow" version = "0.6.20" diff --git a/crates/process/Cargo.toml b/crates/process/Cargo.toml index 79eeb2c1b26..417537a174b 100644 --- a/crates/process/Cargo.toml +++ b/crates/process/Cargo.toml @@ -15,12 +15,18 @@ moon_console = { path = "../console" } cached = { workspace = true } miette = { workspace = true } once_cell = { workspace = true } +process-wrap = { version = "8.2.0", default-features = false, features = [ + "job-object", + "kill-on-drop", + "process-group", + "tokio1", +] } rustc-hash = { workspace = true } starbase_shell = { workspace = true } system_env = { workspace = true } thiserror = { workspace = true } tracing = { workspace = true } -tokio = { workspace = true, features = ["io-util", "sync"] } +tokio = { workspace = true, features = ["io-util", "signal", "sync"] } [target.'cfg(unix)'.dependencies] libc = "0.2.169" diff --git a/crates/process/src/exec_command.rs b/crates/process/src/exec_command.rs index b65b4f401d5..a74c46ad376 100644 --- a/crates/process/src/exec_command.rs +++ b/crates/process/src/exec_command.rs @@ -6,6 +6,7 @@ use crate::process_error::ProcessError; use crate::process_registry::ProcessRegistry; use crate::shared_child::SharedChild; use moon_common::color; +use process_wrap::tokio::*; use rustc_hash::FxHashMap; use std::env; use std::ffi::{OsStr, OsString}; @@ -13,33 +14,53 @@ use std::path::PathBuf; use std::process::{Output, Stdio}; use std::sync::{Arc, RwLock}; use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; -use tokio::process::{Child, Command as AsyncCommand}; +use tokio::process::Command as TokioCommand; use tokio::task; use tracing::{debug, enabled}; +fn wrap_command(command: TokioCommand) -> TokioCommandWrap { + let mut command = TokioCommandWrap::from(command); + command.wrap(KillOnDrop); + + #[cfg(unix)] + { + command.wrap(ProcessGroup::leader()); + } + + #[cfg(windows)] + { + command.wrap(JobObject); + } + + command +} + impl Command { pub async fn exec_capture_output(&mut self) -> miette::Result { let registry = ProcessRegistry::instance(); let (mut command, line) = self.create_async_command(); let child = if self.should_pass_stdin() { - let mut child = command + command .stdin(Stdio::piped()) .stdout(Stdio::piped()) - .stderr(Stdio::piped()) - .spawn() - .map_err(|error| ProcessError::Capture { - bin: self.get_bin_name(), - error: Box::new(error), - })?; + .stderr(Stdio::piped()); + + let mut child = + wrap_command(command) + .spawn() + .map_err(|error| ProcessError::Capture { + bin: self.get_bin_name(), + error: Box::new(error), + })?; self.write_input_to_child(&mut child, &line).await?; child } else { - command - .stdout(Stdio::piped()) - .stderr(Stdio::piped()) + command.stdout(Stdio::piped()).stderr(Stdio::piped()); + + wrap_command(command) .spawn() .map_err(|error| ProcessError::Capture { bin: self.get_bin_name(), @@ -72,24 +93,28 @@ impl Command { let registry = ProcessRegistry::instance(); let (mut command, line) = self.create_async_command(); - let child = - if self.should_pass_stdin() { - let mut child = command.stdin(Stdio::piped()).spawn().map_err(|error| { - ProcessError::Stream { + let child = if self.should_pass_stdin() { + command.stdin(Stdio::piped()); + + let mut child = + wrap_command(command) + .spawn() + .map_err(|error| ProcessError::Stream { bin: self.get_bin_name(), error: Box::new(error), - } - })?; + })?; - self.write_input_to_child(&mut child, &line).await?; + self.write_input_to_child(&mut child, &line).await?; - child - } else { - command.spawn().map_err(|error| ProcessError::Stream { + child + } else { + wrap_command(command) + .spawn() + .map_err(|error| ProcessError::Stream { bin: self.get_bin_name(), error: Box::new(error), })? - }; + }; let shared_child = registry.add_running(child).await; @@ -121,19 +146,22 @@ impl Command { let registry = ProcessRegistry::instance(); let (mut command, line) = self.create_async_command(); - let mut child = command + command .stdin(if self.should_pass_stdin() { Stdio::piped() } else { Stdio::inherit() }) .stderr(Stdio::piped()) - .stdout(Stdio::piped()) - .spawn() - .map_err(|error| ProcessError::StreamCapture { - bin: self.get_bin_name(), - error: Box::new(error), - })?; + .stdout(Stdio::piped()); + + let mut child = + wrap_command(command) + .spawn() + .map_err(|error| ProcessError::StreamCapture { + bin: self.get_bin_name(), + error: Box::new(error), + })?; if self.should_pass_stdin() { self.write_input_to_child(&mut child, &line).await?; @@ -341,12 +369,11 @@ impl Command { // Ok(output) // } - fn create_async_command(&self) -> (AsyncCommand, CommandLine) { + fn create_async_command(&self) -> (TokioCommand, CommandLine) { let command_line = self.create_command_line(); - let mut command = AsyncCommand::new(&command_line.command[0]); + let mut command = TokioCommand::new(&command_line.command[0]); command.args(&command_line.command[1..]); - command.kill_on_drop(true); for (key, value) in &self.env { if let Some(value) = value { @@ -433,12 +460,12 @@ impl Command { async fn write_input_to_child( &self, - child: &mut Child, + child: &mut Box, line: &CommandLine, ) -> miette::Result<()> { let input = line.input.join(OsStr::new(" ")); - let mut stdin = child.stdin.take().unwrap_or_else(|| { + let mut stdin = child.stdin().take().unwrap_or_else(|| { panic!("Unable to write stdin: {}", input.to_string_lossy()); }); diff --git a/crates/process/src/process_registry.rs b/crates/process/src/process_registry.rs index e1bd06f17c2..4c31a5e70ff 100644 --- a/crates/process/src/process_registry.rs +++ b/crates/process/src/process_registry.rs @@ -1,14 +1,14 @@ use crate::shared_child::*; use crate::signal::*; use core::time::Duration; +use process_wrap::tokio::TokioChildWrapper; use rustc_hash::FxHashMap; use std::sync::{Arc, OnceLock}; -use tokio::process::Child; use tokio::sync::broadcast::{self, error::RecvError, Receiver, Sender}; use tokio::sync::RwLock; use tokio::task::JoinHandle; use tokio::time::sleep; -use tracing::{debug, trace, warn}; +use tracing::debug; static INSTANCE: OnceLock> = OnceLock::new(); @@ -49,7 +49,7 @@ impl ProcessRegistry { Arc::clone(INSTANCE.get_or_init(|| Arc::new(ProcessRegistry::default()))) } - pub async fn add_running(&self, child: Child) -> SharedChild { + pub async fn add_running(&self, child: Box) -> SharedChild { let shared = SharedChild::new(child); self.running @@ -82,11 +82,23 @@ impl ProcessRegistry { pub async fn wait_for_running_to_shutdown(&self) { let mut count = 0; + let mut terminated = false; loop { - // Wait for all running processes to have stopped, - // or if we have waited 5 seconds, just quit - if self.running.read().await.is_empty() || count >= 5000 { + // After 1.5 second of waiting, terminate all running, + // as some of them may have "press ctrl+c again" logic + if !terminated && count >= 1500 { + self.terminate_running(); + terminated = true; + } + + // After 3 seconds of waiting, just exit immediately + if count >= 3000 { + break; + } + + // Wait for all running processes to have stopped + if self.running.read().await.is_empty() { break; } @@ -133,13 +145,18 @@ async fn shutdown_processes_with_signal( children.len() ); - for (pid, child) in children.drain() { - trace!(pid, "Killing child process"); + // We are using process groups, so manually killing these + // child processes should not be necessary! Instead, just + // empty the map so that our wait method doesn't hang... + children.clear(); - if let Err(error) = child.kill_with_signal(signal).await { - warn!(pid, "Failed to kill child process: {error}"); - } + // for (pid, child) in children.drain() { + // trace!(pid, "Killing child process"); - drop(child); - } + // if let Err(error) = child.kill_with_signal(signal).await { + // warn!(pid, "Failed to kill child process: {error}"); + // } + + // drop(child); + // } } diff --git a/crates/process/src/shared_child.rs b/crates/process/src/shared_child.rs index e011c108d7d..67a3fcbd87b 100644 --- a/crates/process/src/shared_child.rs +++ b/crates/process/src/shared_child.rs @@ -1,18 +1,19 @@ use crate::signal::*; +use process_wrap::tokio::TokioChildWrapper; use std::io; use std::process::{ExitStatus, Output}; use std::sync::Arc; -use tokio::process::{Child, ChildStderr, ChildStdin, ChildStdout}; +use tokio::process::{ChildStderr, ChildStdin, ChildStdout}; use tokio::sync::Mutex; #[derive(Clone)] pub struct SharedChild { - inner: Arc>>, + inner: Arc>>>, pid: u32, } impl SharedChild { - pub fn new(child: Child) -> Self { + pub fn new(child: Box) -> Self { Self { pid: child.id().unwrap(), inner: Arc::new(Mutex::new(Some(child))), @@ -28,7 +29,7 @@ impl SharedChild { .lock() .await .as_mut() - .and_then(|child| child.stdin.take()) + .and_then(|child| child.stdin().take()) } pub async fn take_stdout(&self) -> Option { @@ -36,7 +37,7 @@ impl SharedChild { .lock() .await .as_mut() - .and_then(|child| child.stdout.take()) + .and_then(|child| child.stdout().take()) } pub async fn take_stderr(&self) -> Option { @@ -44,14 +45,14 @@ impl SharedChild { .lock() .await .as_mut() - .and_then(|child| child.stderr.take()) + .and_then(|child| child.stderr().take()) } pub async fn kill(&self) -> io::Result<()> { let mut child = self.inner.lock().await; if let Some(mut child) = child.take() { - child.kill().await?; + Box::into_pin(child.kill()).await?; } Ok(()) @@ -64,7 +65,11 @@ impl SharedChild { // https://github.com/rust-lang/rust/blob/master/library/std/src/sys/pal/unix/process/process_unix.rs#L947 #[cfg(unix)] { - kill(self.id(), signal)?; + child.signal(match signal { + SignalType::Interrupt => 2, // SIGINT + SignalType::Quit => 3, // SIGQUIT + SignalType::Terminate => 15, // SIGTERM + })?; } // https://github.com/rust-lang/rust/blob/master/library/std/src/sys/pal/windows/process.rs#L658 @@ -73,27 +78,27 @@ impl SharedChild { child.start_kill()?; } - child.wait().await?; + Box::into_pin(child.wait()).await?; } Ok(()) } - pub async fn wait(&self) -> io::Result { + pub(crate) async fn wait(&self) -> io::Result { let mut child = self.inner.lock().await; if let Some(child) = child.as_mut() { - return child.wait().await; + return Box::into_pin(child.wait()).await; } unreachable!() } - pub async fn wait_with_output(&self) -> io::Result { + pub(crate) async fn wait_with_output(&self) -> io::Result { let mut child = self.inner.lock().await; if let Some(child) = child.take() { - return child.wait_with_output().await; + return Box::into_pin(child.wait_with_output()).await; } unreachable!()