Skip to content

Commit a15185d

Browse files
committed
finished executors
1 parent 5543083 commit a15185d

File tree

2 files changed

+56
-30
lines changed

2 files changed

+56
-30
lines changed

futures-util/src/compat/executor.rs

Lines changed: 48 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,47 +1,70 @@
11

2-
pub trait ExecCompat: Executor01<
3-
Compat<FutureObj<'static, ()>, BoxedExecutor>
4-
> + Clone + Send + 'static
2+
use futures::future::Executor as Executor01;
3+
4+
use futures_core::task::Executor as Executor03;
5+
use futures_core::task as task03;
6+
use futures_core::future::FutureObj;
7+
8+
use super::Compat;
9+
use crate::{TryFutureExt, FutureExt, future::NeverError};
10+
11+
pub struct BoxedExecutor(Box<dyn Executor03 + Send>);
12+
13+
impl Executor03 for BoxedExecutor {
14+
fn spawn_obj(&mut self, future: FutureObj<'static, ()>) -> Result<(), task03::SpawnObjError> {
15+
(&mut *self.0).spawn_obj(future)
16+
}
17+
}
18+
19+
/// A future that can run on a futures 0.1 executor.
20+
pub type ExecutorFuture01 = Compat<NeverError<FutureObj<'static, ()>>, BoxedExecutor>;
21+
22+
/// Extension trait for futures 0.1 Executors.
23+
pub trait Executor01CompatExt: Executor01<ExecutorFuture01>
24+
+ Clone + Send + 'static
525
{
6-
fn compat(self) -> ExecutorCompat<Self>
26+
/// Creates an exector compatable with futures 0.3.
27+
fn compat(self) -> CompatExecutor<Self>
728
where Self: Sized;
829
}
930

10-
impl<E> ExecCompat for E
11-
where E: Executor01<
12-
Compat<FutureObj<'static, ()>, BoxedExecutor>
13-
>,
31+
impl<E> Executor01CompatExt for E
32+
where E: Executor01<ExecutorFuture01>,
1433
E: Clone + Send + 'static
1534
{
16-
fn compat(self) -> ExecutorCompat<Self> {
17-
ExecutorCompat {
35+
fn compat(self) -> CompatExecutor<Self> {
36+
CompatExecutor {
1837
exec: self,
1938
}
2039
}
2140
}
2241

42+
/// Converts `futures 0.1` Executors into `futures 0.3` Executors
2343
#[derive(Clone)]
24-
pub struct ExecutorCompat<E> {
44+
pub struct CompatExecutor<E> {
2545
exec: E
2646
}
2747

28-
impl<E> Executor03 for ExecutorCompat<E>
29-
where E: Executor01<
30-
Compat<FutureObj<'static, ()>, Box<Executor03>>
31-
>,
48+
impl<E> Executor03 for CompatExecutor<E>
49+
where E: Executor01<ExecutorFuture01>,
3250
E: Clone + Send + 'static,
3351
{
34-
fn spawn_obj(&mut self, obj: FutureObj<'static, ()>) -> Result<(), task::SpawnObjError> {
52+
fn spawn_obj(
53+
&mut self,
54+
future: FutureObj<'static, ()>,
55+
) -> Result<(), task03::SpawnObjError> {
3556

36-
self.exec.execute(obj.compat(Box::new(self.clone())))
37-
.map_err(|exec_err| {
38-
use futures_core::task::{SpawnObjError, SpawnErrorKind};
57+
let fut = future.never_error().compat(BoxedExecutor(Box::new(self.clone())));
58+
59+
self.exec.execute(fut)
60+
.map_err(|exec_err| {
61+
use futures_core::task::{SpawnObjError, SpawnErrorKind};
3962

40-
let fut = exec_err.into_future().compat().map(|_| ());
41-
SpawnObjError {
42-
kind: SpawnErrorKind::shutdown(),
43-
task: Box::new(fut).into(),
44-
}
45-
})
63+
let fut = exec_err.into_future().into_inner().unwrap_or_else(|_| ());
64+
SpawnObjError {
65+
kind: SpawnErrorKind::shutdown(),
66+
task: Box::new(fut).into(),
67+
}
68+
})
4669
}
4770
}

futures-util/src/compat/mod.rs

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,14 +12,17 @@ use futures::executor::{with_notify, NotifyHandle, Notify, UnsafeNotify};
1212
use futures_core::Future as Future03;
1313
use futures_core::TryFuture as TryFuture03;
1414
use futures_core::Poll as Poll03;
15-
use futures_core::task;
15+
use futures_core::task as task03;
1616
use futures_core::task::Executor as Executor03;
1717
use futures_core::task::{Wake, Waker, LocalWaker, local_waker_from_nonlocal};
1818

19-
use core::mem::PinMut;
20-
use core::marker::Unpin;
19+
use std::mem::PinMut;
20+
use std::marker::Unpin;
2121
use std::sync::Arc;
2222

23+
mod executor;
24+
pub use self::executor::{Executor01CompatExt, ExecutorFuture01, CompatExecutor};
25+
2326
/// Converts a futures 0.3 `TryFuture` into a futures 0.1 `Future`
2427
/// and vice versa.
2528
#[derive(Debug)]
@@ -47,7 +50,7 @@ impl<Fut, E> Compat<Fut, E> {
4750
impl<T> Future03 for Compat<T, ()> where T: Future01 {
4851
type Output = Result<T::Item, T::Error>;
4952

50-
fn poll(self: PinMut<Self>, cx: &mut task::Context) -> Poll03<Self::Output> {
53+
fn poll(self: PinMut<Self>, cx: &mut task03::Context) -> Poll03<Self::Output> {
5154
use futures::Async;
5255

5356
let notify = &WakerToHandle(cx.waker());
@@ -109,7 +112,7 @@ impl<T, E> Future01 for Compat<T, E> where T: TryFuture03 + Unpin,
109112
use futures::Async;
110113

111114
let waker = current_as_waker();
112-
let mut cx = task::Context::new(&waker, self.executor.as_mut().unwrap());
115+
let mut cx = task03::Context::new(&waker, self.executor.as_mut().unwrap());
113116
match PinMut::new(&mut self.inner).try_poll(&mut cx) {
114117
Poll03::Ready(Ok(t)) => Ok(Async::Ready(t)),
115118
Poll03::Pending => Ok(Async::NotReady),

0 commit comments

Comments
 (0)