Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 15 additions & 1 deletion miner-apps/jd-client/src/lib/io_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,21 @@ pub fn spawn_io_tasks(
match res {
Ok(frame) => {
trace!("Sending outbound frame");
if let Err(e) = writer.write_frame(frame.into()).await {
let write_result = tokio::select! {
biased;
_ = cancellation_token.cancelled() => {
trace!("Received shutdown signal during write");
inbound_tx_clone.close();
break;
}
_ = fallback.cancelled(), if fallback.is_enabled() => {
trace!("Received fallback signal during write");
inbound_tx_clone.close();
break;
}
result = writer.write_frame(frame.into()) => result,
};
if let Err(e) = write_result {
error!(error=?e, "Writer error");
outbound_rx.close();
break;
Expand Down
16 changes: 15 additions & 1 deletion miner-apps/translator/src/lib/io_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,21 @@ pub fn spawn_io_tasks(
match res {
Ok(frame) => {
trace!("Sending outbound frame");
if let Err(e) = writer.write_frame(frame.into()).await {
let write_result = tokio::select! {
biased;
_ = cancellation_token.cancelled() => {
trace!("Received app shutdown signal during write");
inbound_tx_clone.close();
break;
}
_ = fallback_token.cancelled() => {
trace!("Received fallback signal during write");
inbound_tx_clone.close();
break;
}
result = writer.write_frame(frame.into()) => result,
};
if let Err(e) = write_result {
error!(error=?e, "Writer error");
outbound_rx.close();
break;
Expand Down
21 changes: 14 additions & 7 deletions stratum-apps/src/fallback_coordinator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,14 +65,21 @@ impl FallbackCoordinator {
tracing::debug!("FallbackCoordinator: triggering fallback");
self.signal.cancel();

if self.pending_tasks.load(Ordering::Acquire) == 0 {
return; // all tasks already done
}
loop {
let notified = self.notify.notified();

if self.pending_tasks.load(Ordering::Acquire) == 0 {
tracing::debug!(
"FallbackCoordinator: finished waiting for components to complete cleanup"
);
return; // all tasks already done
}

// there's still some tasks running,
// wait for the last task to notify us
self.notify.notified().await;
tracing::debug!("FallbackCoordinator: finished waiting for components to complete cleanup");
// There's still some tasks running, wait for the next completion
// signal. Creating the Notified future before checking the counter
// avoids losing the final wakeup if a task exits concurrently.
notified.await;
}
}
}

Expand Down
Loading