Skip to content

Commit 5543083

Browse files
committed
Compat implementation
1 parent 393f733 commit 5543083

File tree

7 files changed

+264
-1
lines changed

7 files changed

+264
-1
lines changed

futures-util/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ name = "futures_util"
1919
[features]
2020
std = ["futures-core-preview/std", "futures-io-preview/std", "futures-sink-preview/std", "either/use_std", "slab"]
2121
default = ["std", "futures-core-preview/either", "futures-sink-preview/either"]
22+
compat = ["std", "futures"]
2223
bench = []
2324
nightly = []
2425

@@ -29,6 +30,7 @@ futures-io-preview = { path = "../futures-io", version = "0.3.0-alpha.2", defaul
2930
futures-sink-preview = { path = "../futures-sink", version = "0.3.0-alpha.2", default-features = false}
3031
either = { version = "1.4", default-features = false }
3132
slab = { version = "0.4", optional = true }
33+
futures = { version = "0.1", optional = true }
3234

3335
[dev-dependencies]
3436
futures-preview = { path = "../futures", version = "0.3.0-alpha.2" }

futures-util/src/compat/executor.rs

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
2+
pub trait ExecCompat: Executor01<
3+
Compat<FutureObj<'static, ()>, BoxedExecutor>
4+
> + Clone + Send + 'static
5+
{
6+
fn compat(self) -> ExecutorCompat<Self>
7+
where Self: Sized;
8+
}
9+
10+
impl<E> ExecCompat for E
11+
where E: Executor01<
12+
Compat<FutureObj<'static, ()>, BoxedExecutor>
13+
>,
14+
E: Clone + Send + 'static
15+
{
16+
fn compat(self) -> ExecutorCompat<Self> {
17+
ExecutorCompat {
18+
exec: self,
19+
}
20+
}
21+
}
22+
23+
#[derive(Clone)]
24+
pub struct ExecutorCompat<E> {
25+
exec: E
26+
}
27+
28+
impl<E> Executor03 for ExecutorCompat<E>
29+
where E: Executor01<
30+
Compat<FutureObj<'static, ()>, Box<Executor03>>
31+
>,
32+
E: Clone + Send + 'static,
33+
{
34+
fn spawn_obj(&mut self, obj: FutureObj<'static, ()>) -> Result<(), task::SpawnObjError> {
35+
36+
self.exec.execute(obj.compat(Box::new(self.clone())))
37+
.map_err(|exec_err| {
38+
use futures_core::task::{SpawnObjError, SpawnErrorKind};
39+
40+
let fut = exec_err.into_future().compat().map(|_| ());
41+
SpawnObjError {
42+
kind: SpawnErrorKind::shutdown(),
43+
task: Box::new(fut).into(),
44+
}
45+
})
46+
}
47+
}

futures-util/src/compat/mod.rs

Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
//! Futures 0.1 / 0.3 shims
2+
//!
3+
4+
#![allow(missing_debug_implementations)]
5+
6+
use futures::Future as Future01;
7+
use futures::Poll as Poll01;
8+
use futures::task as task01;
9+
use futures::task::Task as Task01;
10+
use futures::executor::{with_notify, NotifyHandle, Notify, UnsafeNotify};
11+
12+
use futures_core::Future as Future03;
13+
use futures_core::TryFuture as TryFuture03;
14+
use futures_core::Poll as Poll03;
15+
use futures_core::task;
16+
use futures_core::task::Executor as Executor03;
17+
use futures_core::task::{Wake, Waker, LocalWaker, local_waker_from_nonlocal};
18+
19+
use core::mem::PinMut;
20+
use core::marker::Unpin;
21+
use std::sync::Arc;
22+
23+
/// Converts a futures 0.3 `TryFuture` into a futures 0.1 `Future`
24+
/// and vice versa.
25+
#[derive(Debug)]
26+
#[must_use = "futures do nothing unless polled"]
27+
pub struct Compat<Fut, E> {
28+
crate inner: Fut,
29+
crate executor: Option<E>,
30+
}
31+
32+
impl<Fut, E> Compat<Fut, E> {
33+
/// Returns the inner future.
34+
pub fn into_inner(self) -> Fut {
35+
self.inner
36+
}
37+
38+
/// Creates a new `Compat`.
39+
crate fn new(inner: Fut, executor: Option<E>) -> Compat<Fut, E> {
40+
Compat {
41+
inner,
42+
executor
43+
}
44+
}
45+
}
46+
47+
impl<T> Future03 for Compat<T, ()> where T: Future01 {
48+
type Output = Result<T::Item, T::Error>;
49+
50+
fn poll(self: PinMut<Self>, cx: &mut task::Context) -> Poll03<Self::Output> {
51+
use futures::Async;
52+
53+
let notify = &WakerToHandle(cx.waker());
54+
55+
with_notify(notify, 0, move || {
56+
unsafe {
57+
match PinMut::get_mut_unchecked(self).inner.poll() {
58+
Ok(Async::Ready(t)) => Poll03::Ready(Ok(t)),
59+
Ok(Async::NotReady) => Poll03::Pending,
60+
Err(e) => Poll03::Ready(Err(e)),
61+
}
62+
}
63+
})
64+
}
65+
}
66+
67+
struct NotifyWaker(Waker);
68+
69+
#[derive(Clone)]
70+
struct WakerToHandle<'a>(&'a Waker);
71+
72+
impl<'a> From<WakerToHandle<'a>> for NotifyHandle {
73+
fn from(handle: WakerToHandle<'a>) -> NotifyHandle {
74+
let ptr = Box::new(NotifyWaker(handle.0.clone()));
75+
76+
unsafe {
77+
NotifyHandle::new(Box::into_raw(ptr))
78+
}
79+
}
80+
}
81+
82+
impl Notify for NotifyWaker {
83+
fn notify(&self, _: usize) {
84+
self.0.wake();
85+
}
86+
}
87+
88+
unsafe impl UnsafeNotify for NotifyWaker {
89+
unsafe fn clone_raw(&self) -> NotifyHandle {
90+
WakerToHandle(&self.0).into()
91+
}
92+
93+
unsafe fn drop_raw(&self) {
94+
let ptr: *const dyn UnsafeNotify = self;
95+
drop(Box::from_raw(ptr as *mut dyn UnsafeNotify));
96+
}
97+
}
98+
99+
100+
101+
102+
impl<T, E> Future01 for Compat<T, E> where T: TryFuture03 + Unpin,
103+
E: Executor03
104+
{
105+
type Item = T::Ok;
106+
type Error = T::Error;
107+
108+
fn poll(&mut self) -> Poll01<Self::Item, Self::Error> {
109+
use futures::Async;
110+
111+
let waker = current_as_waker();
112+
let mut cx = task::Context::new(&waker, self.executor.as_mut().unwrap());
113+
match PinMut::new(&mut self.inner).try_poll(&mut cx) {
114+
Poll03::Ready(Ok(t)) => Ok(Async::Ready(t)),
115+
Poll03::Pending => Ok(Async::NotReady),
116+
Poll03::Ready(Err(e)) => Err(e),
117+
}
118+
}
119+
}
120+
121+
fn current_as_waker() -> LocalWaker {
122+
let arc_waker = Arc::new(Current(task01::current()));
123+
local_waker_from_nonlocal(arc_waker)
124+
}
125+
126+
struct Current(Task01);
127+
128+
impl Wake for Current {
129+
fn wake(arc_self: &Arc<Self>) {
130+
arc_self.0.notify();
131+
}
132+
}

futures-util/src/future/mod.rs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,9 @@ pub use self::then::Then;
5757
mod inspect;
5858
pub use self::inspect::Inspect;
5959

60+
mod never_error;
61+
pub use self::never_error::NeverError;
62+
6063
mod with_executor;
6164
pub use self::with_executor::WithExecutor;
6265

@@ -83,6 +86,8 @@ if_std! {
8386

8487
mod shared;
8588
pub use self::shared::Shared;
89+
90+
use std::boxed::PinBox;
8691
}
8792

8893
impl<T: ?Sized> FutureExt for T where T: Future {}
@@ -632,6 +637,21 @@ pub trait FutureExt: Future {
632637
Shared::new(self)
633638
}
634639

640+
/// Wrap the future in a Box, pinning it.
641+
#[cfg(feature = "std")]
642+
fn boxed(self) -> PinBox<Self>
643+
where Self: Sized
644+
{
645+
PinBox::new(self)
646+
}
647+
648+
/// Turns a `Future` into a `TryFuture` that never errors
649+
fn never_error(self) -> NeverError<Self>
650+
where Self: Sized
651+
{
652+
NeverError::new(self)
653+
}
654+
635655
/// Assigns the provided `Executor` to be used when spawning tasks
636656
/// from within the future.
637657
///
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
use core::marker::Unpin;
2+
use core::mem::PinMut;
3+
use futures_core::future::Future;
4+
use futures_core::task::{self, Poll};
5+
6+
/// Future for the `never_error` combinator, turning a `Future` into a `TryFuture`.
7+
///
8+
/// This is created by the `FutureExt::never_error` method.
9+
#[derive(Debug)]
10+
#[must_use = "futures do nothing unless polled"]
11+
pub struct NeverError<Fut> {
12+
future: Fut,
13+
}
14+
15+
impl<Fut> NeverError<Fut> {
16+
unsafe_pinned!(future: Fut);
17+
18+
/// Creates a new NeverError.
19+
pub(super) fn new(future: Fut) -> NeverError<Fut> {
20+
NeverError { future }
21+
}
22+
}
23+
24+
impl<Fut: Unpin> Unpin for NeverError<Fut> {}
25+
26+
impl<Fut, T> Future for NeverError<Fut>
27+
where Fut: Future<Output = T>,
28+
{
29+
type Output = Result<T, ()>;
30+
31+
fn poll(mut self: PinMut<Self>, cx: &mut task::Context) -> Poll<Result<T, ()>> {
32+
match self.future().poll(cx) {
33+
Poll::Pending => Poll::Pending,
34+
Poll::Ready(output) => {
35+
Poll::Ready(Ok(output))
36+
}
37+
}
38+
}
39+
}

futures-util/src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,9 @@ pub mod sink;
7575

7676
pub mod task;
7777

78+
#[cfg(feature = "compat")]
79+
pub mod compat;
80+
7881
if_std! {
7982
// FIXME: currently async/await is only available with std
8083
pub mod async_await;

futures-util/src/try_future/mod.rs

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,15 @@
66
use futures_core::future::TryFuture;
77
use futures_sink::Sink;
88

9+
#[cfg(feature = "compat")]
10+
use crate::compat::Compat;
11+
12+
#[cfg(feature = "compat")]
13+
use futures_core::task::Executor;
14+
15+
#[cfg(feature = "compat")]
16+
use core::marker::Unpin;
17+
918
/* TODO
1019
mod join;
1120
mod select;
@@ -477,6 +486,17 @@ pub trait TryFutureExt: TryFuture {
477486
UnwrapOrElse::new(self, f)
478487
}
479488

489+
/// Wraps a [`TryFuture`] into a future compatable with libraries using
490+
/// futures 0.1 future definitons. Requires the `compat` feature to enable.
491+
///
492+
#[cfg(feature = "compat")]
493+
fn compat<E>(self, executor: E) -> Compat<Self, E>
494+
where Self: Sized + Unpin,
495+
E: Executor,
496+
{
497+
Compat::new(self, Some(executor))
498+
}
499+
480500
/// Wraps a [`TryFuture`] into a type that implements
481501
/// [`Future`](std::future::Future).
482502
///
@@ -498,7 +518,7 @@ pub trait TryFutureExt: TryFuture {
498518
/// fn take_future(future: impl Future<Output = Result<T, E>>) { /* ... */ }
499519
///
500520
/// take_future(make_try_future().into_future());
501-
/// ```
521+
/// ```
502522
fn into_future(self) -> IntoFuture<Self>
503523
where Self: Sized,
504524
{

0 commit comments

Comments
 (0)