
Commit

Try and test things.
milesj committed Jan 17, 2025
1 parent 2e7d4b1 commit 4db8b45
Showing 13 changed files with 199 additions and 127 deletions.
1 change: 1 addition & 0 deletions .moon/workspace.yml
@@ -10,6 +10,7 @@ projects:
- './packages/*'
- '!packages/cli'
- '!packages/core-*'
- 'scenarios/*'
- 'website'

generator:
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

59 changes: 31 additions & 28 deletions crates/action-pipeline/src/action_pipeline.rs
@@ -163,7 +163,7 @@ impl ActionPipeline {
let signal_handle = self.monitor_signals(cancel_token.clone());

// Dispatch jobs from the graph to run actions
let queue_handle = self.dispatch_jobs(action_graph, job_context)?;
let queue_handle = self.dispatch_jobs(action_graph, job_context.clone())?;

// Wait and receive all results coming through
debug!("Waiting for jobs to return results");
@@ -198,17 +198,20 @@ impl ActionPipeline {

drop(receiver);

// Wait for the queue to shutdown all running tasks
queue_handle.await.into_diagnostic()?;
process_registry.wait_for_running_to_shutdown().await;

// Force abort the signal handler
// Capture and handle any signals
if cancel_token.is_cancelled() {
self.status = signal_handle.await.into_diagnostic()?;
} else {
signal_handle.abort();
}

// Wait for the queue to shutdown all running tasks
process_registry.wait_for_running_to_shutdown().await;

let mut _job_handles = queue_handle.await.into_diagnostic()?;

// exhaust_job_handles(&mut job_handles, &job_context).await;

self.actions = actions;
self.duration = Some(start.elapsed());

@@ -224,7 +227,7 @@ impl ActionPipeline {
&self,
action_graph: ActionGraph,
job_context: JobContext,
) -> miette::Result<JoinHandle<()>> {
) -> miette::Result<JoinHandle<JoinSet<()>>> {
let node_indices = action_graph.sort_topological()?;
let app_context = Arc::clone(&self.app_context);
let action_context = Arc::clone(&self.action_context);
@@ -244,11 +247,9 @@ impl ActionPipeline {
// If the pipeline was aborted or cancelled (signal),
// loop through and abort all currently running handles
if job_context.is_aborted_or_cancelled() {
exhaust_job_handles(&mut job_handles, &job_context).await;

// Return instead of break, so that we avoid
// running persistent tasks below
return;
return job_handles;
}

// If none is returned, then we are waiting on other currently running
@@ -301,18 +302,18 @@ impl ActionPipeline {
if node.is_interactive()
&& exhaust_job_handles(&mut job_handles, &job_context).await
{
return;
return job_handles;
}
}

// Ensure all non-persistent actions have finished
if exhaust_job_handles(&mut job_handles, &job_context).await {
return;
return job_handles;
}

// Then run all persistent actions in parallel
if persistent_indices.is_empty() {
return;
return job_handles;
}

debug!(
@@ -346,24 +347,26 @@ impl ActionPipeline {
));
});

// Since these tasks are persistent and never complete,
// we need to continually check if they've been aborted or
// cancelled, otherwise we will end up with zombie processes
loop {
sleep(Duration::from_millis(50)).await;
// // Since these tasks are persistent and never complete,
// // we need to continually check if they've been aborted or
// // cancelled, otherwise we will end up with zombie processes
// loop {
// sleep(Duration::from_millis(50)).await;

// No tasks running, so don't hang forever
if job_context.result_sender.is_closed() {
break;
}
// // No tasks running, so don't hang forever
// if job_context.result_sender.is_closed() {
// break;
// }

if job_context.is_aborted_or_cancelled() {
debug!("Shutting down {} persistent jobs", job_handles.len());
// if job_context.is_aborted_or_cancelled() {
// debug!("Shutting down {} persistent jobs", job_handles.len());

exhaust_job_handles(&mut job_handles, &job_context).await;
break;
}
}
// exhaust_job_handles(&mut job_handles, &job_context).await;
// break;
// }
// }

job_handles
}))
}
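The dispatcher's return type changes from JoinHandle<()> to JoinHandle<JoinSet<()>>: rather than draining or polling persistent job handles inside the spawned queue task (the 50 ms loop now commented out above), the task hands its JoinSet back so the pipeline can drain or abort whatever is still running after signals are handled. A minimal sketch of that handoff pattern, assuming only tokio; queue_handle and job_handles mirror the names above, while the placeholder jobs and timings are purely illustrative.

use std::time::Duration;
use tokio::{task::JoinSet, time::sleep};

#[tokio::main]
async fn main() {
    // Dispatcher: spawn jobs into a JoinSet, then return the set itself
    // instead of draining (or polling) it inside this task.
    let queue_handle = tokio::spawn(async {
        let mut job_handles = JoinSet::new();

        for index in 0u64..3 {
            job_handles.spawn(async move {
                sleep(Duration::from_millis(100 * (index + 1))).await;
                println!("job {index} finished");
            });
        }

        job_handles
    });

    // ... the pipeline would receive results and handle signals here ...

    // Take ownership of whatever is still running and decide its fate:
    // drain the handles on a clean run, or abort them on shutdown.
    let mut job_handles = queue_handle.await.expect("dispatcher panicked");

    while let Some(result) = job_handles.join_next().await {
        result.expect("job panicked");
    }
}

Returning the JoinSet keeps ownership of persistent jobs with the caller, so the dispatcher no longer needs a background polling loop to notice aborts or cancellations.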

84 changes: 50 additions & 34 deletions crates/action-pipeline/src/job.rs
@@ -22,40 +22,56 @@ impl Job {
let mut action = Action::new(self.node);
action.node_index = self.node_index;

tokio::select! {
// Run conditions in order!
biased;

// Abort if a sibling job has failed
_ = self.context.abort_token.cancelled() => {
trace!(
index = self.node_index,
"Job aborted",
);

action.finish(ActionStatus::Aborted);
}

// Cancel if we receive a shutdown signal
_ = self.context.cancel_token.cancelled() => {
trace!(
index = self.node_index,
"Job cancelled (via signal)",
);

action.finish(ActionStatus::Skipped);
}

// Or run the job to completion
_ = run_action(
&mut action,
self.action_context,
self.app_context,
self.context.workspace_graph.clone(),
self.context.toolchain_registry.clone(),
self.context.emitter.clone(),
) => {},
};
// Don't use `select!` here because if the abort or cancel tokens
// are triggered, then the async task running the task child process
// is cancelled, immediately terminating the process, and ignoring
// any signals we attempt to pass down!

run_action(
&mut action,
self.action_context,
self.app_context,
self.context.workspace_graph.clone(),
self.context.toolchain_registry.clone(),
self.context.emitter.clone(),
)
.await
.unwrap();

// tokio::select! {
// // Run conditions in order!
// biased;

// // Abort if a sibling job has failed
// _ = self.context.abort_token.cancelled() => {
// trace!(
// index = self.node_index,
// "Job aborted",
// );

// action.finish(ActionStatus::Aborted);
// }

// // Cancel if we receive a shutdown signal
// _ = self.context.cancel_token.cancelled() => {
// trace!(
// index = self.node_index,
// "Job cancelled (via signal)",
// );

// action.finish(ActionStatus::Skipped);
// }

// // Or run the job to completion
// _ = run_action(
// &mut action,
// self.action_context,
// self.app_context,
// self.context.workspace_graph.clone(),
// self.context.toolchain_registry.clone(),
// self.context.emitter.clone(),
// ) => {},
// };

// Send the result back to the pipeline
self.context.send_result(action).await;
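
The new comment spells out the reasoning: when run_action raced against the abort and cancel tokens inside tokio::select!, a token firing meant the losing branch — the future that owns the running child process — was dropped, and with kill-on-drop wrappers that terminates the process immediately rather than letting the graceful signal path run. A standalone sketch of that pitfall, assuming tokio and tokio-util plus a unix `sleep` binary; run_child and the 100 ms timer are illustrative and not moon's actual API.

use std::time::Duration;
use tokio::{process::Command, time::sleep};
use tokio_util::sync::CancellationToken;

// Runs a child process to completion. The `Child` lives inside this
// future, so cancelling the future drops the child as well.
async fn run_child() {
    let mut child = Command::new("sleep")
        .arg("5")
        .kill_on_drop(true)
        .spawn()
        .expect("failed to spawn");

    let status = child.wait().await.expect("failed to wait");
    println!("child exited: {status}");
}

#[tokio::main]
async fn main() {
    let cancel = CancellationToken::new();
    let trigger = cancel.clone();

    // Simulate a shutdown signal arriving shortly after the job starts.
    tokio::spawn(async move {
        sleep(Duration::from_millis(100)).await;
        trigger.cancel();
    });

    // When the cancellation branch wins, `run_child` is dropped mid-await:
    // kill-on-drop tears the process down immediately, so it never gets a
    // chance to receive a graceful signal from the process registry.
    tokio::select! {
        biased;
        _ = cancel.cancelled() => println!("cancelled before the child finished"),
        _ = run_child() => {}
    }
}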
1 change: 1 addition & 0 deletions crates/process/Cargo.toml
@@ -20,6 +20,7 @@ process-wrap = { version = "8.2.0", default-features = false, features = [
"kill-on-drop",
"process-group",
"tokio1",
"tracing",
] }
rustc-hash = { workspace = true }
starbase_shell = { workspace = true }
17 changes: 9 additions & 8 deletions crates/process/src/exec_command.rs
@@ -22,15 +22,15 @@ fn wrap_command(command: TokioCommand) -> TokioCommandWrap {
let mut command = TokioCommandWrap::from(command);
command.wrap(KillOnDrop);

#[cfg(unix)]
{
command.wrap(ProcessGroup::leader());
}
// #[cfg(unix)]
// {
// command.wrap(ProcessGroup::leader());
// }

#[cfg(windows)]
{
command.wrap(JobObject);
}
// #[cfg(windows)]
// {
// command.wrap(JobObject);
// }

command
}
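
With the group wrappers commented out, only KillOnDrop remains on the wrapped command, which is why the registry change further down goes back to killing each tracked child explicitly. For reference, a sketch of the wrapper stack as it looked before this commit (and as it would be re-enabled), using the process-wrap crate already declared in Cargo.toml; the process_wrap::tokio import path and the wrap_with_groups name are assumptions, not part of the diff.

// Assumed import path for the crate's `tokio1` feature.
use process_wrap::tokio::*;
use tokio::process::Command as TokioCommand;

fn wrap_with_groups(command: TokioCommand) -> TokioCommandWrap {
    let mut command = TokioCommandWrap::from(command);

    // Reap the direct child if its handle is dropped.
    command.wrap(KillOnDrop);

    // Make the child a group leader (unix) or attach a job object (windows)
    // so that signalling or terminating it also reaches its descendants.
    #[cfg(unix)]
    {
        command.wrap(ProcessGroup::leader());
    }

    #[cfg(windows)]
    {
        command.wrap(JobObject);
    }

    command
}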
@@ -451,6 +451,7 @@ impl Command {

debug!(
pid = child.id(),
shell = self.shell.as_ref().map(|sh| &sh.bin_name),
env_vars = ?env_vars,
working_dir = ?working_dir,
"Running command {}",
27 changes: 15 additions & 12 deletions crates/process/src/process_registry.rs
@@ -8,7 +8,7 @@ use tokio::sync::broadcast::{self, error::RecvError, Receiver, Sender};
use tokio::sync::RwLock;
use tokio::task::JoinHandle;
use tokio::time::sleep;
use tracing::debug;
use tracing::{debug, trace, warn};

static INSTANCE: OnceLock<Arc<ProcessRegistry>> = OnceLock::new();

@@ -145,18 +145,21 @@ async fn shutdown_processes_with_signal(
children.len()
);

// We are using process groups, so manually killing these
// child processes should not be necessary! Instead, just
// empty the map so that our wait method doesn't hang...
children.clear();
// children.clear();

// for (pid, child) in children.drain() {
// trace!(pid, "Killing child process");
let mut futures = vec![];

// if let Err(error) = child.kill_with_signal(signal).await {
// warn!(pid, "Failed to kill child process: {error}");
// }
for (pid, child) in children.drain() {
trace!(pid, "Killing child process");

// drop(child);
// }
futures.push(tokio::spawn(async move {
if let Err(error) = child.kill_with_signal(signal).await {
warn!(pid, "Failed to kill child process: {error}");
}
}));
}

for future in futures {
let _ = future.await;
}
}
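
Because the children are no longer group leaders, clearing the map is not enough; the registry now signals every tracked child itself, fanning each kill out onto its own task and then awaiting all of them. A self-contained sketch of that fan-out pattern using plain tokio children; it substitutes tokio's Child::kill for the registry's signal-aware kill_with_signal, and the sleep 30 processes are placeholders.

use std::collections::HashMap;
use tokio::process::{Child, Command};

#[tokio::main]
async fn main() {
    // Stand-in for the registry's map of running children (pid -> child).
    let mut children: HashMap<u32, Child> = HashMap::new();

    for _ in 0..3 {
        let child = Command::new("sleep").arg("30").spawn().expect("spawn failed");
        children.insert(child.id().unwrap_or(0), child);
    }

    // Fan out: kill each child on its own task so one slow teardown does not
    // serialize the rest, then wait for every kill task to finish.
    let mut futures = vec![];

    for (pid, mut child) in children.drain() {
        futures.push(tokio::spawn(async move {
            if let Err(error) = child.kill().await {
                eprintln!("Failed to kill child process {pid}: {error}");
            }
        }));
    }

    for future in futures {
        let _ = future.await;
    }
}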