Skip to content

Commit 660f76f

Browse files
authored
channel: Add recv (#2947)
1 parent 27c23a0 commit 660f76f

File tree

2 files changed

+104
-2
lines changed

2 files changed

+104
-2
lines changed

futures-channel/src/mpsc/mod.rs

+60-1
Original file line numberDiff line numberDiff line change
@@ -78,9 +78,11 @@
7878
// happens-before semantics required for the acquire / release semantics used
7979
// by the queue structure.
8080

81+
use core::future::Future;
8182
use futures_core::stream::{FusedStream, Stream};
8283
use futures_core::task::__internal::AtomicWaker;
8384
use futures_core::task::{Context, Poll, Waker};
85+
use futures_core::FusedFuture;
8486
use std::fmt;
8587
use std::pin::Pin;
8688
use std::sync::atomic::AtomicUsize;
@@ -167,7 +169,7 @@ enum SendErrorKind {
167169
Disconnected,
168170
}
169171

170-
/// The error type returned from [`try_recv`](Receiver::try_recv).
172+
/// Error returned by [`Receiver::try_recv`] or [`UnboundedReceiver::try_recv`].
171173
#[derive(PartialEq, Eq, Clone, Copy, Debug)]
172174
pub enum TryRecvError {
173175
/// The channel is empty but not closed.
@@ -177,6 +179,18 @@ pub enum TryRecvError {
177179
Closed,
178180
}
179181

182+
/// Error returned by the future returned by [`Receiver::recv()`] or [`UnboundedReceiver::recv()`].
183+
/// Received when the channel is empty and closed.
184+
#[derive(PartialEq, Eq, Clone, Copy, Debug)]
185+
pub struct RecvError;
186+
187+
/// Future returned by [`Receiver::recv()`] or [`UnboundedReceiver::recv()`].
188+
#[derive(Debug)]
189+
#[must_use = "futures do nothing unless you `.await` or poll them"]
190+
pub struct Recv<'a, St: ?Sized> {
191+
stream: &'a mut St,
192+
}
193+
180194
impl fmt::Display for SendError {
181195
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
182196
if self.is_full() {
@@ -189,6 +203,14 @@ impl fmt::Display for SendError {
189203

190204
impl std::error::Error for SendError {}
191205

206+
impl fmt::Display for RecvError {
207+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
208+
write!(f, "receive failed because channel is empty and closed")
209+
}
210+
}
211+
212+
impl std::error::Error for RecvError {}
213+
192214
impl SendError {
193215
/// Returns `true` if this error is a result of the channel being full.
194216
pub fn is_full(&self) -> bool {
@@ -979,6 +1001,12 @@ impl<T> fmt::Debug for UnboundedSender<T> {
9791001
*/
9801002

9811003
impl<T> Receiver<T> {
1004+
/// Waits for a message from the channel.
1005+
/// If the channel is empty and closed, returns [`RecvError`].
1006+
pub fn recv(&mut self) -> Recv<'_, Self> {
1007+
Recv::new(self)
1008+
}
1009+
9821010
/// Closes the receiving half of a channel, without dropping it.
9831011
///
9841012
/// This prevents any further messages from being sent on the channel while
@@ -1121,6 +1149,31 @@ impl<T> Stream for Receiver<T> {
11211149
}
11221150
}
11231151

1152+
impl<St: ?Sized + Unpin> Unpin for Recv<'_, St> {}
1153+
impl<'a, St: ?Sized + Stream + Unpin> Recv<'a, St> {
1154+
fn new(stream: &'a mut St) -> Self {
1155+
Self { stream }
1156+
}
1157+
}
1158+
1159+
impl<St: ?Sized + FusedStream + Unpin> FusedFuture for Recv<'_, St> {
1160+
fn is_terminated(&self) -> bool {
1161+
self.stream.is_terminated()
1162+
}
1163+
}
1164+
1165+
impl<St: ?Sized + Stream + Unpin> Future for Recv<'_, St> {
1166+
type Output = Result<St::Item, RecvError>;
1167+
1168+
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1169+
match Pin::new(&mut self.stream).poll_next(cx) {
1170+
Poll::Ready(Some(msg)) => Poll::Ready(Ok(msg)),
1171+
Poll::Ready(None) => Poll::Ready(Err(RecvError)),
1172+
Poll::Pending => Poll::Pending,
1173+
}
1174+
}
1175+
}
1176+
11241177
impl<T> Drop for Receiver<T> {
11251178
fn drop(&mut self) {
11261179
// Drain the channel of all pending messages
@@ -1164,6 +1217,12 @@ impl<T> fmt::Debug for Receiver<T> {
11641217
}
11651218

11661219
impl<T> UnboundedReceiver<T> {
1220+
/// Waits for a message from the channel.
1221+
/// If the channel is empty and closed, returns [`RecvError`].
1222+
pub fn recv(&mut self) -> Recv<'_, Self> {
1223+
Recv::new(self)
1224+
}
1225+
11671226
/// Closes the receiving half of a channel, without dropping it.
11681227
///
11691228
/// This prevents any further messages from being sent on the channel while

futures-channel/tests/mpsc.rs

+44-1
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use futures::future::{poll_fn, FutureExt};
44
use futures::sink::{Sink, SinkExt};
55
use futures::stream::{Stream, StreamExt};
66
use futures::task::{Context, Poll};
7-
use futures_channel::mpsc::TryRecvError;
7+
use futures_channel::mpsc::{RecvError, TryRecvError};
88
use futures_test::task::{new_count_waker, noop_context};
99
use std::pin::pin;
1010
use std::sync::atomic::{AtomicUsize, Ordering};
@@ -429,6 +429,49 @@ fn stress_poll_ready() {
429429
stress(16);
430430
}
431431

432+
#[test]
433+
fn test_bounded_recv() {
434+
let (dropped_tx, dropped_rx) = oneshot::channel();
435+
let (tx, mut rx) = mpsc::channel(1);
436+
thread::spawn(move || {
437+
block_on(async move {
438+
send_one_two_three(tx).await;
439+
dropped_tx.send(()).unwrap();
440+
});
441+
});
442+
443+
let res = block_on(async move {
444+
let mut res = Vec::new();
445+
for _ in 0..3 {
446+
res.push(rx.recv().await.unwrap());
447+
}
448+
dropped_rx.await.unwrap();
449+
assert_eq!(rx.recv().await, Err(RecvError));
450+
res
451+
});
452+
assert_eq!(res, [1, 2, 3]);
453+
}
454+
455+
#[test]
456+
fn test_unbounded_recv() {
457+
let (mut tx, mut rx) = mpsc::unbounded();
458+
459+
let res = block_on(async move {
460+
let mut res = Vec::new();
461+
for i in 1..=3 {
462+
tx.send(i).await.unwrap();
463+
}
464+
drop(tx);
465+
466+
for _ in 0..3 {
467+
res.push(rx.recv().await.unwrap());
468+
}
469+
assert_eq!(rx.recv().await, Err(RecvError));
470+
res
471+
});
472+
assert_eq!(res, [1, 2, 3]);
473+
}
474+
432475
#[test]
433476
fn try_send_1() {
434477
const N: usize = if cfg!(miri) { 100 } else { 3000 };

0 commit comments

Comments
 (0)