Skip to content

Commit ae9979e

Browse files
authored
Merge pull request #980 from aturon/0.3-util
Convert basic futures combinators to futures-core 0.3
2 parents 4f9d185 + f9c468e commit ae9979e

32 files changed

+1081
-1051
lines changed

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,5 +10,5 @@ members = [
1010
# "futures-macro-await",
1111
# "futures-sink",
1212
# "futures-stable",
13-
# "futures-util",
13+
"futures-util",
1414
]

futures-util/Cargo.toml

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "futures-util"
3-
version = "0.2.0"
3+
version = "0.3.0-alpha"
44
authors = ["Alex Crichton <[email protected]>"]
55
license = "MIT/Apache-2.0"
66
repository = "https://github.com/rust-lang-nursery/futures-rs"
@@ -11,18 +11,20 @@ Common utilities and extension traits for the futures-rs library.
1111
"""
1212

1313
[features]
14-
std = ["futures-core/std", "futures-io/std", "futures-sink/std", "either/use_std"]
15-
default = ["std", "futures-core/either", "futures-sink/either"]
14+
# std = ["futures-core/std", "futures-io/std", "futures-sink/std", "either/use_std"]
15+
std = ["futures-core/std", "either/use_std"]
16+
# default = ["std", "futures-core/either", "futures-sink/either"]
17+
default = ["std", "futures-core/either"]
1618
bench = []
1719

1820
[dependencies]
19-
futures-core = { path = "../futures-core", version = "0.2.0", default-features = false }
20-
futures-channel = { path = "../futures-channel", version = "0.2.0", default-features = false }
21-
futures-io = { path = "../futures-io", version = "0.2.0", default-features = false }
22-
futures-sink = { path = "../futures-sink", version = "0.2.0", default-features = false}
21+
futures-core = { path = "../futures-core", version = "0.3.0-alpha", default-features = false }
22+
futures-channel = { path = "../futures-channel", version = "0.3.0-alpha", default-features = false }
23+
# futures-io = { path = "../futures-io", version = "0.2.0", default-features = false }
24+
# futures-sink = { path = "../futures-sink", version = "0.2.0", default-features = false}
2325
either = { version = "1.4", default-features = false }
2426

2527
[dev-dependencies]
26-
futures = { path = "../futures", version = "0.2.0" }
27-
futures-executor = { path = "../futures-executor", version = "0.2.0" }
28-
futures-channel = { path = "../futures-channel", version = "0.2.0" }
28+
# futures = { path = "../futures", version = "0.2.0" }
29+
# futures-executor = { path = "../futures-executor", version = "0.2.0" }
30+
# futures-channel = { path = "../futures-channel", version = "0.2.0" }

futures-util/src/future/and_then.rs

Lines changed: 0 additions & 40 deletions
This file was deleted.
Lines changed: 10 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
1+
use std::mem::Pin;
12
use std::prelude::v1::*;
23
use std::any::Any;
34
use std::panic::{catch_unwind, UnwindSafe, AssertUnwindSafe};
45

5-
use futures_core::{Future, Poll, Async};
6+
use futures_core::{Future, Poll};
67
use futures_core::task;
78

89
/// Future for the `catch_unwind` combinator.
@@ -11,35 +12,25 @@ use futures_core::task;
1112
#[derive(Debug)]
1213
#[must_use = "futures do nothing unless polled"]
1314
pub struct CatchUnwind<F> where F: Future {
14-
future: Option<F>,
15+
future: F,
1516
}
1617

1718
pub fn new<F>(future: F) -> CatchUnwind<F>
1819
where F: Future + UnwindSafe,
1920
{
20-
CatchUnwind {
21-
future: Some(future),
22-
}
21+
CatchUnwind { future }
2322
}
2423

2524
impl<F> Future for CatchUnwind<F>
2625
where F: Future + UnwindSafe,
2726
{
28-
type Item = Result<F::Item, F::Error>;
29-
type Error = Box<Any + Send>;
27+
type Output = Result<F::Output, Box<Any + Send>>;
3028

31-
fn poll(&mut self, cx: &mut task::Context) -> Poll<Self::Item, Self::Error> {
32-
let mut future = self.future.take().expect("cannot poll twice");
33-
let (res, future) = catch_unwind(AssertUnwindSafe(|| {
34-
(future.poll(cx), future)
35-
}))?;
36-
match res {
37-
Ok(Async::Pending) => {
38-
self.future = Some(future);
39-
Ok(Async::Pending)
40-
}
41-
Ok(Async::Ready(t)) => Ok(Async::Ready(Ok(t))),
42-
Err(e) => Ok(Async::Ready(Err(e))),
29+
fn poll(mut self: Pin<Self>, cx: &mut task::Context) -> Poll<Self::Output> {
30+
let fut = unsafe { pinned_field!(self, future) };
31+
match catch_unwind(AssertUnwindSafe(|| fut.poll(cx))) {
32+
Ok(res) => res.map(Ok),
33+
Err(e) => Poll::Ready(Err(e))
4334
}
4435
}
4536
}

futures-util/src/future/chain.rs

Lines changed: 38 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1,49 +1,53 @@
1-
use core::mem;
1+
use core::mem::Pin;
22

3-
use futures_core::{Future, Poll, Async};
3+
use futures_core::{Future, Poll};
44
use futures_core::task;
55

66
#[must_use = "futures do nothing unless polled"]
77
#[derive(Debug)]
8-
pub enum Chain<A, B, C> where A: Future {
9-
First(A, C),
10-
Second(B),
11-
Done,
8+
pub enum Chain<Fut1, Fut2, Data> {
9+
First(Fut1, Option<Data>),
10+
Second(Fut2),
1211
}
1312

14-
impl<A, B, C> Chain<A, B, C>
15-
where A: Future,
16-
B: Future,
13+
impl<Fut1, Fut2, Data> Chain<Fut1, Fut2, Data>
14+
where Fut1: Future,
15+
Fut2: Future,
1716
{
18-
pub fn new(a: A, c: C) -> Chain<A, B, C> {
19-
Chain::First(a, c)
17+
pub fn new(fut1: Fut1, data: Data) -> Chain<Fut1, Fut2, Data> {
18+
Chain::First(fut1, Some(data))
2019
}
2120

22-
pub fn poll<F>(&mut self, cx: &mut task::Context, f: F) -> Poll<B::Item, B::Error>
23-
where F: FnOnce(Result<A::Item, A::Error>, C)
24-
-> Result<Result<B::Item, B>, B::Error>,
21+
pub fn poll<F>(mut self: Pin<Self>, cx: &mut task::Context, f: F) -> Poll<Fut2::Output>
22+
where F: FnOnce(Fut1::Output, Data) -> Fut2,
2523
{
26-
let a_result = match *self {
27-
Chain::First(ref mut a, _) => {
28-
match a.poll(cx) {
29-
Ok(Async::Pending) => return Ok(Async::Pending),
30-
Ok(Async::Ready(t)) => Ok(t),
31-
Err(e) => Err(e),
24+
let mut f = Some(f);
25+
26+
loop {
27+
// safe to `get_mut` here because we don't move out
28+
let fut2 = match *unsafe { Pin::get_mut(&mut self) } {
29+
Chain::First(ref mut fut1, ref mut data) => {
30+
// safe to create a new `Pin` because `fut1` will never move
31+
// before it's dropped.
32+
match unsafe { Pin::new_unchecked(fut1) }.poll(cx) {
33+
Poll::Pending => return Poll::Pending,
34+
Poll::Ready(t) => {
35+
(f.take().unwrap())(t, data.take().unwrap())
36+
}
37+
}
3238
}
33-
}
34-
Chain::Second(ref mut b) => return b.poll(cx),
35-
Chain::Done => panic!("cannot poll a chained future twice"),
36-
};
37-
let data = match mem::replace(self, Chain::Done) {
38-
Chain::First(_, c) => c,
39-
_ => panic!(),
40-
};
41-
match f(a_result, data)? {
42-
Ok(e) => Ok(Async::Ready(e)),
43-
Err(mut b) => {
44-
let ret = b.poll(cx);
45-
*self = Chain::Second(b);
46-
ret
39+
Chain::Second(ref mut fut2) => {
40+
// safe to create a new `Pin` because `fut2` will never move
41+
// before it's dropped; once we're in `Chain::Second` we stay
42+
// there forever.
43+
return unsafe { Pin::new_unchecked(fut2) }.poll(cx)
44+
}
45+
};
46+
47+
// safe because we're using the `&mut` to do an assignment, not for moving out
48+
unsafe {
49+
// note: it's safe to move the `fut2` here because we haven't yet polled it
50+
*Pin::get_mut(&mut self) = Chain::Second(fut2);
4751
}
4852
}
4953
}

futures-util/src/future/empty.rs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,32 +1,32 @@
11
//! Definition of the Empty combinator, a future that's never ready.
22
3+
use core::mem::Pin;
34
use core::marker;
45

5-
use futures_core::{Future, Poll, Async};
6+
use futures_core::{Future, Poll};
67
use futures_core::task;
78

89
/// A future which is never resolved.
910
///
1011
/// This future can be created with the `empty` function.
1112
#[derive(Debug)]
1213
#[must_use = "futures do nothing unless polled"]
13-
pub struct Empty<T, E> {
14-
_data: marker::PhantomData<(T, E)>,
14+
pub struct Empty<T> {
15+
_data: marker::PhantomData<T>,
1516
}
1617

1718
/// Creates a future which never resolves, representing a computation that never
1819
/// finishes.
1920
///
2021
/// The returned future will forever return `Async::Pending`.
21-
pub fn empty<T, E>() -> Empty<T, E> {
22+
pub fn empty<T>() -> Empty<T> {
2223
Empty { _data: marker::PhantomData }
2324
}
2425

25-
impl<T, E> Future for Empty<T, E> {
26-
type Item = T;
27-
type Error = E;
26+
impl<T> Future for Empty<T> {
27+
type Output = T;
2828

29-
fn poll(&mut self, _: &mut task::Context) -> Poll<T, E> {
30-
Ok(Async::Pending)
29+
fn poll(self: Pin<Self>, _: &mut task::Context) -> Poll<T> {
30+
Poll::Pending
3131
}
3232
}

futures-util/src/future/err_into.rs

Lines changed: 0 additions & 38 deletions
This file was deleted.

futures-util/src/future/flatten.rs

Lines changed: 10 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use core::fmt;
2+
use core::mem::Pin;
23

3-
use futures_core::{Future, IntoFuture, Poll};
4+
use futures_core::{Future, Poll};
45
use futures_core::task;
56

67
use super::chain::Chain;
@@ -11,14 +12,13 @@ use super::chain::Chain;
1112
///
1213
/// This is created by the `Future::flatten` method.
1314
#[must_use = "futures do nothing unless polled"]
14-
pub struct Flatten<A> where A: Future, A::Item: IntoFuture {
15-
state: Chain<A, <A::Item as IntoFuture>::Future, ()>,
15+
pub struct Flatten<A> where A: Future, A::Output: Future {
16+
state: Chain<A, A::Output, ()>,
1617
}
1718

1819
impl<A> fmt::Debug for Flatten<A>
1920
where A: Future + fmt::Debug,
20-
A::Item: IntoFuture,
21-
<<A as Future>::Item as IntoFuture>::Future: fmt::Debug,
21+
A::Output: Future + fmt::Debug,
2222
{
2323
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
2424
fmt.debug_struct("Flatten")
@@ -29,7 +29,7 @@ impl<A> fmt::Debug for Flatten<A>
2929

3030
pub fn new<A>(future: A) -> Flatten<A>
3131
where A: Future,
32-
A::Item: IntoFuture,
32+
A::Output: Future,
3333
{
3434
Flatten {
3535
state: Chain::new(future, ()),
@@ -38,16 +38,11 @@ pub fn new<A>(future: A) -> Flatten<A>
3838

3939
impl<A> Future for Flatten<A>
4040
where A: Future,
41-
A::Item: IntoFuture,
42-
<<A as Future>::Item as IntoFuture>::Error: From<<A as Future>::Error>
41+
A::Output: Future,
4342
{
44-
type Item = <<A as Future>::Item as IntoFuture>::Item;
45-
type Error = <<A as Future>::Item as IntoFuture>::Error;
43+
type Output = <A::Output as Future>::Output;
4644

47-
fn poll(&mut self, cx: &mut task::Context) -> Poll<Self::Item, Self::Error> {
48-
self.state.poll(cx, |a, ()| {
49-
let future = a?.into_future();
50-
Ok(Err(future))
51-
})
45+
fn poll(mut self: Pin<Self>, cx: &mut task::Context) -> Poll<Self::Output> {
46+
unsafe { pinned_field!(self, state) }.poll(cx, |a, ()| a)
5247
}
5348
}

0 commit comments

Comments
 (0)