diff --git a/futures-util/Cargo.toml b/futures-util/Cargo.toml index ff00e43996..d9101ab0b3 100644 --- a/futures-util/Cargo.toml +++ b/futures-util/Cargo.toml @@ -19,6 +19,7 @@ name = "futures_util" [features] std = ["futures-core-preview/std", "futures-io-preview/std", "futures-sink-preview/std", "either/use_std", "slab"] default = ["std", "futures-core-preview/either", "futures-sink-preview/either"] +compat = ["std", "futures"] bench = [] nightly = [] @@ -29,6 +30,7 @@ futures-io-preview = { path = "../futures-io", version = "0.3.0-alpha.2", defaul futures-sink-preview = { path = "../futures-sink", version = "0.3.0-alpha.2", default-features = false} either = { version = "1.4", default-features = false } slab = { version = "0.4", optional = true } +futures = { version = "0.1", optional = true } [dev-dependencies] futures-preview = { path = "../futures", version = "0.3.0-alpha.2" } diff --git a/futures-util/src/compat/compat.rs b/futures-util/src/compat/compat.rs new file mode 100644 index 0000000000..77943837fc --- /dev/null +++ b/futures-util/src/compat/compat.rs @@ -0,0 +1,20 @@ +/// Converts a futures 0.3 `TryFuture` into a futures 0.1 `Future` +/// and vice versa. +#[derive(Debug)] +#[must_use = "futures do nothing unless polled"] +pub struct Compat { + crate future: Fut, + crate executor: Option, +} + +impl Compat { + /// Returns the inner future. + pub fn into_inner(self) -> Fut { + self.future + } + + /// Creates a new `Compat`. + crate fn new(future: Fut, executor: Option) -> Compat { + Compat { future, executor } + } +} diff --git a/futures-util/src/compat/compat01to03.rs b/futures-util/src/compat/compat01to03.rs new file mode 100644 index 0000000000..626824c753 --- /dev/null +++ b/futures-util/src/compat/compat01to03.rs @@ -0,0 +1,61 @@ +use super::Compat; +use futures::Async as Async01; +use futures::Future as Future01; +use futures::executor::{self as executor01, NotifyHandle as NotifyHandle01, + Notify as Notify01, UnsafeNotify as UnsafeNotify01}; +use futures_core::Future as Future03; +use futures_core::task as task03; +use std::mem::PinMut; + +impl Future03 for Compat { + type Output = Result; + + fn poll( + self: PinMut, + cx: &mut task03::Context + ) -> task03::Poll { + let notify = &WakerToHandle(cx.waker()); + + executor01::with_notify(notify, 0, move || { + unsafe { + match PinMut::get_mut_unchecked(self).future.poll() { + Ok(Async01::Ready(t)) => task03::Poll::Ready(Ok(t)), + Ok(Async01::NotReady) => task03::Poll::Pending, + Err(e) => task03::Poll::Ready(Err(e)), + } + } + }) + } +} + +struct NotifyWaker(task03::Waker); + +#[derive(Clone)] +struct WakerToHandle<'a>(&'a task03::Waker); + +impl<'a> From> for NotifyHandle01 { + fn from(handle: WakerToHandle<'a>) -> NotifyHandle01 { + let ptr = Box::new(NotifyWaker(handle.0.clone())); + + unsafe { + NotifyHandle01::new(Box::into_raw(ptr)) + } + } +} + +impl Notify01 for NotifyWaker { + fn notify(&self, _: usize) { + self.0.wake(); + } +} + +unsafe impl UnsafeNotify01 for NotifyWaker { + unsafe fn clone_raw(&self) -> NotifyHandle01 { + WakerToHandle(&self.0).into() + } + + unsafe fn drop_raw(&self) { + let ptr: *const dyn UnsafeNotify01 = self; + drop(Box::from_raw(ptr as *mut dyn UnsafeNotify01)); + } +} diff --git a/futures-util/src/compat/compat03to01.rs b/futures-util/src/compat/compat03to01.rs new file mode 100644 index 0000000000..da8c261266 --- /dev/null +++ b/futures-util/src/compat/compat03to01.rs @@ -0,0 +1,41 @@ +use super::Compat; +use futures::Future as Future01; +use futures::Poll as Poll01; +use futures::task as task01; +use futures::Async as Async01; +use futures_core::TryFuture as TryFuture03; +use futures_core::task as task03; +use std::marker::Unpin; +use std::mem::PinMut; +use std::sync::Arc; + +impl Future01 for Compat +where Fut: TryFuture03 + Unpin, + Ex: task03::Executor +{ + type Item = Fut::Ok; + type Error = Fut::Error; + + fn poll(&mut self) -> Poll01 { + let waker = current_as_waker(); + let mut cx = task03::Context::new(&waker, self.executor.as_mut().unwrap()); + match PinMut::new(&mut self.future).try_poll(&mut cx) { + task03::Poll::Ready(Ok(t)) => Ok(Async01::Ready(t)), + task03::Poll::Pending => Ok(Async01::NotReady), + task03::Poll::Ready(Err(e)) => Err(e), + } + } +} + +fn current_as_waker() -> task03::LocalWaker { + let arc_waker = Arc::new(Current(task01::current())); + task03::local_waker_from_nonlocal(arc_waker) +} + +struct Current(task01::Task); + +impl task03::Wake for Current { + fn wake(arc_self: &Arc) { + arc_self.0.notify(); + } +} diff --git a/futures-util/src/compat/executor.rs b/futures-util/src/compat/executor.rs new file mode 100644 index 0000000000..377d7be96b --- /dev/null +++ b/futures-util/src/compat/executor.rs @@ -0,0 +1,71 @@ + +use super::Compat; +use crate::{TryFutureExt, FutureExt, future::UnitError}; +use futures::future::Executor as Executor01; +use futures_core::task::Executor as Executor03; +use futures_core::task as task03; +use futures_core::future::FutureObj; + +pub struct BoxedExecutor03(Box); + +impl Executor03 for BoxedExecutor03 { + fn spawn_obj( + &mut self, + future: FutureObj<'static, ()>, + ) -> Result<(), task03::SpawnObjError> { + (&mut *self.0).spawn_obj(future) + } +} + +/// A future that can run on a futures 0.1 executor. +pub type Executor01Future = Compat>, BoxedExecutor03>; + +/// Extension trait for futures 0.1 Executors. +pub trait Executor01CompatExt: Executor01 + + Clone + Send + 'static +{ + /// Creates an `Executor` compatable with futures 0.3. + fn compat(self) -> Executor01As03 + where Self: Sized; +} + +impl Executor01CompatExt for Ex +where Ex: Executor01 + Clone + Send + 'static +{ + fn compat(self) -> Executor01As03 { + Executor01As03 { + executor01: self, + } + } +} + +/// Converts a futures 0.1 `Executor` into a futures 0.3 `Executor`. +#[derive(Clone)] +pub struct Executor01As03 { + executor01: Ex +} + +impl Executor03 for Executor01As03 +where Ex: Executor01, + Ex: Clone + Send + 'static, +{ + fn spawn_obj( + &mut self, + future: FutureObj<'static, ()>, + ) -> Result<(), task03::SpawnObjError> { + let future = future.unit_error().compat(BoxedExecutor03(Box::new(self.clone()))); + + match self.executor01.execute(future) { + Ok(()) => Ok(()), + Err(err) => { + use futures_core::task::{SpawnObjError, SpawnErrorKind}; + + let fut = err.into_future().into_inner().unwrap_or_else(|_| ()); + Err(SpawnObjError { + kind: SpawnErrorKind::shutdown(), + future: Box::new(fut).into(), + }) + } + } + } +} diff --git a/futures-util/src/compat/future01ext.rs b/futures-util/src/compat/future01ext.rs new file mode 100644 index 0000000000..01fb4f114b --- /dev/null +++ b/futures-util/src/compat/future01ext.rs @@ -0,0 +1,18 @@ +use super::Compat; +use futures::Future as Future01; + +impl Future01CompatExt for Fut {} + +/// Extension trait for futures 0.1 Futures. +pub trait Future01CompatExt: Future01 { + /// Converts a futures 0.1 `Future` into a + /// futures 0.3 `Future>`. + fn compat(self) -> Compat where Self: Sized { + Compat { + future: self, + executor: None, + } + } +} + + diff --git a/futures-util/src/compat/mod.rs b/futures-util/src/compat/mod.rs new file mode 100644 index 0000000000..69edd02614 --- /dev/null +++ b/futures-util/src/compat/mod.rs @@ -0,0 +1,15 @@ +//! Futures 0.1 / 0.3 shims + +#![allow(missing_debug_implementations)] + +mod executor; +pub use self::executor::{Executor01CompatExt, Executor01Future, Executor01As03}; + +mod compat; +pub use self::compat::Compat; + +mod compat01to03; +mod compat03to01; + +mod future01ext; +pub use self::future01ext::Future01CompatExt; diff --git a/futures-util/src/future/mod.rs b/futures-util/src/future/mod.rs index 6ef478c9e7..1ba7279673 100644 --- a/futures-util/src/future/mod.rs +++ b/futures-util/src/future/mod.rs @@ -57,6 +57,9 @@ pub use self::then::Then; mod inspect; pub use self::inspect::Inspect; +mod unit_error; +pub use self::unit_error::UnitError; + mod with_executor; pub use self::with_executor::WithExecutor; @@ -65,6 +68,8 @@ mod chain; crate use self::chain::Chain; if_std! { + use std::boxed::PinBox; + mod abortable; pub use self::abortable::{abortable, Abortable, AbortHandle, AbortRegistration, Aborted}; @@ -632,6 +637,21 @@ pub trait FutureExt: Future { Shared::new(self) } + /// Wrap the future in a Box, pinning it. + #[cfg(feature = "std")] + fn boxed(self) -> PinBox + where Self: Sized + { + PinBox::new(self) + } + + /// Turns a `Future` into a `TryFuture` with `Error = ()`. + fn unit_error(self) -> UnitError + where Self: Sized + { + UnitError::new(self) + } + /// Assigns the provided `Executor` to be used when spawning tasks /// from within the future. /// diff --git a/futures-util/src/future/unit_error.rs b/futures-util/src/future/unit_error.rs new file mode 100644 index 0000000000..608a044ac9 --- /dev/null +++ b/futures-util/src/future/unit_error.rs @@ -0,0 +1,34 @@ +use core::marker::Unpin; +use core::mem::PinMut; +use futures_core::future::Future; +use futures_core::task::{self, Poll}; + +/// Future for the `unit_error` combinator, turning a `Future` into a `TryFuture`. +/// +/// This is created by the `FutureExt::unit_error` method. +#[derive(Debug)] +#[must_use = "futures do nothing unless polled"] +pub struct UnitError { + future: Fut, +} + +impl UnitError { + unsafe_pinned!(future: Fut); + + /// Creates a new UnitError. + pub(super) fn new(future: Fut) -> UnitError { + UnitError { future } + } +} + +impl Unpin for UnitError {} + +impl Future for UnitError + where Fut: Future, +{ + type Output = Result; + + fn poll(mut self: PinMut, cx: &mut task::Context) -> Poll> { + self.future().poll(cx).map(Ok) + } +} diff --git a/futures-util/src/lib.rs b/futures-util/src/lib.rs index 1e67a13630..7a680c0361 100644 --- a/futures-util/src/lib.rs +++ b/futures-util/src/lib.rs @@ -75,6 +75,9 @@ pub mod sink; pub mod task; +#[cfg(feature = "compat")] +pub mod compat; + if_std! { // FIXME: currently async/await is only available with std pub mod async_await; diff --git a/futures-util/src/try_future/mod.rs b/futures-util/src/try_future/mod.rs index 234f57ad0e..16d6ea7ea5 100644 --- a/futures-util/src/try_future/mod.rs +++ b/futures-util/src/try_future/mod.rs @@ -6,6 +6,15 @@ use futures_core::future::TryFuture; use futures_sink::Sink; +#[cfg(feature = "compat")] +use crate::compat::Compat; + +#[cfg(feature = "compat")] +use futures_core::task::Executor; + +#[cfg(feature = "compat")] +use core::marker::Unpin; + /* TODO mod join; mod select; @@ -477,6 +486,17 @@ pub trait TryFutureExt: TryFuture { UnwrapOrElse::new(self, f) } + /// Wraps a [`TryFuture`] into a future compatable with libraries using + /// futures 0.1 future definitons. Requires the `compat` feature to enable. + /// + #[cfg(feature = "compat")] + fn compat(self, executor: E) -> Compat + where Self: Sized + Unpin, + E: Executor, + { + Compat::new(self, Some(executor)) + } + /// Wraps a [`TryFuture`] into a type that implements /// [`Future`](std::future::Future). /// @@ -498,7 +518,7 @@ pub trait TryFutureExt: TryFuture { /// fn take_future(future: impl Future>) { /* ... */ } /// /// take_future(make_try_future().into_future()); - /// ``` + /// ``` fn into_future(self) -> IntoFuture where Self: Sized, { diff --git a/futures/Cargo.toml b/futures/Cargo.toml index 052ba64b4c..d004741863 100644 --- a/futures/Cargo.toml +++ b/futures/Cargo.toml @@ -36,3 +36,4 @@ futures-util-preview = { path = "../futures-util", version = "0.3.0-alpha.2", de nightly = ["futures-util-preview/nightly"] std = ["futures-core-preview/std", "futures-executor-preview/std", "futures-io-preview/std", "futures-sink-preview/std", "futures-util-preview/std"] default = ["std"] +compat = ["std", "futures-util-preview/compat"] diff --git a/futures/src/lib.rs b/futures/src/lib.rs index ed06a2e2b2..291426a397 100644 --- a/futures/src/lib.rs +++ b/futures/src/lib.rs @@ -79,6 +79,20 @@ pub mod channel { pub use futures_channel::{oneshot, mpsc}; } +#[cfg(feature = "compat")] +pub mod compat { + //! Interop between `futures` 0.1 and 0.3. + //! + + pub use futures_util::compat::{ + Compat, + Executor01Future, + Executor01As03, + Executor01CompatExt, + Future01CompatExt, + }; +} + #[cfg(feature = "std")] pub mod executor { //! Task execution.