Skip to content

Commit

Permalink
Polish.
Browse files Browse the repository at this point in the history
  • Loading branch information
milesj committed Jan 13, 2025
1 parent d52ce66 commit 0c322a7
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 6 deletions.
9 changes: 7 additions & 2 deletions crates/action-pipeline/src/action_pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ impl ActionPipeline {

while let Some(mut action) = receiver.recv().await {
if self.bail && action.should_bail() || action.should_abort() {
ProcessRegistry::instance().terminate_children();
abort_token.cancel();
error = Some(action.get_error());
}
Expand All @@ -192,9 +193,13 @@ impl ActionPipeline {

drop(receiver);

// Wait for the queue to abort all running tasks
// Wait for the queue to abort/close all running tasks
let _ = queue_handle.await;

ProcessRegistry::instance()
.wait_for_children_to_shutdown()
.await;

// Force abort the signal handler
signal_handle.abort();

Expand Down Expand Up @@ -340,7 +345,7 @@ impl ActionPipeline {
// we need to continually check if they've been aborted or
// cancelled, otherwise we will end up with zombie processes
loop {
sleep(Duration::from_millis(150)).await;
sleep(Duration::from_millis(50)).await;

// No tasks running, so don't hang forever
if job_context.result_sender.is_closed() {
Expand Down
8 changes: 5 additions & 3 deletions crates/process/src/process_registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,9 +104,10 @@ async fn shutdown_processes_with_signal(
mut receiver: Receiver<SignalType>,
processes: Arc<RwLock<FxHashMap<u32, SharedChild>>>,
) {
// TODO
let signal: SignalType;

loop {
let _signal = match receiver.recv().await {
signal = match receiver.recv().await {
Ok(signal) => signal,
Err(RecvError::Closed) => SignalType::Terminate,
_ => continue,
Expand All @@ -123,14 +124,15 @@ async fn shutdown_processes_with_signal(

debug!(
pids = ?children.keys().collect::<Vec<_>>(),
signal = ?signal,
"Shutting down {} running child processes",
children.len()
);

for (pid, child) in children.drain() {
trace!(pid, "Killing child process");

let _ = child.kill().await;
let _ = child.kill_with_signal(signal).await;

drop(child);
}
Expand Down
35 changes: 35 additions & 0 deletions crates/process/src/shared_child.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::signal::SignalType;
use core::unreachable;
use std::io;
use std::process::{ExitStatus, Output};
Expand Down Expand Up @@ -51,6 +52,40 @@ impl SharedChild {
Ok(())
}

pub async fn kill_with_signal(&self, signal: SignalType) -> io::Result<()> {
let mut child = self.1.lock().await;

if let Some(mut child) = child.take() {
let pid = self.id() as i32;

#[cfg(unix)]
{
let result = unsafe {
libc::kill(
pid,
match signal {
SignalType::Interrupt => 2, // SIGINT
SignalType::Terminate => 15, // SIGTERM
},
)
};

if result != 0 {
return Err(io::Error::last_os_error());
}
}

#[cfg(windows)]
{
child.start_kill().await?;
}

child.wait().await?;
}

Ok(())
}

pub async fn wait(&self) -> io::Result<ExitStatus> {
let mut child = self.1.lock().await;

Expand Down
2 changes: 1 addition & 1 deletion crates/process/src/signal.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#[derive(Clone)]
#[derive(Clone, Copy, Debug)]
pub enum SignalType {
Interrupt,
Terminate,
Expand Down

0 comments on commit 0c322a7

Please sign in to comment.