Skip to content

Commit 69fb4b1

Browse files
Refactor compatibility layer
1 parent c84b5a6 commit 69fb4b1

File tree

7 files changed

+186
-178
lines changed

7 files changed

+186
-178
lines changed

futures-util/src/compat/compat.rs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
/// Converts a futures 0.3 `TryFuture` into a futures 0.1 `Future`
2+
/// and vice versa.
3+
#[derive(Debug)]
4+
#[must_use = "futures do nothing unless polled"]
5+
pub struct Compat<Fut, Ex> {
6+
crate future: Fut,
7+
crate executor: Option<Ex>,
8+
}
9+
10+
impl<Fut, Ex> Compat<Fut, Ex> {
11+
/// Returns the inner future.
12+
pub fn into_inner(self) -> Fut {
13+
self.future
14+
}
15+
16+
/// Creates a new `Compat`.
17+
crate fn new(future: Fut, executor: Option<Ex>) -> Compat<Fut, Ex> {
18+
Compat { future, executor }
19+
}
20+
}
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
use super::Compat;
2+
use futures::Async as Async01;
3+
use futures::Future as Future01;
4+
use futures::executor::{self as executor01, NotifyHandle as NotifyHandle01,
5+
Notify as Notify01, UnsafeNotify as UnsafeNotify01};
6+
use futures_core::Future as Future03;
7+
use futures_core::task as task03;
8+
use std::mem::PinMut;
9+
10+
impl<Fut: Future01> Future03 for Compat<Fut, ()> {
11+
type Output = Result<Fut::Item, Fut::Error>;
12+
13+
fn poll(
14+
self: PinMut<Self>,
15+
cx: &mut task03::Context
16+
) -> task03::Poll<Self::Output> {
17+
let notify = &WakerToHandle(cx.waker());
18+
19+
executor01::with_notify(notify, 0, move || {
20+
unsafe {
21+
match PinMut::get_mut_unchecked(self).future.poll() {
22+
Ok(Async01::Ready(t)) => task03::Poll::Ready(Ok(t)),
23+
Ok(Async01::NotReady) => task03::Poll::Pending,
24+
Err(e) => task03::Poll::Ready(Err(e)),
25+
}
26+
}
27+
})
28+
}
29+
}
30+
31+
struct NotifyWaker(task03::Waker);
32+
33+
#[derive(Clone)]
34+
struct WakerToHandle<'a>(&'a task03::Waker);
35+
36+
impl<'a> From<WakerToHandle<'a>> for NotifyHandle01 {
37+
fn from(handle: WakerToHandle<'a>) -> NotifyHandle01 {
38+
let ptr = Box::new(NotifyWaker(handle.0.clone()));
39+
40+
unsafe {
41+
NotifyHandle01::new(Box::into_raw(ptr))
42+
}
43+
}
44+
}
45+
46+
impl Notify01 for NotifyWaker {
47+
fn notify(&self, _: usize) {
48+
self.0.wake();
49+
}
50+
}
51+
52+
unsafe impl UnsafeNotify01 for NotifyWaker {
53+
unsafe fn clone_raw(&self) -> NotifyHandle01 {
54+
WakerToHandle(&self.0).into()
55+
}
56+
57+
unsafe fn drop_raw(&self) {
58+
let ptr: *const dyn UnsafeNotify01 = self;
59+
drop(Box::from_raw(ptr as *mut dyn UnsafeNotify01));
60+
}
61+
}
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
use super::Compat;
2+
use futures::Future as Future01;
3+
use futures::Poll as Poll01;
4+
use futures::task as task01;
5+
use futures::Async as Async01;
6+
use futures_core::TryFuture as TryFuture03;
7+
use futures_core::task as task03;
8+
use std::marker::Unpin;
9+
use std::mem::PinMut;
10+
use std::sync::Arc;
11+
12+
impl<Fut, Ex> Future01 for Compat<Fut, Ex>
13+
where Fut: TryFuture03 + Unpin,
14+
Ex: task03::Executor
15+
{
16+
type Item = Fut::Ok;
17+
type Error = Fut::Error;
18+
19+
fn poll(&mut self) -> Poll01<Self::Item, Self::Error> {
20+
let waker = current_as_waker();
21+
let mut cx = task03::Context::new(&waker, self.executor.as_mut().unwrap());
22+
match PinMut::new(&mut self.future).try_poll(&mut cx) {
23+
task03::Poll::Ready(Ok(t)) => Ok(Async01::Ready(t)),
24+
task03::Poll::Pending => Ok(Async01::NotReady),
25+
task03::Poll::Ready(Err(e)) => Err(e),
26+
}
27+
}
28+
}
29+
30+
fn current_as_waker() -> task03::LocalWaker {
31+
let arc_waker = Arc::new(Current(task01::current()));
32+
task03::local_waker_from_nonlocal(arc_waker)
33+
}
34+
35+
struct Current(task01::Task);
36+
37+
impl task03::Wake for Current {
38+
fn wake(arc_self: &Arc<Self>) {
39+
arc_self.0.notify();
40+
}
41+
}

futures-util/src/compat/executor.rs

Lines changed: 34 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,70 +1,71 @@
11

2+
use super::Compat;
3+
use crate::{TryFutureExt, FutureExt, future::NeverError};
24
use futures::future::Executor as Executor01;
3-
45
use futures_core::task::Executor as Executor03;
56
use futures_core::task as task03;
67
use futures_core::future::FutureObj;
78

8-
use super::Compat;
9-
use crate::{TryFutureExt, FutureExt, future::NeverError};
10-
11-
pub struct BoxedExecutor(Box<dyn Executor03 + Send>);
9+
pub struct BoxedExecutor03(Box<dyn Executor03 + Send>);
1210

13-
impl Executor03 for BoxedExecutor {
14-
fn spawn_obj(&mut self, future: FutureObj<'static, ()>) -> Result<(), task03::SpawnObjError> {
11+
impl Executor03 for BoxedExecutor03 {
12+
fn spawn_obj(
13+
&mut self,
14+
future: FutureObj<'static, ()>,
15+
) -> Result<(), task03::SpawnObjError> {
1516
(&mut *self.0).spawn_obj(future)
1617
}
1718
}
1819

1920
/// A future that can run on a futures 0.1 executor.
20-
pub type ExecutorFuture01 = Compat<NeverError<FutureObj<'static, ()>>, BoxedExecutor>;
21+
pub type Executor01Future = Compat<NeverError<FutureObj<'static, ()>>, BoxedExecutor03>;
2122

2223
/// Extension trait for futures 0.1 Executors.
23-
pub trait Executor01CompatExt: Executor01<ExecutorFuture01>
24-
+ Clone + Send + 'static
24+
pub trait Executor01CompatExt: Executor01<Executor01Future> +
25+
Clone + Send + 'static
2526
{
2627
/// Creates an `Executor` compatable with futures 0.3.
27-
fn compat(self) -> CompatExecutor<Self>
28+
fn compat(self) -> Executor01As03<Self>
2829
where Self: Sized;
2930
}
3031

31-
impl<E> Executor01CompatExt for E
32-
where E: Executor01<ExecutorFuture01>,
33-
E: Clone + Send + 'static
32+
impl<Ex> Executor01CompatExt for Ex
33+
where Ex: Executor01<Executor01Future> + Clone + Send + 'static
3434
{
35-
fn compat(self) -> CompatExecutor<Self> {
36-
CompatExecutor {
37-
exec: self,
35+
fn compat(self) -> Executor01As03<Self> {
36+
Executor01As03 {
37+
executor01: self,
3838
}
3939
}
4040
}
4141

4242
/// Converts a futures 0.1 `Executor` into a futures 0.3 `Executor`.
4343
#[derive(Clone)]
44-
pub struct CompatExecutor<E> {
45-
exec: E
44+
pub struct Executor01As03<Ex> {
45+
executor01: Ex
4646
}
4747

48-
impl<E> Executor03 for CompatExecutor<E>
49-
where E: Executor01<ExecutorFuture01>,
50-
E: Clone + Send + 'static,
48+
impl<Ex> Executor03 for Executor01As03<Ex>
49+
where Ex: Executor01<Executor01Future>,
50+
Ex: Clone + Send + 'static,
5151
{
5252
fn spawn_obj(
53-
&mut self,
53+
&mut self,
5454
future: FutureObj<'static, ()>,
5555
) -> Result<(), task03::SpawnObjError> {
56-
57-
let fut = future.never_error().compat(BoxedExecutor(Box::new(self.clone())));
56+
let future = future.never_error().compat(BoxedExecutor03(Box::new(self.clone())));
5857

59-
self.exec.execute(fut)
60-
.map_err(|exec_err| {
58+
match self.executor01.execute(future) {
59+
Ok(()) => Ok(()),
60+
Err(err) => {
6161
use futures_core::task::{SpawnObjError, SpawnErrorKind};
62-
63-
let fut = exec_err.into_future().into_inner().unwrap_or_else(|_| ());
64-
SpawnObjError {
62+
63+
let fut = err.into_future().into_inner().unwrap_or_else(|_| ());
64+
Err(SpawnObjError {
6565
kind: SpawnErrorKind::shutdown(),
66-
task: Box::new(fut).into(),
67-
}
68-
})
66+
future: Box::new(fut).into(),
67+
})
68+
}
69+
}
6970
}
7071
}
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
use super::Compat;
2+
use futures::Future as Future01;
3+
4+
impl<Fut: Future01> Future01CompatExt for Fut {}
5+
6+
/// Extension trait for futures 0.1 Futures.
7+
pub trait Future01CompatExt: Future01 {
8+
/// Converts a futures 0.1 `Future<Item = T, Error = E>` into a
9+
/// futures 0.3 `Future<Output = Result<T, E>>`.
10+
fn compat(self) -> Compat<Self, ()> where Self: Sized {
11+
Compat {
12+
future: self,
13+
executor: None,
14+
}
15+
}
16+
}
17+
18+

0 commit comments

Comments
 (0)