This issue describes a scenario in which stream.forward(sink) hangs inside the polling function of Forward (code). The code below is a simplified version of that polling function. When the stream always returns an item when polled and the sink is always ready to receive an item, execution alternates between part A and part B indefinitely. Neither part ever returns, so the poll never hands control back to the executor.
    loop {
        if buffered_item.is_some() {
            // part A
            ready!(si.as_mut().poll_ready(cx))?; // return here if pending or error
            si.as_mut().start_send(buffered_item.take().unwrap())?;
        }
        match stream.as_mut().poll_next(cx) {
            Poll::Ready(Some(item)) => {
                // part B
                *buffered_item = Some(item);
            }
            /* return in other cases */
        }
    }
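To make the control flow concrete, here is a minimal std-only sketch of the same loop. `AlwaysReadyStream`, `AlwaysReadySink`, and `poll_forward_capped` are hypothetical stand-ins invented for illustration, not futures-rs types, and the iteration cap exists only so that the demonstration terminates; the real loop has no such cap.

```rust
use std::task::Poll;

/// Hypothetical stream that always has an item ready (like `stream::repeat`).
struct AlwaysReadyStream;

impl AlwaysReadyStream {
    fn poll_next(&mut self) -> Poll<Option<u32>> {
        Poll::Ready(Some(0))
    }
}

/// Hypothetical sink that is always ready and only counts what it receives.
struct AlwaysReadySink {
    sent: u64,
}

impl AlwaysReadySink {
    fn poll_ready(&mut self) -> Poll<()> {
        Poll::Ready(())
    }

    fn start_send(&mut self, _item: u32) {
        self.sent += 1;
    }
}

/// Models a single poll of the forwarding future.
/// Returns `(returned_on_its_own, items_sent)`. The cap is an assumption
/// added for this sketch so that the function can terminate at all.
fn poll_forward_capped(cap: u64) -> (bool, u64) {
    let mut stream = AlwaysReadyStream;
    let mut sink = AlwaysReadySink { sent: 0 };
    let mut buffered_item: Option<u32> = None;

    for _ in 0..cap {
        if buffered_item.is_some() {
            // part A: would return here if the sink were pending
            if sink.poll_ready().is_pending() {
                return (true, sink.sent);
            }
            sink.start_send(buffered_item.take().unwrap());
        }
        match stream.poll_next() {
            // part B: buffer the item and go around again
            Poll::Ready(Some(item)) => buffered_item = Some(item),
            // the only exits: stream finished or stream pending
            Poll::Ready(None) | Poll::Pending => return (true, sink.sent),
        }
    }
    // The cap was reached while the loop was still spinning.
    (false, sink.sent)
}
```

Calling `poll_forward_capped` with any cap reports that the loop did not return on its own, because neither exit condition is ever met with these two stand-ins.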
A simple experiment demonstrates this. In the example below, the poll of future fut1 never returns, so future fut2 never gets a chance to run.
    let fut1 = stream::repeat(()).map(Ok).forward(futures::sink::drain());
    let fut2 = async {
        rt::sleep(Duration::from_secs(1)).await;
        dbg!(); // never reached, so nothing is printed
    };
    futures::join!(fut1, fut2);
A simple fix is to remove the loop, call cx.waker().wake_by_ref() to request another poll, and return Pending. The item obtained in part B stays in buffered_item and is sent by part A on the next poll. This adds a small runtime cost per item but eliminates the hang.
    if buffered_item.is_some() {
        // part A
        ready!(si.as_mut().poll_ready(cx))?; // return here if pending or error
        si.as_mut().start_send(buffered_item.take().unwrap())?;
    }
    match stream.as_mut().poll_next(cx) {
        Poll::Ready(Some(item)) => {
            // part B
            *buffered_item = Some(item);
            // Fix: wake itself and return
            cx.waker().wake_by_ref();
            return Poll::Pending;
        }
        /* return in other cases */
    }
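The wake-and-return pattern can be tried in isolation with only the standard library. The sketch below is an illustration under assumptions, not the futures-rs implementation: `YieldingForward`, `CountingWaker`, and `drive` are hypothetical names, and the future simply counts down instead of moving real items. It shows that each poll handles at most one item, wakes itself, and returns, so the caller regains control between items.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical stand-in for `Forward`: handles one item per poll.
struct YieldingForward {
    remaining: usize,
}

impl Future for YieldingForward {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.remaining == 0 {
            return Poll::Ready(());
        }
        // Handle exactly one item, then ask to be polled again and yield.
        self.remaining -= 1;
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}

/// Waker that only counts how many times it was woken.
struct CountingWaker(AtomicUsize);

impl Wake for CountingWaker {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}

/// Polls the future to completion and returns `(polls, wakes)`.
/// A real executor would run other tasks between these polls.
fn drive(items: usize) -> (usize, usize) {
    let counter = Arc::new(CountingWaker(AtomicUsize::new(0)));
    let waker = Waker::from(counter.clone());
    let mut cx = Context::from_waker(&waker);
    let mut fut = YieldingForward { remaining: items };

    let mut polls = 0;
    loop {
        polls += 1;
        if Pin::new(&mut fut).poll(&mut cx).is_ready() {
            break;
        }
    }
    (polls, counter.0.load(Ordering::SeqCst))
}
```

The trade-off is visible in the counts returned by `drive`: the number of polls and wake-ups grows with the number of items, which is the extra runtime cost mentioned above.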