Skip to content

Commit d526cce

Browse files
Merge branch 'master' into compat
2 parents 8bdff6b + f6f8c77 commit d526cce

File tree

15 files changed

+667
-24
lines changed

15 files changed

+667
-24
lines changed

futures-util/src/async_await/join.rs

+81-7
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,23 @@
11
//! The `join` macro.
22
33
/// Polls multiple futures simultaneously, returning a tuple
4-
/// of all results once complete.
5-
///
4+
/// of all results once complete.
5+
///
66
/// While `join!(a, b)` is similar to `(await!(a), await!(b))`,
77
/// `join!` polls both futures concurrently and therefore is more efficent.
8-
///
8+
///
99
/// This macro is only usable inside of async functions, closures, and blocks.
10-
///
10+
///
1111
/// # Examples
12-
///
12+
///
1313
/// ```
1414
/// #![feature(pin, async_await, await_macro, futures_api)]
1515
/// # futures::executor::block_on(async {
1616
/// use futures::{join, future};
1717
///
1818
/// let a = future::ready(1);
1919
/// let b = future::ready(2);
20-
///
20+
///
2121
/// assert_eq!(join!(a, b), (1, 2));
2222
/// # });
2323
/// ```
@@ -31,7 +31,7 @@ macro_rules! join {
3131
loop {
3232
let mut all_done = true;
3333
$(
34-
if let $crate::core_reexport::task::Poll::Pending = $crate::poll!($fut.reborrow()) {
34+
if $crate::poll!($fut.reborrow()).is_pending() {
3535
all_done = false;
3636
}
3737
)*
@@ -47,3 +47,77 @@ macro_rules! join {
4747
)*)
4848
} }
4949
}
50+
51+
/// Polls multiple futures simultaneously, resolving to a [`Result`] containing
52+
/// either a tuple of the successful outputs or an error.
53+
///
54+
/// `try_join!` is similar to [`join!`], but completes immediately if any of
55+
/// the futures return an error.
56+
///
57+
/// This macro is only usable inside of async functions, closures, and blocks.
58+
///
59+
/// # Examples
60+
///
61+
/// When used on multiple futures that return `Ok`, `try_join!` will return
62+
/// `Ok` of a tuple of the values:
63+
///
64+
/// ```
65+
/// #![feature(pin, async_await, await_macro, futures_api)]
66+
/// # futures::executor::block_on(async {
67+
/// use futures::{try_join, future};
68+
///
69+
/// let a = future::ready(Ok::<i32, i32>(1));
70+
/// let b = future::ready(Ok::<u64, i32>(2));
71+
///
72+
/// assert_eq!(try_join!(a, b), Ok((1, 2)));
73+
/// # });
74+
/// ```
75+
///
76+
/// If one of the futures resolves to an error, `try_join!` will return
77+
/// that error:
78+
///
79+
/// ```
80+
/// #![feature(pin, async_await, await_macro, futures_api)]
81+
/// # futures::executor::block_on(async {
82+
/// use futures::{try_join, future};
83+
///
84+
/// let a = future::ready(Ok::<i32, i32>(1));
85+
/// let b = future::ready(Err::<u64, i32>(2));
86+
///
87+
/// assert_eq!(try_join!(a, b), Err(2));
88+
/// # });
89+
/// ```
90+
#[macro_export]
91+
macro_rules! try_join {
92+
($($fut:ident),*) => { {
93+
$(
94+
let mut $fut = $crate::future::maybe_done($fut);
95+
$crate::pin_mut!($fut);
96+
)*
97+
let res: $crate::core_reexport::result::Result<(), _> = loop {
98+
let mut all_done = true;
99+
$(
100+
if $crate::poll!($fut.reborrow()).is_pending() {
101+
all_done = false;
102+
} else if $fut.reborrow().output_mut().unwrap().is_err() {
103+
// `.err().unwrap()` rather than `.unwrap_err()` so that we don't introduce
104+
// a `T: Debug` bound.
105+
break $crate::core_reexport::result::Result::Err(
106+
$fut.reborrow().take_output().unwrap().err().unwrap()
107+
);
108+
}
109+
)*
110+
if all_done {
111+
break $crate::core_reexport::result::Result::Ok(());
112+
} else {
113+
$crate::pending!();
114+
}
115+
};
116+
117+
res.map(|()| ($(
118+
// `.ok().unwrap()` rather than `.unwrap()` so that we don't introduce
119+
// an `E: Debug` bound.
120+
$fut.reborrow().take_output().unwrap().ok().unwrap(),
121+
)*))
122+
} }
123+
}

futures-util/src/future/join.rs

-3
Original file line numberDiff line numberDiff line change
@@ -37,9 +37,6 @@ macro_rules! generate {
3737
$($Fut: maybe_done($Fut)),*
3838
}
3939
}
40-
}
41-
42-
impl<$($Fut: Future),*> $Join<$($Fut),*> {
4340
$(
4441
unsafe_pinned!($Fut: MaybeDone<$Fut>);
4542
)*

futures-util/src/future/maybe_done.rs

+17
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,25 @@ pub fn maybe_done<Fut: Future>(future: Fut) -> MaybeDone<Fut> {
4444
}
4545

4646
impl<Fut: Future> MaybeDone<Fut> {
47+
/// Returns an [`Option`] containing a mutable reference to the output of the future.
48+
/// The output of this method will be [`Some`] if and only if the inner
49+
/// future has been completed and [`take_output`](MaybeDone::take_output)
50+
/// has not yet been called.
51+
#[inline]
52+
#[allow(needless_lifetimes)] // https://github.com/rust-lang/rust/issues/52675
53+
pub fn output_mut<'a>(self: PinMut<'a, Self>) -> Option<&'a mut Fut::Output> {
54+
unsafe {
55+
let this = PinMut::get_mut_unchecked(self);
56+
match this {
57+
MaybeDone::Done(res) => Some(res),
58+
_ => None,
59+
}
60+
}
61+
}
62+
4763
/// Attempt to take the output of a `MaybeDone` without driving it
4864
/// towards completion.
65+
#[inline]
4966
pub fn take_output(self: PinMut<Self>) -> Option<Fut::Output> {
5067
unsafe {
5168
let this = PinMut::get_mut_unchecked(self);

futures-util/src/lib.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ macro_rules! if_std {
2323

2424
#[doc(hidden)]
2525
pub mod core_reexport {
26-
pub use core::{mem, marker, future, task};
26+
pub use core::*;
2727
}
2828

2929
macro_rules! delegate_sink {

futures-util/src/macros/pin.rs

+6-6
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,9 @@
3131
#[macro_export]
3232
macro_rules! unsafe_pinned {
3333
($f:tt: $t:ty) => (
34-
fn $f<'a>(
35-
self: &'a mut $crate::core_reexport::mem::PinMut<Self>
36-
) -> $crate::core_reexport::mem::PinMut<'a, $t> {
34+
fn $f<'__a>(
35+
self: &'__a mut $crate::core_reexport::mem::PinMut<Self>
36+
) -> $crate::core_reexport::mem::PinMut<'__a, $t> {
3737
unsafe {
3838
$crate::core_reexport::mem::PinMut::map_unchecked(
3939
self.reborrow(), |x| &mut x.$f
@@ -70,9 +70,9 @@ macro_rules! unsafe_pinned {
7070
#[macro_export]
7171
macro_rules! unsafe_unpinned {
7272
($f:tt: $t:ty) => (
73-
fn $f<'a>(
74-
self: &'a mut $crate::core_reexport::mem::PinMut<Self>
75-
) -> &'a mut $t {
73+
fn $f<'__a>(
74+
self: &'__a mut $crate::core_reexport::mem::PinMut<Self>
75+
) -> &'__a mut $t {
7676
unsafe {
7777
&mut $crate::core_reexport::mem::PinMut::get_mut_unchecked(
7878
self.reborrow()

futures-util/src/stream/buffer_unordered.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -126,9 +126,9 @@ where
126126

127127
// If more values are still coming from the stream, we're not done yet
128128
if self.stream.is_done() {
129-
Poll::Pending
130-
} else {
131129
Poll::Ready(None)
130+
} else {
131+
Poll::Pending
132132
}
133133
}
134134
}

futures-util/src/stream/buffered.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -122,9 +122,9 @@ where
122122

123123
// If more values are still coming from the stream, we're not done yet
124124
if self.stream.is_done() {
125-
Poll::Pending
126-
} else {
127125
Poll::Ready(None)
126+
} else {
127+
Poll::Pending
128128
}
129129
}
130130
}

futures-util/src/stream/mod.rs

+24
Original file line numberDiff line numberDiff line change
@@ -740,6 +740,30 @@ pub trait StreamExt: Stream {
740740
///
741741
/// This method is only available when the `std` feature of this
742742
/// library is activated, and it is activated by default.
743+
///
744+
/// # Examples
745+
///
746+
/// ```
747+
/// #![feature(async_await, await_macro)]
748+
/// # futures::executor::block_on(async {
749+
/// use futures::channel::oneshot;
750+
/// use futures::stream::{self, StreamExt};
751+
///
752+
/// let (send_one, recv_one) = oneshot::channel();
753+
/// let (send_two, recv_two) = oneshot::channel();
754+
///
755+
/// let stream_of_futures = stream::iter(vec![recv_one, recv_two]);
756+
/// let mut buffered = stream_of_futures.buffer_unordered(10);
757+
///
758+
/// send_two.send(2i32);
759+
/// assert_eq!(await!(buffered.next()), Some(Ok(2i32)));
760+
///
761+
/// send_one.send(1i32);
762+
/// assert_eq!(await!(buffered.next()), Some(Ok(1i32)));
763+
///
764+
/// assert_eq!(await!(buffered.next()), None);
765+
/// # })
766+
/// ```
743767
#[cfg(feature = "std")]
744768
fn buffer_unordered(self, n: usize) -> BufferUnordered<Self>
745769
where Self::Item: Future,

0 commit comments

Comments
 (0)