diff --git a/futures-core/Cargo.toml b/futures-core/Cargo.toml index b85c1fce23..5e125fa961 100644 --- a/futures-core/Cargo.toml +++ b/futures-core/Cargo.toml @@ -16,13 +16,12 @@ name = "futures_core" [features] default = ["std"] -std = ["alloc", "either/use_std"] +std = ["alloc"] nightly = [] cfg-target-has-atomic = [] alloc = [] [dependencies] -either = { version = "1.4", default-features = false, optional = true } [dev-dependencies] futures-preview = { path = "../futures", version = "=0.3.0-alpha.14" } diff --git a/futures-core/src/stream/mod.rs b/futures-core/src/stream/mod.rs index b47ba8cc43..69faa24b8e 100644 --- a/futures-core/src/stream/mod.rs +++ b/futures-core/src/stream/mod.rs @@ -4,9 +4,6 @@ use core::ops; use core::pin::Pin; use core::task::{Context, Poll}; -#[cfg(feature = "either")] -use either::Either; - mod stream_obj; pub use self::stream_obj::{StreamObj,LocalStreamObj,UnsafeStreamObj}; @@ -84,23 +81,6 @@ where } } -#[cfg(feature = "either")] -impl Stream for Either - where A: Stream, - B: Stream -{ - type Item = A::Item; - - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - unsafe { - match Pin::get_unchecked_mut(self) { - Either::Left(a) => Pin::new_unchecked(a).poll_next(cx), - Either::Right(b) => Pin::new_unchecked(b).poll_next(cx), - } - } - } -} - /// A `Stream` or `TryStream` which tracks whether or not the underlying stream /// should no longer be polled. /// diff --git a/futures-sink/Cargo.toml b/futures-sink/Cargo.toml index f778606b55..a9b196dcd7 100644 --- a/futures-sink/Cargo.toml +++ b/futures-sink/Cargo.toml @@ -15,12 +15,11 @@ The asynchronous `Sink` trait for the futures-rs library. name = "futures_sink" [features] -std = ["alloc", "either/use_std", "futures-core-preview/std", "futures-channel-preview/std"] +std = ["alloc", "futures-core-preview/std", "futures-channel-preview/std"] default = ["std"] nightly = ["futures-core-preview/nightly"] alloc = ["futures-core-preview/alloc"] [dependencies] -either = { version = "1.4", default-features = false, optional = true } futures-core-preview = { path = "../futures-core", version = "=0.3.0-alpha.14", default-features = false } futures-channel-preview = { path = "../futures-channel", version = "=0.3.0-alpha.14", default-features = false } diff --git a/futures-sink/src/lib.rs b/futures-sink/src/lib.rs index 16b8211aae..0b8ccc808d 100644 --- a/futures-sink/src/lib.rs +++ b/futures-sink/src/lib.rs @@ -229,49 +229,3 @@ mod if_alloc { #[cfg(feature = "alloc")] pub use self::if_alloc::*; - -#[cfg(feature = "either")] -use either::Either; -#[cfg(feature = "either")] -impl Sink for Either - where A: Sink, - B: Sink, -{ - type SinkError = A::SinkError; - - fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - unsafe { - match Pin::get_unchecked_mut(self) { - Either::Left(x) => Pin::new_unchecked(x).poll_ready(cx), - Either::Right(x) => Pin::new_unchecked(x).poll_ready(cx), - } - } - } - - fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::SinkError> { - unsafe { - match Pin::get_unchecked_mut(self) { - Either::Left(x) => Pin::new_unchecked(x).start_send(item), - Either::Right(x) => Pin::new_unchecked(x).start_send(item), - } - } - } - - fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - unsafe { - match Pin::get_unchecked_mut(self) { - Either::Left(x) => Pin::new_unchecked(x).poll_flush(cx), - Either::Right(x) => Pin::new_unchecked(x).poll_flush(cx), - } - } - } - - fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - unsafe { - match Pin::get_unchecked_mut(self) { - Either::Left(x) => Pin::new_unchecked(x).poll_close(cx), - Either::Right(x) => Pin::new_unchecked(x).poll_close(cx), - } - } - } -} diff --git a/futures-util/Cargo.toml b/futures-util/Cargo.toml index 67204cb1f4..7eb3799c0d 100644 --- a/futures-util/Cargo.toml +++ b/futures-util/Cargo.toml @@ -15,8 +15,8 @@ Common utilities and extension traits for the futures-rs library. name = "futures_util" [features] -std = ["alloc", "futures-core-preview/std", "futures-io-preview/std", "futures-sink-preview/std", "futures-select-macro-preview/std", "either/use_std", "rand", "rand_core", "slab"] -default = ["std", "futures-core-preview/either", "futures-sink-preview/either"] +std = ["alloc", "futures-core-preview/std", "futures-io-preview/std", "futures-sink-preview/std", "futures-select-macro-preview/std", "rand", "rand_core", "slab"] +default = ["std"] compat = ["std", "futures_01"] io-compat = ["compat", "tokio-io"] bench = [] @@ -31,7 +31,6 @@ futures-channel-preview = { path = "../futures-channel", version = "=0.3.0-alpha futures-io-preview = { path = "../futures-io", version = "=0.3.0-alpha.14", default-features = false } futures-sink-preview = { path = "../futures-sink", version = "=0.3.0-alpha.14", default-features = false} futures-select-macro-preview = { path = "../futures-select-macro", version = "=0.3.0-alpha.14", default-features = false } -either = { version = "1.4", default-features = false } proc-macro-hack = "0.5" proc-macro-nested = "0.1.2" rand = { version = "0.6.4", optional = true } diff --git a/futures-util/src/future/disabled/select.rs b/futures-util/src/future/disabled/select.rs deleted file mode 100644 index 120e17f2d3..0000000000 --- a/futures-util/src/future/disabled/select.rs +++ /dev/null @@ -1,36 +0,0 @@ -use futures_core::{Future, Poll, Async}; -use futures_core::task; - -use either::Either; - -/// Future for the [`select`](super::FutureExt::select) method. -#[must_use = "futures do nothing unless polled"] -#[derive(Debug)] -pub struct Select { - inner: Option<(A, B)>, -} - -pub fn new(a: A, b: B) -> Select { - Select { inner: Some((a, b)) } -} - -impl Future for Select where A: Future, B: Future { - type Item = Either<(A::Item, B), (B::Item, A)>; - type Error = Either<(A::Error, B), (B::Error, A)>; - - fn poll(&mut self, cx: &mut Context<'_>) -> Poll { - let (mut a, mut b) = self.inner.take().expect("cannot poll Select twice"); - match a.poll(cx) { - Err(e) => Err(Either::Left((e, b))), - Ok(Poll::Ready(x)) => Ok(Poll::Ready(Either::Left((x, b)))), - Ok(Poll::Pending) => match b.poll(cx) { - Err(e) => Err(Either::Right((e, a))), - Ok(Poll::Ready(x)) => Ok(Poll::Ready(Either::Right((x, a)))), - Ok(Poll::Pending) => { - self.inner = Some((a, b)); - Ok(Poll::Pending) - } - } - } - } -} diff --git a/futures-util/src/future/either.rs b/futures-util/src/future/either.rs new file mode 100644 index 0000000000..7ca917f5d8 --- /dev/null +++ b/futures-util/src/future/either.rs @@ -0,0 +1,127 @@ +use core::pin::Pin; +use core::task::{Context, Poll}; +use futures_core::future::Future; +use futures_core::stream::Stream; +use futures_sink::Sink; + +/// Combines two different futures, streams, or sinks having the same associated types into a single +/// type. +#[derive(Debug, Clone)] +pub enum Either { + /// First branch of the type + Left(A), + /// Second branch of the type + Right(B), +} + +impl Either<(T, A), (T, B)> { + /// Factor out a homogeneous type from an either of pairs. + /// + /// Here, the homogeneous type is the first element of the pairs. + pub fn factor_first(self) -> (T, Either) { + match self { + Either::Left((x, a)) => (x, Either::Left(a)), + Either::Right((x, b)) => (x, Either::Right(b)), + } + } +} + +impl Either<(A, T), (B, T)> { + /// Factor out a homogeneous type from an either of pairs. + /// + /// Here, the homogeneous type is the second element of the pairs. + pub fn factor_second(self) -> (Either, T) { + match self { + Either::Left((a, x)) => (Either::Left(a), x), + Either::Right((b, x)) => (Either::Right(b), x), + } + } +} + +impl Either { + /// Extract the value of an either over two equivalent types. + pub fn into_inner(self) -> T { + match self { + Either::Left(x) => x, + Either::Right(x) => x, + } + } +} + +impl Future for Either +where + A: Future, + B: Future, +{ + type Output = A::Output; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + unsafe { + match Pin::get_unchecked_mut(self) { + Either::Left(a) => Pin::new_unchecked(a).poll(cx), + Either::Right(b) => Pin::new_unchecked(b).poll(cx), + } + } + } +} + +impl Stream for Either +where + A: Stream, + B: Stream, +{ + type Item = A::Item; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + unsafe { + match Pin::get_unchecked_mut(self) { + Either::Left(a) => Pin::new_unchecked(a).poll_next(cx), + Either::Right(b) => Pin::new_unchecked(b).poll_next(cx), + } + } + } +} + +impl Sink for Either +where + A: Sink, + B: Sink, +{ + type SinkError = A::SinkError; + + fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + unsafe { + match Pin::get_unchecked_mut(self) { + Either::Left(x) => Pin::new_unchecked(x).poll_ready(cx), + Either::Right(x) => Pin::new_unchecked(x).poll_ready(cx), + } + } + } + + fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::SinkError> { + unsafe { + match Pin::get_unchecked_mut(self) { + Either::Left(x) => Pin::new_unchecked(x).start_send(item), + Either::Right(x) => Pin::new_unchecked(x).start_send(item), + } + } + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + unsafe { + match Pin::get_unchecked_mut(self) { + Either::Left(x) => Pin::new_unchecked(x).poll_flush(cx), + Either::Right(x) => Pin::new_unchecked(x).poll_flush(cx), + } + } + } + + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + unsafe { + match Pin::get_unchecked_mut(self) { + Either::Left(x) => Pin::new_unchecked(x).poll_close(cx), + Either::Right(x) => Pin::new_unchecked(x).poll_close(cx), + } + } + } +} diff --git a/futures-util/src/future/mod.rs b/futures-util/src/future/mod.rs index 0c735023b3..4a80ce2daa 100644 --- a/futures-util/src/future/mod.rs +++ b/futures-util/src/future/mod.rs @@ -57,9 +57,8 @@ pub use self::into_stream::IntoStream; mod map; pub use self::map::Map; -// Todo -// mod select; -// pub use self::select::Select; +mod select; +pub use self::select::{select, Select}; mod then; pub use self::then::Then; @@ -75,6 +74,9 @@ mod never_error; #[cfg(feature = "never-type")] pub use self::never_error::NeverError; +mod either; +pub use self::either::Either; + // Implementation details mod chain; pub(crate) use self::chain::Chain; @@ -179,52 +181,6 @@ pub trait FutureExt: Future { assert_future::(Then::new(self, f)) } - /* TODO - /// Waits for either one of two differently-typed futures to complete. - /// - /// This function will return a new future which awaits for either this or - /// the `other` future to complete. The returned future will finish with - /// both the value resolved and a future representing the completion of the - /// other work. - /// - /// Note that this function consumes the receiving futures and returns a - /// wrapped version of them. - /// - /// Also note that if both this and the second future have the same - /// success/error type you can use the `Either::split` method to - /// conveniently extract out the value at the end. - /// - /// # Examples - /// - /// ``` - /// use futures::future::{self, Either}; - /// - /// // A poor-man's join implemented on top of select - /// - /// fn join(a: A, b: B) -> Box> - /// where A: Future + 'static, - /// B: Future + 'static, - /// E: 'static, - /// { - /// Box::new(a.select(b).then(|res| -> Box> { - /// match res { - /// Ok(Either::Left((x, b))) => Box::new(b.map(move |y| (x, y))), - /// Ok(Either::Right((y, a))) => Box::new(a.map(move |x| (x, y))), - /// Err(Either::Left((e, _))) => Box::new(future::err(e)), - /// Err(Either::Right((e, _))) => Box::new(future::err(e)), - /// } - /// })) - /// } - /// ``` - fn select(self, other: B) -> Select - where B: IntoFuture, Self: Sized - { - select::new(self, other.into_future()) - } - */ - - /* ToDo: futures-core cannot implement Future for Either anymore because of - the orphan rule. Remove? Implement our own `Either`? /// Wrap this future in an `Either` future, making it the left-hand variant /// of that `Either`. /// @@ -234,16 +190,19 @@ pub trait FutureExt: Future { /// # Examples /// /// ``` - /// use futures::executor::block_on; + /// #![feature(async_await, await_macro, futures_api)] + /// # futures::executor::block_on(async { + /// use futures::future::{self, FutureExt}; /// /// let x = 6; /// let future = if x < 10 { - /// ready(true).left_future() + /// future::ready(true).left_future() /// } else { - /// ready(false).right_future() + /// future::ready(false).right_future() /// }; /// - /// assert_eq!(true, block_on(future)); + /// assert_eq!(await!(future), true); + /// # }); /// ``` fn left_future(self) -> Either where B: Future, @@ -261,23 +220,26 @@ pub trait FutureExt: Future { /// # Examples /// /// ``` - /// use futures::executor::block_on; + /// #![feature(async_await, await_macro, futures_api)] + /// # futures::executor::block_on(async { + /// use futures::future::{self, FutureExt}; /// /// let x = 6; - /// let future = if x < 10 { - /// ready(true).left_future() + /// let future = if x > 10 { + /// future::ready(true).left_future() /// } else { - /// ready(false).right_future() + /// future::ready(false).right_future() /// }; /// - /// assert_eq!(false, block_on(future)); + /// assert_eq!(await!(future), false); + /// # }); /// ``` fn right_future(self) -> Either where A: Future, Self: Sized, { Either::Right(self) - }*/ + } /// Convert this future into a single element stream. /// diff --git a/futures-util/src/future/select.rs b/futures-util/src/future/select.rs new file mode 100644 index 0000000000..09fa27206c --- /dev/null +++ b/futures-util/src/future/select.rs @@ -0,0 +1,70 @@ +use core::pin::Pin; +use futures_core::future::Future; +use futures_core::task::{Context, Poll}; +use crate::future::Either; + +/// Future for the [`select`](super::FutureExt::select) function. +#[must_use = "futures do nothing unless polled"] +#[derive(Debug)] +pub struct Select { + inner: Option<(A, B)>, +} + +impl Unpin for Select {} + +/// Waits for either one of two differently-typed futures to complete. +/// +/// This function will return a new future which awaits for either one of both +/// futures to complete. The returned future will finish with both the value +/// resolved and a future representing the completion of the other work. +/// +/// Note that this function consumes the receiving futures and returns a +/// wrapped version of them. +/// +/// Also note that if both this and the second future have the same +/// output type you can use the `Either::factor_first` method to +/// conveniently extract out the value at the end. +/// +/// # Examples +/// +/// ``` +/// #![feature(futures_api)] +/// use futures::future::{self, Either, Future, FutureExt}; +/// +/// // A poor-man's join implemented on top of select +/// +/// fn join(a: A, b: B) -> impl Future +/// where A: Future + Unpin, +/// B: Future + Unpin, +/// { +/// future::select(a, b).then(|either| { +/// match either { +/// Either::Left((x, b)) => b.map(move |y| (x, y)).left_future(), +/// Either::Right((y, a)) => a.map(move |x| (x, y)).right_future(), +/// } +/// }) +/// } +/// ``` +pub fn select(future1: A, future2: B) -> Select + where A: Future + Unpin, B: Future + Unpin +{ + Select { inner: Some((future1, future2)) } +} + +impl Future for Select where A: Future, B: Future { + type Output = Either<(A::Output, B), (B::Output, A)>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let (mut a, mut b) = self.inner.take().expect("cannot poll Select twice"); + match Pin::new(&mut a).poll(cx) { + Poll::Ready(x) => Poll::Ready(Either::Left((x, b))), + Poll::Pending => match Pin::new(&mut b).poll(cx) { + Poll::Ready(x) => Poll::Ready(Either::Right((x, a))), + Poll::Pending => { + self.inner = Some((a, b)); + Poll::Pending + } + } + } + } +} diff --git a/futures-util/src/sink/mod.rs b/futures-util/src/sink/mod.rs index b7092728c9..bd77591200 100644 --- a/futures-util/src/sink/mod.rs +++ b/futures-util/src/sink/mod.rs @@ -3,10 +3,10 @@ //! This module contains a number of functions for working with `Sink`s, //! including the `SinkExt` trait which adds methods to `Sink` types. -use either::Either; use futures_core::future::Future; use futures_core::stream::Stream; use futures_sink::Sink; +use crate::future::Either; #[cfg(feature = "compat")] use crate::compat::CompatSink; diff --git a/futures-util/src/stream/mod.rs b/futures-util/src/stream/mod.rs index 4622597429..24d1437590 100644 --- a/futures-util/src/stream/mod.rs +++ b/futures-util/src/stream/mod.rs @@ -4,13 +4,13 @@ //! including the `StreamExt` trait which adds methods to `Stream` types. use core::pin::Pin; -use either::Either; use futures_core::future::Future; use futures_core::stream::{FusedStream, Stream, TryStream}; use futures_core::task::{Context, Poll}; use futures_sink::Sink; #[cfg(feature = "alloc")] use alloc::boxed::Box; +use crate::future::Either; mod iter; pub use self::iter::{iter, Iter}; diff --git a/futures/src/lib.rs b/futures/src/lib.rs index 790e7a9f69..bfd61da7ec 100644 --- a/futures/src/lib.rs +++ b/futures/src/lib.rs @@ -195,8 +195,10 @@ pub mod future { maybe_done, MaybeDone, poll_fn, PollFn, ready, ok, err, Ready, + select, Select, join, join3, join4, join5, Join, Join3, Join4, Join5, + Either, OptionFuture,