Skip to content

Commit e487533

Browse files
Only return Poll::Ready from Sender if the channel is empty
1 parent 5ac72c7 commit e487533

File tree

3 files changed

+68
-7
lines changed

3 files changed

+68
-7
lines changed

futures-channel/src/mpsc/mod.rs

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -616,6 +616,25 @@ impl<T> BoundedSenderInner<T> {
616616
self.poll_unparked(Some(cx)).map(Ok)
617617
}
618618

619+
/// Polls the channel to determine if it is empty.
620+
///
621+
/// # Return value
622+
///
623+
/// This method returns:
624+
///
625+
/// - `Poll::Ready(()` if there are no messages in the channel;
626+
/// - `Poll::Pending` if there are messages in the channel.
627+
fn poll_is_empty(&mut self, cx: &mut Context<'_>) -> Poll<()> {
628+
let state = decode_state(self.inner.state.load(SeqCst));
629+
if state.num_messages == 0 {
630+
return Poll::Ready(());
631+
}
632+
633+
// If there are messages in the channel, we must park the task unconditionally.
634+
self.sender_task.lock().unwrap().task = Some(cx.waker().clone());
635+
return Poll::Pending;
636+
}
637+
619638
/// Returns whether the senders send to the same receiver.
620639
fn same_receiver(&self, other: &Self) -> bool {
621640
Arc::ptr_eq(&self.inner, &other.inner)
@@ -755,6 +774,23 @@ impl<T> Sender<T> {
755774
let ptr = self.0.as_ref().map(|inner| inner.ptr());
756775
ptr.hash(hasher);
757776
}
777+
778+
/// Polls the channel to determine if it is empty.
779+
///
780+
/// # Return value
781+
///
782+
/// This method returns:
783+
///
784+
/// - `Poll::Ready(()` if there are no messages in the channel or the [`Receiver`] is disconnected.
785+
/// - `Poll::Pending` if there are messages in the channel.
786+
pub(crate) fn poll_is_empty(&mut self, cx: &mut Context<'_>) -> Poll<()> {
787+
let inner = match self.0.as_mut() {
788+
None => return Poll::Ready(()),
789+
Some(inner) => inner,
790+
};
791+
792+
inner.poll_is_empty(cx)
793+
}
758794
}
759795

760796
impl<T> UnboundedSender<T> {

futures-channel/src/mpsc/sink_impl.rs

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,7 @@ impl<T> Sink<T> for Sender<T> {
1515
}
1616

1717
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
18-
match (*self).poll_ready(cx) {
19-
Poll::Ready(Err(ref e)) if e.is_disconnected() => {
20-
// If the receiver disconnected, we consider the sink to be flushed.
21-
Poll::Ready(Ok(()))
22-
}
23-
x => x,
24-
}
18+
(*self).poll_is_empty(cx).map(Ok)
2519
}
2620

2721
fn poll_close(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {

futures-channel/tests/mpsc.rs

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,37 @@ fn send_recv_no_buffer() {
6060
}));
6161
}
6262

63+
#[test]
64+
fn sink_poll_flush() {
65+
// Run on a task context
66+
block_on(poll_fn(move |cx| {
67+
let (tx, rx) = mpsc::channel::<i32>(2);
68+
pin_mut!(tx, rx);
69+
70+
assert!(tx.as_mut().poll_flush(cx).is_ready());
71+
assert!(tx.as_mut().poll_ready(cx).is_ready());
72+
73+
// Send two messages, `poll_flush` should be pending after each of them.
74+
assert!(tx.as_mut().start_send(1).is_ok());
75+
assert!(tx.as_mut().poll_flush(cx).is_pending());
76+
77+
assert!(tx.as_mut().start_send(2).is_ok());
78+
assert!(tx.as_mut().poll_flush(cx).is_pending());
79+
80+
// Take first message
81+
assert_eq!(rx.as_mut().poll_next(cx), Poll::Ready(Some(1)));
82+
assert!(tx.as_mut().poll_ready(cx).is_ready());
83+
assert!(tx.as_mut().poll_flush(cx).is_pending());
84+
85+
// Take second message
86+
assert_eq!(rx.as_mut().poll_next(cx), Poll::Ready(Some(2)));
87+
assert!(tx.as_mut().poll_ready(cx).is_ready());
88+
assert!(tx.as_mut().poll_flush(cx).is_ready());
89+
90+
Poll::Ready(())
91+
}));
92+
}
93+
6394
#[test]
6495
fn send_shared_recv() {
6596
let (mut tx1, rx) = mpsc::channel::<i32>(16);

0 commit comments

Comments
 (0)