Skip to content

Commit 4bfa613

Browse files
tomhoulecramertj
authored andcommitted
Port futures_util::future::join_all to the new API
1 parent a0f58c2 commit 4bfa613

File tree

5 files changed

+172
-171
lines changed

5 files changed

+172
-171
lines changed

futures-util/src/future/disabled/join_all.rs

Lines changed: 0 additions & 138 deletions
This file was deleted.

futures-util/src/future/join_all.rs

Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
//! Definition of the `JoinAll` combinator, waiting for all of a list of futures
2+
//! to finish.
3+
4+
use std::fmt;
5+
use std::future::Future;
6+
use std::iter::FromIterator;
7+
use std::mem;
8+
use std::pin::{Pin, Unpin};
9+
use std::prelude::v1::*;
10+
use std::task::Poll;
11+
12+
#[derive(Debug)]
13+
enum ElemState<F>
14+
where
15+
F: Future,
16+
{
17+
Pending(F),
18+
Done(F::Output),
19+
}
20+
21+
/// A future which takes a list of futures and resolves with a vector of the
22+
/// completed values.
23+
///
24+
/// This future is created with the `join_all` method.
25+
#[must_use = "futures do nothing unless polled"]
26+
pub struct JoinAll<F>
27+
where
28+
F: Future,
29+
{
30+
elems: Vec<ElemState<F>>,
31+
}
32+
33+
impl<F> fmt::Debug for JoinAll<F>
34+
where
35+
F: Future + fmt::Debug,
36+
F::Output: fmt::Debug,
37+
{
38+
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
39+
fmt.debug_struct("JoinAll")
40+
.field("elems", &self.elems)
41+
.finish()
42+
}
43+
}
44+
45+
/// Creates a future which represents a collection of the outputs of the futures
46+
/// given.
47+
///
48+
/// The returned future will drive execution for all of its underlying futures,
49+
/// collecting the results into a destination `Vec<T>` in the same order as they
50+
/// were provided.
51+
///
52+
/// # Examples
53+
///
54+
/// ```
55+
/// use futures_util::future::{FutureExt, join_all, ready};
56+
///
57+
/// let f = join_all(vec![
58+
/// ready::<u32>(1),
59+
/// ready::<u32>(2),
60+
/// ready::<u32>(3),
61+
/// ]);
62+
/// let f = f.map(|x| {
63+
/// assert_eq!(x, [1, 2, 3]);
64+
/// });
65+
/// ```
66+
pub fn join_all<I>(i: I) -> JoinAll<I::Item>
67+
where
68+
I: IntoIterator,
69+
I::Item: Future,
70+
{
71+
let elems = i.into_iter().map(ElemState::Pending).collect();
72+
JoinAll { elems }
73+
}
74+
75+
impl<F> Future for JoinAll<F>
76+
where
77+
F: Future + Unpin,
78+
F::Output: Unpin,
79+
{
80+
type Output = Vec<F::Output>;
81+
82+
fn poll(
83+
mut self: Pin<&mut Self>,
84+
lw: &::std::task::LocalWaker,
85+
) -> Poll<Self::Output> {
86+
let mut all_done = true;
87+
88+
for elem in self.as_mut().elems.iter_mut() {
89+
match elem {
90+
ElemState::Pending(ref mut t) => match Pin::new(t).poll(lw) {
91+
Poll::Ready(v) => *elem = ElemState::Done(v),
92+
Poll::Pending => {
93+
all_done = false;
94+
continue;
95+
}
96+
},
97+
ElemState::Done(ref mut _v) => (),
98+
};
99+
}
100+
101+
if all_done {
102+
let elems = mem::replace(&mut self.elems, Vec::new());
103+
let result = elems
104+
.into_iter()
105+
.map(|e| match e {
106+
ElemState::Done(t) => t,
107+
_ => unreachable!(),
108+
})
109+
.collect();
110+
Poll::Ready(result)
111+
} else {
112+
Poll::Pending
113+
}
114+
}
115+
}
116+
117+
impl<F: Future> FromIterator<F> for JoinAll<F> {
118+
fn from_iter<T: IntoIterator<Item = F>>(iter: T) -> Self {
119+
join_all(iter)
120+
}
121+
}

futures-util/src/future/mod.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -79,11 +79,11 @@ mod remote_handle;
7979
#[cfg(feature = "std")]
8080
pub use self::remote_handle::{Remote, RemoteHandle};
8181

82-
// ToDo
83-
// #[cfg(feature = "std")]
84-
// mod join_all;
85-
// #[cfg(feature = "std")]
86-
// pub use self::join_all::{join_all, JoinAll};
82+
#[cfg(feature = "std")]
83+
mod join_all;
84+
85+
#[cfg(feature = "std")]
86+
pub use self::join_all::{join_all, JoinAll};
8787

8888
// #[cfg(feature = "std")]
8989
// mod select_all;

futures/tests/join_all.rs

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
#![feature(async_await, pin, arbitrary_self_types, futures_api)]
2+
3+
extern crate futures_util;
4+
extern crate futures;
5+
6+
use futures_util::future::*;
7+
use std::future::Future;
8+
use futures::executor::block_on;
9+
use std::fmt::Debug;
10+
use std::pin::Unpin;
11+
12+
fn assert_done<T: PartialEq + Debug, F: FnOnce() -> Box<Future<Output=T> + Unpin>>(actual_fut: F, expected: T) {
13+
let output = block_on(actual_fut());
14+
15+
assert_eq!(output, expected);
16+
}
17+
18+
#[test]
19+
fn collect_collects() {
20+
assert_done(|| Box::new(join_all(vec![ready(1), ready(2)])), vec![1, 2]);
21+
assert_done(|| Box::new(join_all(vec![ready(1)])), vec![1]);
22+
// REVIEW: should this be implemented?
23+
// assert_done(|| Box::new(join_all(Vec::<i32>::new())), vec![]);
24+
25+
// TODO: needs more tests
26+
}
27+
28+
#[test]
29+
fn join_all_iter_lifetime() {
30+
// In futures-rs version 0.1, this function would fail to typecheck due to an overly
31+
// conservative type parameterization of `JoinAll`.
32+
fn sizes<'a>(bufs: Vec<&'a [u8]>) -> Box<Future<Output=Vec<usize>> + Unpin> {
33+
let iter = bufs.into_iter().map(|b| ready::<usize>(b.len()));
34+
Box::new(join_all(iter))
35+
}
36+
37+
assert_done(|| sizes(vec![&[1,2,3], &[], &[0]]), vec![3 as usize, 0, 1]);
38+
}
39+
40+
#[test]
41+
fn join_all_from_iter() {
42+
assert_done(
43+
|| Box::new(vec![ready(1), ready(2)].into_iter().collect::<JoinAll<_>>()),
44+
vec![1, 2],
45+
)
46+
}

futures/tests_disabled/all.rs

Lines changed: 0 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -243,34 +243,6 @@ fn join_incomplete() {
243243
})
244244
}
245245

246-
#[test]
247-
fn collect_collects() {
248-
assert_done(|| join_all(vec![f_ok(1), f_ok(2)]), Ok(vec![1, 2]));
249-
assert_done(|| join_all(vec![f_ok(1)]), Ok(vec![1]));
250-
assert_done(|| join_all(Vec::<Result<i32, u32>>::new()), Ok(vec![]));
251-
252-
// TODO: needs more tests
253-
}
254-
255-
#[test]
256-
fn join_all_iter_lifetime() {
257-
// In futures-rs version 0.1, this function would fail to typecheck due to an overly
258-
// conservative type parameterization of `JoinAll`.
259-
fn sizes<'a>(bufs: Vec<&'a [u8]>) -> Box<Future<Item=Vec<usize>, Error=()> + 'static> {
260-
let iter = bufs.into_iter().map(|b| future::ok::<usize, ()>(b.len()));
261-
Box::new(join_all(iter))
262-
}
263-
264-
assert_done(|| sizes(vec![&[1,2,3], &[], &[0]]), Ok(vec![3, 0, 1]));
265-
}
266-
267-
#[test]
268-
fn join_all_from_iter() {
269-
assert_done(
270-
|| vec![f_ok(1), f_ok(2)].into_iter().collect::<JoinAll<_>>(),
271-
Ok(vec![1, 2]),
272-
)
273-
}
274246

275247
#[test]
276248
fn select2() {

0 commit comments

Comments
 (0)