Skip to content

Commit 1a83bba

Browse files
Merge pull request #1119 from tinaun/compat
`Compat` implementation
2 parents 393f733 + 8bdb4ef commit 1a83bba

File tree

13 files changed

+321
-1
lines changed

13 files changed

+321
-1
lines changed

futures-util/Cargo.toml

+2
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ name = "futures_util"
1919
[features]
2020
std = ["futures-core-preview/std", "futures-io-preview/std", "futures-sink-preview/std", "either/use_std", "slab"]
2121
default = ["std", "futures-core-preview/either", "futures-sink-preview/either"]
22+
compat = ["std", "futures"]
2223
bench = []
2324
nightly = []
2425

@@ -29,6 +30,7 @@ futures-io-preview = { path = "../futures-io", version = "0.3.0-alpha.2", defaul
2930
futures-sink-preview = { path = "../futures-sink", version = "0.3.0-alpha.2", default-features = false}
3031
either = { version = "1.4", default-features = false }
3132
slab = { version = "0.4", optional = true }
33+
futures = { version = "0.1", optional = true }
3234

3335
[dev-dependencies]
3436
futures-preview = { path = "../futures", version = "0.3.0-alpha.2" }

futures-util/src/compat/compat.rs

+20
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
/// Converts a futures 0.3 `TryFuture` into a futures 0.1 `Future`
2+
/// and vice versa.
3+
#[derive(Debug)]
4+
#[must_use = "futures do nothing unless polled"]
5+
pub struct Compat<Fut, Ex> {
6+
crate future: Fut,
7+
crate executor: Option<Ex>,
8+
}
9+
10+
impl<Fut, Ex> Compat<Fut, Ex> {
11+
/// Returns the inner future.
12+
pub fn into_inner(self) -> Fut {
13+
self.future
14+
}
15+
16+
/// Creates a new `Compat`.
17+
crate fn new(future: Fut, executor: Option<Ex>) -> Compat<Fut, Ex> {
18+
Compat { future, executor }
19+
}
20+
}
+61
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
use super::Compat;
2+
use futures::Async as Async01;
3+
use futures::Future as Future01;
4+
use futures::executor::{self as executor01, NotifyHandle as NotifyHandle01,
5+
Notify as Notify01, UnsafeNotify as UnsafeNotify01};
6+
use futures_core::Future as Future03;
7+
use futures_core::task as task03;
8+
use std::mem::PinMut;
9+
10+
impl<Fut: Future01> Future03 for Compat<Fut, ()> {
11+
type Output = Result<Fut::Item, Fut::Error>;
12+
13+
fn poll(
14+
self: PinMut<Self>,
15+
cx: &mut task03::Context
16+
) -> task03::Poll<Self::Output> {
17+
let notify = &WakerToHandle(cx.waker());
18+
19+
executor01::with_notify(notify, 0, move || {
20+
unsafe {
21+
match PinMut::get_mut_unchecked(self).future.poll() {
22+
Ok(Async01::Ready(t)) => task03::Poll::Ready(Ok(t)),
23+
Ok(Async01::NotReady) => task03::Poll::Pending,
24+
Err(e) => task03::Poll::Ready(Err(e)),
25+
}
26+
}
27+
})
28+
}
29+
}
30+
31+
struct NotifyWaker(task03::Waker);
32+
33+
#[derive(Clone)]
34+
struct WakerToHandle<'a>(&'a task03::Waker);
35+
36+
impl<'a> From<WakerToHandle<'a>> for NotifyHandle01 {
37+
fn from(handle: WakerToHandle<'a>) -> NotifyHandle01 {
38+
let ptr = Box::new(NotifyWaker(handle.0.clone()));
39+
40+
unsafe {
41+
NotifyHandle01::new(Box::into_raw(ptr))
42+
}
43+
}
44+
}
45+
46+
impl Notify01 for NotifyWaker {
47+
fn notify(&self, _: usize) {
48+
self.0.wake();
49+
}
50+
}
51+
52+
unsafe impl UnsafeNotify01 for NotifyWaker {
53+
unsafe fn clone_raw(&self) -> NotifyHandle01 {
54+
WakerToHandle(&self.0).into()
55+
}
56+
57+
unsafe fn drop_raw(&self) {
58+
let ptr: *const dyn UnsafeNotify01 = self;
59+
drop(Box::from_raw(ptr as *mut dyn UnsafeNotify01));
60+
}
61+
}
+41
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
use super::Compat;
2+
use futures::Future as Future01;
3+
use futures::Poll as Poll01;
4+
use futures::task as task01;
5+
use futures::Async as Async01;
6+
use futures_core::TryFuture as TryFuture03;
7+
use futures_core::task as task03;
8+
use std::marker::Unpin;
9+
use std::mem::PinMut;
10+
use std::sync::Arc;
11+
12+
impl<Fut, Ex> Future01 for Compat<Fut, Ex>
13+
where Fut: TryFuture03 + Unpin,
14+
Ex: task03::Executor
15+
{
16+
type Item = Fut::Ok;
17+
type Error = Fut::Error;
18+
19+
fn poll(&mut self) -> Poll01<Self::Item, Self::Error> {
20+
let waker = current_as_waker();
21+
let mut cx = task03::Context::new(&waker, self.executor.as_mut().unwrap());
22+
match PinMut::new(&mut self.future).try_poll(&mut cx) {
23+
task03::Poll::Ready(Ok(t)) => Ok(Async01::Ready(t)),
24+
task03::Poll::Pending => Ok(Async01::NotReady),
25+
task03::Poll::Ready(Err(e)) => Err(e),
26+
}
27+
}
28+
}
29+
30+
fn current_as_waker() -> task03::LocalWaker {
31+
let arc_waker = Arc::new(Current(task01::current()));
32+
task03::local_waker_from_nonlocal(arc_waker)
33+
}
34+
35+
struct Current(task01::Task);
36+
37+
impl task03::Wake for Current {
38+
fn wake(arc_self: &Arc<Self>) {
39+
arc_self.0.notify();
40+
}
41+
}

futures-util/src/compat/executor.rs

+71
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
2+
use super::Compat;
3+
use crate::{TryFutureExt, FutureExt, future::UnitError};
4+
use futures::future::Executor as Executor01;
5+
use futures_core::task::Executor as Executor03;
6+
use futures_core::task as task03;
7+
use futures_core::future::FutureObj;
8+
9+
pub struct BoxedExecutor03(Box<dyn Executor03 + Send>);
10+
11+
impl Executor03 for BoxedExecutor03 {
12+
fn spawn_obj(
13+
&mut self,
14+
future: FutureObj<'static, ()>,
15+
) -> Result<(), task03::SpawnObjError> {
16+
(&mut *self.0).spawn_obj(future)
17+
}
18+
}
19+
20+
/// A future that can run on a futures 0.1 executor.
21+
pub type Executor01Future = Compat<UnitError<FutureObj<'static, ()>>, BoxedExecutor03>;
22+
23+
/// Extension trait for futures 0.1 Executors.
24+
pub trait Executor01CompatExt: Executor01<Executor01Future> +
25+
Clone + Send + 'static
26+
{
27+
/// Creates an `Executor` compatable with futures 0.3.
28+
fn compat(self) -> Executor01As03<Self>
29+
where Self: Sized;
30+
}
31+
32+
impl<Ex> Executor01CompatExt for Ex
33+
where Ex: Executor01<Executor01Future> + Clone + Send + 'static
34+
{
35+
fn compat(self) -> Executor01As03<Self> {
36+
Executor01As03 {
37+
executor01: self,
38+
}
39+
}
40+
}
41+
42+
/// Converts a futures 0.1 `Executor` into a futures 0.3 `Executor`.
43+
#[derive(Clone)]
44+
pub struct Executor01As03<Ex> {
45+
executor01: Ex
46+
}
47+
48+
impl<Ex> Executor03 for Executor01As03<Ex>
49+
where Ex: Executor01<Executor01Future>,
50+
Ex: Clone + Send + 'static,
51+
{
52+
fn spawn_obj(
53+
&mut self,
54+
future: FutureObj<'static, ()>,
55+
) -> Result<(), task03::SpawnObjError> {
56+
let future = future.unit_error().compat(BoxedExecutor03(Box::new(self.clone())));
57+
58+
match self.executor01.execute(future) {
59+
Ok(()) => Ok(()),
60+
Err(err) => {
61+
use futures_core::task::{SpawnObjError, SpawnErrorKind};
62+
63+
let fut = err.into_future().into_inner().unwrap_or_else(|_| ());
64+
Err(SpawnObjError {
65+
kind: SpawnErrorKind::shutdown(),
66+
future: Box::new(fut).into(),
67+
})
68+
}
69+
}
70+
}
71+
}
+18
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
use super::Compat;
2+
use futures::Future as Future01;
3+
4+
impl<Fut: Future01> Future01CompatExt for Fut {}
5+
6+
/// Extension trait for futures 0.1 Futures.
7+
pub trait Future01CompatExt: Future01 {
8+
/// Converts a futures 0.1 `Future<Item = T, Error = E>` into a
9+
/// futures 0.3 `Future<Output = Result<T, E>>`.
10+
fn compat(self) -> Compat<Self, ()> where Self: Sized {
11+
Compat {
12+
future: self,
13+
executor: None,
14+
}
15+
}
16+
}
17+
18+

futures-util/src/compat/mod.rs

+15
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
//! Futures 0.1 / 0.3 shims
2+
3+
#![allow(missing_debug_implementations)]
4+
5+
mod executor;
6+
pub use self::executor::{Executor01CompatExt, Executor01Future, Executor01As03};
7+
8+
mod compat;
9+
pub use self::compat::Compat;
10+
11+
mod compat01to03;
12+
mod compat03to01;
13+
14+
mod future01ext;
15+
pub use self::future01ext::Future01CompatExt;

futures-util/src/future/mod.rs

+20
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,9 @@ pub use self::then::Then;
5757
mod inspect;
5858
pub use self::inspect::Inspect;
5959

60+
mod unit_error;
61+
pub use self::unit_error::UnitError;
62+
6063
mod with_executor;
6164
pub use self::with_executor::WithExecutor;
6265

@@ -65,6 +68,8 @@ mod chain;
6568
crate use self::chain::Chain;
6669

6770
if_std! {
71+
use std::boxed::PinBox;
72+
6873
mod abortable;
6974
pub use self::abortable::{abortable, Abortable, AbortHandle, AbortRegistration, Aborted};
7075

@@ -632,6 +637,21 @@ pub trait FutureExt: Future {
632637
Shared::new(self)
633638
}
634639

640+
/// Wrap the future in a Box, pinning it.
641+
#[cfg(feature = "std")]
642+
fn boxed(self) -> PinBox<Self>
643+
where Self: Sized
644+
{
645+
PinBox::new(self)
646+
}
647+
648+
/// Turns a `Future` into a `TryFuture` with `Error = ()`.
649+
fn unit_error(self) -> UnitError<Self>
650+
where Self: Sized
651+
{
652+
UnitError::new(self)
653+
}
654+
635655
/// Assigns the provided `Executor` to be used when spawning tasks
636656
/// from within the future.
637657
///

futures-util/src/future/unit_error.rs

+34
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
use core::marker::Unpin;
2+
use core::mem::PinMut;
3+
use futures_core::future::Future;
4+
use futures_core::task::{self, Poll};
5+
6+
/// Future for the `unit_error` combinator, turning a `Future` into a `TryFuture`.
7+
///
8+
/// This is created by the `FutureExt::unit_error` method.
9+
#[derive(Debug)]
10+
#[must_use = "futures do nothing unless polled"]
11+
pub struct UnitError<Fut> {
12+
future: Fut,
13+
}
14+
15+
impl<Fut> UnitError<Fut> {
16+
unsafe_pinned!(future: Fut);
17+
18+
/// Creates a new UnitError.
19+
pub(super) fn new(future: Fut) -> UnitError<Fut> {
20+
UnitError { future }
21+
}
22+
}
23+
24+
impl<Fut: Unpin> Unpin for UnitError<Fut> {}
25+
26+
impl<Fut, T> Future for UnitError<Fut>
27+
where Fut: Future<Output = T>,
28+
{
29+
type Output = Result<T, ()>;
30+
31+
fn poll(mut self: PinMut<Self>, cx: &mut task::Context) -> Poll<Result<T, ()>> {
32+
self.future().poll(cx).map(Ok)
33+
}
34+
}

futures-util/src/lib.rs

+3
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,9 @@ pub mod sink;
7575

7676
pub mod task;
7777

78+
#[cfg(feature = "compat")]
79+
pub mod compat;
80+
7881
if_std! {
7982
// FIXME: currently async/await is only available with std
8083
pub mod async_await;

futures-util/src/try_future/mod.rs

+21-1
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,15 @@
66
use futures_core::future::TryFuture;
77
use futures_sink::Sink;
88

9+
#[cfg(feature = "compat")]
10+
use crate::compat::Compat;
11+
12+
#[cfg(feature = "compat")]
13+
use futures_core::task::Executor;
14+
15+
#[cfg(feature = "compat")]
16+
use core::marker::Unpin;
17+
918
/* TODO
1019
mod join;
1120
mod select;
@@ -477,6 +486,17 @@ pub trait TryFutureExt: TryFuture {
477486
UnwrapOrElse::new(self, f)
478487
}
479488

489+
/// Wraps a [`TryFuture`] into a future compatable with libraries using
490+
/// futures 0.1 future definitons. Requires the `compat` feature to enable.
491+
///
492+
#[cfg(feature = "compat")]
493+
fn compat<E>(self, executor: E) -> Compat<Self, E>
494+
where Self: Sized + Unpin,
495+
E: Executor,
496+
{
497+
Compat::new(self, Some(executor))
498+
}
499+
480500
/// Wraps a [`TryFuture`] into a type that implements
481501
/// [`Future`](std::future::Future).
482502
///
@@ -498,7 +518,7 @@ pub trait TryFutureExt: TryFuture {
498518
/// fn take_future(future: impl Future<Output = Result<T, E>>) { /* ... */ }
499519
///
500520
/// take_future(make_try_future().into_future());
501-
/// ```
521+
/// ```
502522
fn into_future(self) -> IntoFuture<Self>
503523
where Self: Sized,
504524
{

futures/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -36,3 +36,4 @@ futures-util-preview = { path = "../futures-util", version = "0.3.0-alpha.2", de
3636
nightly = ["futures-util-preview/nightly"]
3737
std = ["futures-core-preview/std", "futures-executor-preview/std", "futures-io-preview/std", "futures-sink-preview/std", "futures-util-preview/std"]
3838
default = ["std"]
39+
compat = ["std", "futures-util-preview/compat"]

0 commit comments

Comments
 (0)