Skip to content

Commit ca26c7b

Browse files
taiki-ecramertj
authored andcommitted
Add TryFutureExt::try_flatten_stream
1 parent dea5014 commit ca26c7b

File tree

4 files changed

+153
-20
lines changed

4 files changed

+153
-20
lines changed

futures-util/src/future/flatten_stream.rs

+10-19
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use core::pin::Pin;
33
use futures_core::future::Future;
44
use futures_core::stream::{FusedStream, Stream};
55
use futures_core::task::{Context, Poll};
6+
use pin_utils::unsafe_pinned;
67

78
/// Stream for the [`flatten_stream`](super::FutureExt::flatten_stream) method.
89
#[must_use = "streams do nothing unless polled"]
@@ -11,6 +12,8 @@ pub struct FlattenStream<Fut: Future> {
1112
}
1213

1314
impl<Fut: Future> FlattenStream<Fut> {
15+
unsafe_pinned!(state: State<Fut>);
16+
1417
pub(super) fn new(future: Fut) -> FlattenStream<Fut> {
1518
FlattenStream {
1619
state: State::Future(future)
@@ -58,34 +61,22 @@ impl<Fut> Stream for FlattenStream<Fut>
5861
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
5962
loop {
6063
// safety: data is never moved via the resulting &mut reference
61-
let stream = match &mut unsafe { Pin::get_unchecked_mut(self.as_mut()) }.state {
64+
match &mut unsafe { Pin::get_unchecked_mut(self.as_mut()) }.state {
6265
State::Future(f) => {
6366
// safety: the future we're re-pinning here will never be moved;
6467
// it will just be polled, then dropped in place
65-
match unsafe { Pin::new_unchecked(f) }.poll(cx) {
66-
Poll::Pending => {
67-
// State is not changed, early return.
68-
return Poll::Pending
69-
},
70-
Poll::Ready(stream) => {
71-
// Future resolved to stream.
72-
// We do not return, but poll that
73-
// stream in the next loop iteration.
74-
stream
75-
}
76-
}
68+
let stream = ready!(unsafe { Pin::new_unchecked(f) }.poll(cx));
69+
70+
// Future resolved to stream.
71+
// We do not return, but poll that
72+
// stream in the next loop iteration.
73+
self.as_mut().state().set(State::Stream(stream));
7774
}
7875
State::Stream(s) => {
7976
// safety: the stream we're repinning here will never be moved;
8077
// it will just be polled, then dropped in place
8178
return unsafe { Pin::new_unchecked(s) }.poll_next(cx);
8279
}
83-
};
84-
85-
unsafe {
86-
// safety: we use the &mut only for an assignment, which causes
87-
// only an in-place drop
88-
Pin::get_unchecked_mut(self.as_mut()).state = State::Stream(stream);
8980
}
9081
}
9182
}

futures-util/src/try_future/mod.rs

+37
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
66
use core::pin::Pin;
77
use futures_core::future::TryFuture;
8+
use futures_core::stream::TryStream;
89
use futures_core::task::{Context, Poll};
910
use futures_sink::Sink;
1011

@@ -51,6 +52,9 @@ pub use self::map_ok::MapOk;
5152
mod or_else;
5253
pub use self::or_else::OrElse;
5354

55+
mod try_flatten_stream;
56+
pub use self::try_flatten_stream::TryFlattenStream;
57+
5458
mod unwrap_or_else;
5559
pub use self::unwrap_or_else::UnwrapOrElse;
5660

@@ -318,6 +322,39 @@ pub trait TryFutureExt: TryFuture {
318322
OrElse::new(self, f)
319323
}
320324

325+
/// Flatten the execution of this future when the successful result of this
326+
/// future is a stream.
327+
///
328+
/// This can be useful when stream initialization is deferred, and it is
329+
/// convenient to work with that stream as if stream was available at the
330+
/// call site.
331+
///
332+
/// Note that this function consumes this future and returns a wrapped
333+
/// version of it.
334+
///
335+
/// # Examples
336+
///
337+
/// ```
338+
/// #![feature(async_await)]
339+
/// # futures::executor::block_on(async {
340+
/// use futures::future::{self, TryFutureExt};
341+
/// use futures::stream::{self, TryStreamExt};
342+
///
343+
/// let stream_items = vec![17, 18, 19].into_iter().map(Ok);
344+
/// let future_of_a_stream = future::ok::<_, ()>(stream::iter(stream_items));
345+
///
346+
/// let stream = future_of_a_stream.try_flatten_stream();
347+
/// let list = stream.try_collect::<Vec<_>>().await;
348+
/// assert_eq!(list, Ok(vec![17, 18, 19]));
349+
/// # });
350+
/// ```
351+
fn try_flatten_stream(self) -> TryFlattenStream<Self>
352+
where Self::Ok: TryStream<Error = Self::Error>,
353+
Self: Sized
354+
{
355+
TryFlattenStream::new(self)
356+
}
357+
321358
/// Unwraps this future's ouput, producing a future with this future's
322359
/// [`Ok`](TryFuture::Ok) type as its
323360
/// [`Output`](std::future::Future::Output) type.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
use core::fmt;
2+
use core::pin::Pin;
3+
use futures_core::future::TryFuture;
4+
use futures_core::stream::{FusedStream, Stream, TryStream};
5+
use futures_core::task::{Context, Poll};
6+
use pin_utils::unsafe_pinned;
7+
8+
/// Stream for the [`try_flatten_stream`](super::TryFutureExt::try_flatten_stream) method.
9+
#[must_use = "streams do nothing unless polled"]
10+
pub struct TryFlattenStream<Fut>
11+
where
12+
Fut: TryFuture,
13+
{
14+
state: State<Fut>
15+
}
16+
17+
impl<Fut: TryFuture> TryFlattenStream<Fut>
18+
where
19+
Fut: TryFuture,
20+
Fut::Ok: TryStream<Error = Fut::Error>,
21+
{
22+
unsafe_pinned!(state: State<Fut>);
23+
24+
pub(super) fn new(future: Fut) -> Self {
25+
Self {
26+
state: State::Future(future)
27+
}
28+
}
29+
}
30+
31+
impl<Fut> fmt::Debug for TryFlattenStream<Fut>
32+
where
33+
Fut: TryFuture + fmt::Debug,
34+
Fut::Ok: fmt::Debug,
35+
{
36+
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
37+
fmt.debug_struct("TryFlattenStream")
38+
.field("state", &self.state)
39+
.finish()
40+
}
41+
}
42+
43+
#[derive(Debug)]
44+
enum State<Fut: TryFuture> {
45+
// future is not yet called or called and not ready
46+
Future(Fut),
47+
// future resolved to Stream
48+
Stream(Fut::Ok),
49+
// future resolved to error
50+
Done,
51+
}
52+
53+
impl<Fut> FusedStream for TryFlattenStream<Fut>
54+
where
55+
Fut: TryFuture,
56+
Fut::Ok: TryStream<Error = Fut::Error> + FusedStream,
57+
{
58+
fn is_terminated(&self) -> bool {
59+
match &self.state {
60+
State::Future(_) => false,
61+
State::Stream(stream) => stream.is_terminated(),
62+
State::Done => true,
63+
}
64+
}
65+
}
66+
67+
impl<Fut> Stream for TryFlattenStream<Fut>
68+
where
69+
Fut: TryFuture,
70+
Fut::Ok: TryStream<Error = Fut::Error>,
71+
{
72+
type Item = Result<<Fut::Ok as TryStream>::Ok, Fut::Error>;
73+
74+
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
75+
loop {
76+
// safety: data is never moved via the resulting &mut reference
77+
match &mut unsafe { Pin::get_unchecked_mut(self.as_mut()) }.state {
78+
State::Future(f) => {
79+
// safety: the future we're re-pinning here will never be moved;
80+
// it will just be polled, then dropped in place
81+
match ready!(unsafe { Pin::new_unchecked(f) }.try_poll(cx)) {
82+
Ok(stream) => {
83+
// Future resolved to stream.
84+
// We do not return, but poll that
85+
// stream in the next loop iteration.
86+
self.as_mut().state().set(State::Stream(stream));
87+
}
88+
Err(e) => {
89+
// Future resolved to error.
90+
// We have neither a pollable stream nor a future.
91+
self.as_mut().state().set(State::Done);
92+
return Poll::Ready(Some(Err(e)));
93+
}
94+
}
95+
}
96+
State::Stream(s) => {
97+
// safety: the stream we're repinning here will never be moved;
98+
// it will just be polled, then dropped in place
99+
return unsafe { Pin::new_unchecked(s) }.try_poll_next(cx);
100+
}
101+
State::Done => return Poll::Ready(None),
102+
}
103+
}
104+
}
105+
}

futures/src/lib.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -251,7 +251,7 @@ pub mod future {
251251

252252
TryFutureExt,
253253
AndThen, ErrInto, FlattenSink, IntoFuture, MapErr, MapOk, OrElse,
254-
UnwrapOrElse,
254+
TryFlattenStream, UnwrapOrElse,
255255
};
256256

257257
#[cfg(feature = "never-type")]

0 commit comments

Comments
 (0)