Skip to content

Commit 54e1f76

Browse files
authored
fix(listener): handle shutdown during realtime audio and await control sends (#1878)
- Prevent blocking when initiating realtime audio by selecting on a shutdown receiver. If shutdown is triggered while calling client.from_realtime_audio, return early instead of awaiting indefinitely. This is added in two places where outbound streams are created. - Unify result handling by matching on the selected result (res) and preserve existing error reporting when from_realtime_audio fails. - Make control message forwarding asynchronous: replace try_send with awaitable send calls so control messages are properly delivered to mic and speaker channels. - - Ensure graceful stream termination by sending shutdown signals to c and speaker shutdown channels when the inbound stream ends. - These changes improve responsiveness to shutdown and ensure control - ssages are reliably forwarded.
1 parent 609c121 commit 54e1f76

File tree

1 file changed

+22
-4
lines changed

1 file changed

+22
-4
lines changed

plugins/listener/src/actors/listener.rs

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -259,7 +259,15 @@ async fn run_single_stream(
259259

260260
let outbound = tokio_stream::wrappers::ReceiverStream::new(rx);
261261

262-
let (listen_stream, handle) = match client.from_realtime_audio(outbound).await {
262+
let mut shutdown_rx = shutdown_rx;
263+
let res = tokio::select! {
264+
res = client.from_realtime_audio(outbound) => res,
265+
_ = &mut shutdown_rx => {
266+
return;
267+
}
268+
};
269+
270+
let (listen_stream, handle) = match res {
263271
Ok(res) => res,
264272
Err(e) => {
265273
let _ = myself.send_message(ListenerMsg::StreamStartFailed(format!("{:?}", e)));
@@ -296,7 +304,15 @@ async fn run_dual_stream(
296304

297305
let outbound = tokio_stream::wrappers::ReceiverStream::new(rx);
298306

299-
let (listen_stream, handle) = match client.from_realtime_audio(outbound).await {
307+
let mut shutdown_rx = shutdown_rx;
308+
let res = tokio::select! {
309+
res = client.from_realtime_audio(outbound) => res,
310+
_ = &mut shutdown_rx => {
311+
return;
312+
}
313+
};
314+
315+
let (listen_stream, handle) = match res {
300316
Ok(res) => res,
301317
Err(e) => {
302318
let _ = myself.send_message(ListenerMsg::StreamStartFailed(format!("{:?}", e)));
@@ -448,10 +464,12 @@ async fn spawn_rx_task_dual_split(
448464
let _ = spk_tx.try_send(MixedMessage::Audio(spk));
449465
}
450466
Some(MixedMessage::Control(ctrl)) => {
451-
let _ = mic_tx.try_send(MixedMessage::Control(ctrl.clone()));
452-
let _ = spk_tx.try_send(MixedMessage::Control(ctrl));
467+
let _ = mic_tx.send(MixedMessage::Control(ctrl.clone())).await;
468+
let _ = spk_tx.send(MixedMessage::Control(ctrl)).await;
453469
}
454470
None => {
471+
let _ = shutdown_tx_mic.send(());
472+
let _ = shutdown_tx_spk.send(());
455473
break;
456474
}
457475
}

0 commit comments

Comments
 (0)