Skip to content

Commit cdf4014

Browse files
Greg Bonikcramertj
Greg Bonik
authored andcommitted
Add TryStreamExt::try_buffer_unordered
1 parent 5619150 commit cdf4014

File tree

2 files changed

+165
-0
lines changed

2 files changed

+165
-0
lines changed

futures-util/src/try_stream/mod.rs

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,9 @@ pub use self::try_for_each::TryForEach;
2828
mod try_filter_map;
2929
pub use self::try_filter_map::TryFilterMap;
3030

31+
mod try_buffer_unordered;
32+
pub use self::try_buffer_unordered::TryBufferUnordered;
33+
3134
if_std! {
3235
mod try_collect;
3336
pub use self::try_collect::TryCollect;
@@ -292,4 +295,68 @@ pub trait TryStreamExt: TryStream {
292295
{
293296
TryFilterMap::new(self, f)
294297
}
298+
299+
/// Attempt to execute several futures from a stream concurrently.
300+
///
301+
/// This stream's `Ok` type must be a [`TryFuture`] with an `Error` type
302+
/// that matches the stream's `Error` type.
303+
///
304+
/// This adaptor will buffer up to `n` futures and then return their
305+
/// outputs in the order in which they complete. If the underlying stream
306+
/// returns an error, it will be immediately propagated.
307+
///
308+
/// The returned stream will be a stream of results, each containing either
309+
/// an error or a future's output. An error can be produced either by the
310+
/// underlying stream itself or by one of the futures it yielded.
311+
///
312+
/// # Examples
313+
///
314+
/// Results are returned in the order of completion:
315+
/// ```
316+
/// #![feature(async_await, await_macro)]
317+
/// # futures::executor::block_on(async {
318+
/// use futures::channel::oneshot;
319+
/// use futures::stream::{self, StreamExt, TryStreamExt};
320+
///
321+
/// let (send_one, recv_one) = oneshot::channel();
322+
/// let (send_two, recv_two) = oneshot::channel();
323+
///
324+
/// let stream_of_futures = stream::iter(vec![Ok(recv_one), Ok(recv_two)]);
325+
///
326+
/// let mut buffered = stream_of_futures.try_buffer_unordered(10);
327+
///
328+
/// send_two.send(2i32);
329+
/// assert_eq!(await!(buffered.next()), Some(Ok(2i32)));
330+
///
331+
/// send_one.send(1i32);
332+
/// assert_eq!(await!(buffered.next()), Some(Ok(1i32)));
333+
///
334+
/// assert_eq!(await!(buffered.next()), None);
335+
/// # })
336+
/// ```
337+
///
338+
/// Errors from the underlying stream itself are propagated:
339+
/// ```
340+
/// #![feature(async_await, await_macro)]
341+
/// # futures::executor::block_on(async {
342+
/// use futures::channel::mpsc;
343+
/// use futures::future;
344+
/// use futures::stream::{StreamExt, TryStreamExt};
345+
///
346+
/// let (sink, stream_of_futures) = mpsc::unbounded();
347+
/// let mut buffered = stream_of_futures.try_buffer_unordered(10);
348+
///
349+
/// sink.unbounded_send(Ok(future::ready(Ok(7i32))));
350+
/// assert_eq!(await!(buffered.next()), Some(Ok(7i32)));
351+
///
352+
/// sink.unbounded_send(Err("error in the stream"));
353+
/// assert_eq!(await!(buffered.next()), Some(Err("error in the stream")));
354+
/// # })
355+
/// ```
356+
fn try_buffer_unordered(self, n: usize) -> TryBufferUnordered<Self>
357+
where Self::Ok: TryFuture<Error = Self::Error>,
358+
Self: Sized
359+
{
360+
TryBufferUnordered::new(self, n)
361+
}
295362
}
Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
use crate::stream::{Fuse, FuturesUnordered, StreamExt};
2+
use crate::try_future::{IntoFuture, TryFutureExt};
3+
use crate::try_stream::IntoStream;
4+
use futures_core::future::TryFuture;
5+
use futures_core::stream::{Stream, TryStream};
6+
use futures_core::task::{self, Poll};
7+
use std::marker::Unpin;
8+
use std::mem::PinMut;
9+
10+
/// A stream returned by the
11+
/// [`try_buffer_unordered`](super::TryStreamExt::try_buffer_unordered) method
12+
#[derive(Debug)]
13+
#[must_use = "streams do nothing unless polled"]
14+
pub struct TryBufferUnordered<St>
15+
where St: TryStream
16+
{
17+
stream: Fuse<IntoStream<St>>,
18+
in_progress_queue: FuturesUnordered<IntoFuture<St::Ok>>,
19+
max: usize,
20+
}
21+
22+
impl<St> Unpin for TryBufferUnordered<St>
23+
where St: TryStream + Unpin
24+
{}
25+
26+
impl<St> TryBufferUnordered<St>
27+
where St: TryStream,
28+
St::Ok: TryFuture,
29+
{
30+
unsafe_pinned!(stream: Fuse<IntoStream<St>>);
31+
unsafe_unpinned!(in_progress_queue: FuturesUnordered<IntoFuture<St::Ok>>);
32+
33+
pub(super) fn new(stream: St, n: usize) -> Self {
34+
TryBufferUnordered {
35+
stream: IntoStream::new(stream).fuse(),
36+
in_progress_queue: FuturesUnordered::new(),
37+
max: n,
38+
}
39+
}
40+
41+
/// Acquires a reference to the underlying stream that this combinator is
42+
/// pulling from.
43+
pub fn get_ref(&self) -> &St {
44+
self.stream.get_ref().get_ref()
45+
}
46+
47+
/// Acquires a mutable reference to the underlying stream that this
48+
/// combinator is pulling from.
49+
///
50+
/// Note that care must be taken to avoid tampering with the state of the
51+
/// stream which may otherwise confuse this combinator.
52+
pub fn get_mut(&mut self) -> &mut St {
53+
self.stream.get_mut().get_mut()
54+
}
55+
56+
/// Consumes this combinator, returning the underlying stream.
57+
///
58+
/// Note that this may discard intermediate state of this combinator, so
59+
/// care should be taken to avoid losing resources when this is called.
60+
pub fn into_inner(self) -> St {
61+
self.stream.into_inner().into_inner()
62+
}
63+
}
64+
65+
impl<St> Stream for TryBufferUnordered<St>
66+
where St: TryStream,
67+
St::Ok: TryFuture<Error = St::Error>,
68+
{
69+
type Item = Result<<St::Ok as TryFuture>::Ok, St::Error>;
70+
71+
fn poll_next(
72+
mut self: PinMut<Self>,
73+
cx: &mut task::Context,
74+
) -> Poll<Option<Self::Item>> {
75+
// First up, try to spawn off as many futures as possible by filling up
76+
// our slab of futures. Propagate errors from the stream immediately.
77+
while self.in_progress_queue.len() < self.max {
78+
match self.stream().poll_next(cx) {
79+
Poll::Ready(Some(Ok(fut))) => self.in_progress_queue().push(fut.into_future()),
80+
Poll::Ready(Some(Err(e))) => return Poll::Ready(Some(Err(e))),
81+
Poll::Ready(None) | Poll::Pending => break,
82+
}
83+
}
84+
85+
// Attempt to pull the next value from the in_progress_queue
86+
match PinMut::new(self.in_progress_queue()).poll_next(cx) {
87+
x @ Poll::Pending | x @ Poll::Ready(Some(_)) => return x,
88+
Poll::Ready(None) => {}
89+
}
90+
91+
// If more values are still coming from the stream, we're not done yet
92+
if self.stream.is_done() {
93+
Poll::Ready(None)
94+
} else {
95+
Poll::Pending
96+
}
97+
}
98+
}

0 commit comments

Comments
 (0)