diff --git a/miner-apps/jd-client/src/lib/io_task.rs b/miner-apps/jd-client/src/lib/io_task.rs index bf5a3e3a5..168334bc3 100644 --- a/miner-apps/jd-client/src/lib/io_task.rs +++ b/miner-apps/jd-client/src/lib/io_task.rs @@ -160,7 +160,21 @@ pub fn spawn_io_tasks( match res { Ok(frame) => { trace!("Sending outbound frame"); - if let Err(e) = writer.write_frame(frame.into()).await { + let write_result = tokio::select! { + biased; + _ = cancellation_token.cancelled() => { + trace!("Received shutdown signal during write"); + inbound_tx_clone.close(); + break; + } + _ = fallback.cancelled(), if fallback.is_enabled() => { + trace!("Received fallback signal during write"); + inbound_tx_clone.close(); + break; + } + result = writer.write_frame(frame.into()) => result, + }; + if let Err(e) = write_result { error!(error=?e, "Writer error"); outbound_rx.close(); break; diff --git a/miner-apps/translator/src/lib/io_task.rs b/miner-apps/translator/src/lib/io_task.rs index e3a4e8888..1bd62f869 100644 --- a/miner-apps/translator/src/lib/io_task.rs +++ b/miner-apps/translator/src/lib/io_task.rs @@ -126,7 +126,21 @@ pub fn spawn_io_tasks( match res { Ok(frame) => { trace!("Sending outbound frame"); - if let Err(e) = writer.write_frame(frame.into()).await { + let write_result = tokio::select! { + biased; + _ = cancellation_token.cancelled() => { + trace!("Received app shutdown signal during write"); + inbound_tx_clone.close(); + break; + } + _ = fallback_token.cancelled() => { + trace!("Received fallback signal during write"); + inbound_tx_clone.close(); + break; + } + result = writer.write_frame(frame.into()) => result, + }; + if let Err(e) = write_result { error!(error=?e, "Writer error"); outbound_rx.close(); break; diff --git a/stratum-apps/src/fallback_coordinator.rs b/stratum-apps/src/fallback_coordinator.rs index 8d320345c..3074bd487 100644 --- a/stratum-apps/src/fallback_coordinator.rs +++ b/stratum-apps/src/fallback_coordinator.rs @@ -65,14 +65,21 @@ impl FallbackCoordinator { tracing::debug!("FallbackCoordinator: triggering fallback"); self.signal.cancel(); - if self.pending_tasks.load(Ordering::Acquire) == 0 { - return; // all tasks already done - } + loop { + let notified = self.notify.notified(); + + if self.pending_tasks.load(Ordering::Acquire) == 0 { + tracing::debug!( + "FallbackCoordinator: finished waiting for components to complete cleanup" + ); + return; // all tasks already done + } - // there's still some tasks running, - // wait for the last task to notify us - self.notify.notified().await; - tracing::debug!("FallbackCoordinator: finished waiting for components to complete cleanup"); + // There's still some tasks running, wait for the next completion + // signal. Creating the Notified future before checking the counter + // avoids losing the final wakeup if a task exits concurrently. + notified.await; + } } }