Skip to content
This repository was archived by the owner on Oct 30, 2019. It is now read-only.

Update for futures v0.3.0-alpha.15 #18

Merged
merged 1 commit into from
Apr 29, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,15 @@ categories = ["asynchronous", "network-programming", "filesystem", "concurrency"
edition = "2018"

[dependencies]
futures-preview = { version = "0.3.0-alpha.14" }
futures-preview = { version = "0.3.0-alpha.15" }
runtime-attributes = { path = "runtime-attributes", version = "0.3.0-alpha.2" }
runtime-raw = { path = "runtime-raw", version = "0.3.0-alpha.1" }
runtime-native = { path = "runtime-native", version = "0.3.0-alpha.1" }

[dev-dependencies]
failure = "0.1.5"
futures01 = { package = "futures", version = "0.1" }
futures-preview = { version = "0.3.0-alpha.15", features = ["nightly", "async-await"] }
juliex = "0.3.0-alpha.2"
mio = "0.6.16"
rand = "0.6.5"
Expand Down
2 changes: 1 addition & 1 deletion runtime-native/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ edition = "2018"

[dependencies]
async-datagram = "2.0.0"
futures-preview = "0.3.0-alpha.13"
futures-preview = "0.3.0-alpha.15"
lazy_static = "1.0.0"
romio = "0.3.0-alpha.5"
runtime-raw = { path = "../runtime-raw", version = "0.3.0-alpha.1" }
Expand Down
12 changes: 5 additions & 7 deletions runtime-native/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,9 @@
)]

use futures::prelude::*;
use futures::{future::FutureObj, task::SpawnError};
use futures::{future::BoxFuture, task::SpawnError};
use lazy_static::lazy_static;

use std::future::Future;
use std::io;
use std::net::SocketAddr;
use std::pin::Pin;
Expand All @@ -38,23 +37,22 @@ lazy_static! {
pub struct Native;

impl runtime_raw::Runtime for Native {
fn spawn_obj(&self, fut: FutureObj<'static, ()>) -> Result<(), SpawnError> {
JULIEX_THREADPOOL.spawn(fut);
fn spawn_boxed(&self, fut: BoxFuture<'static, ()>) -> Result<(), SpawnError> {
JULIEX_THREADPOOL.spawn_obj(fut.into());
Ok(())
}

fn connect_tcp_stream(
&self,
addr: &SocketAddr,
) -> Pin<Box<dyn Future<Output = io::Result<Pin<Box<dyn runtime_raw::TcpStream>>>> + Send>>
{
) -> BoxFuture<'static, io::Result<Pin<Box<dyn runtime_raw::TcpStream>>>> {
let romio_connect = romio::TcpStream::connect(addr);
let connect = romio_connect.map(|res| {
res.map(|romio_stream| {
Box::pin(TcpStream { romio_stream }) as Pin<Box<dyn runtime_raw::TcpStream>>
})
});
Box::pin(connect)
connect.boxed()
}

fn bind_tcp_listener(
Expand Down
2 changes: 1 addition & 1 deletion runtime-raw/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,4 @@ categories = ["asynchronous", "network-programming", "filesystem", "concurrency"
edition = "2018"

[dependencies]
futures-preview = "0.3.0-alpha.13"
futures-preview = "0.3.0-alpha.15"
9 changes: 4 additions & 5 deletions runtime-raw/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
)]

use futures::executor;
use futures::future::FutureObj;
use futures::future::BoxFuture;
use futures::prelude::*;
use futures::task::SpawnError;

Expand Down Expand Up @@ -65,16 +65,15 @@ where
let _ = tx.send(t);
};

rt.spawn_obj(FutureObj::from(Box::new(fut)))
.expect("cannot spawn a future");
rt.spawn_boxed(fut.boxed()).expect("cannot spawn a future");

executor::block_on(rx).expect("the main future has panicked")
}

/// The runtime trait.
pub trait Runtime: Send + Sync + 'static {
/// Spawn a new future.
fn spawn_obj(&self, fut: FutureObj<'static, ()>) -> Result<(), SpawnError>;
fn spawn_boxed(&self, fut: BoxFuture<'static, ()>) -> Result<(), SpawnError>;

/// Create a new `TcpStream`.
///
Expand All @@ -83,7 +82,7 @@ pub trait Runtime: Send + Sync + 'static {
fn connect_tcp_stream(
&self,
addr: &SocketAddr,
) -> Pin<Box<dyn Future<Output = io::Result<Pin<Box<dyn TcpStream>>>> + Send>>;
) -> BoxFuture<'static, io::Result<Pin<Box<dyn TcpStream>>>>;

/// Create a new `TcpListener`.
///
Expand Down
2 changes: 1 addition & 1 deletion runtime-tokio/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ categories = ["asynchronous", "network-programming", "filesystem", "concurrency"
edition = "2018"

[dependencies]
futures-preview = { version = "0.3.0-alpha.13", features = ["compat", "io-compat"] }
futures-preview = { version = "0.3.0-alpha.15", features = ["compat", "io-compat"] }
futures01 = { package = "futures", version = "0.1" }
lazy_static = "1.0.0"
mio = "0.6.16"
Expand Down
26 changes: 10 additions & 16 deletions runtime-tokio/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@
)]

use futures::{
compat::Compat,
future::{Future, FutureExt, FutureObj},
compat::Future01CompatExt,
future::{BoxFuture, FutureExt, TryFutureExt},
task::SpawnError,
};
use lazy_static::lazy_static;
Expand All @@ -34,7 +34,7 @@ use udp::UdpSocket;
pub struct Tokio;

impl runtime_raw::Runtime for Tokio {
fn spawn_obj(&self, fut: FutureObj<'static, ()>) -> Result<(), SpawnError> {
fn spawn_boxed(&self, fut: BoxFuture<'static, ()>) -> Result<(), SpawnError> {
lazy_static! {
static ref TOKIO_RUNTIME: tokio::runtime::Runtime = {
tokio::runtime::Builder::new()
Expand All @@ -46,25 +46,21 @@ impl runtime_raw::Runtime for Tokio {
};
}

TOKIO_RUNTIME
.executor()
.spawn(Compat::new(fut.map(|_| Ok(()))));
TOKIO_RUNTIME.executor().spawn(fut.unit_error().compat());
Ok(())
}

fn connect_tcp_stream(
&self,
addr: &SocketAddr,
) -> Pin<Box<dyn Future<Output = io::Result<Pin<Box<dyn runtime_raw::TcpStream>>>> + Send>>
{
use futures::compat::Compat01As03;
) -> BoxFuture<'static, io::Result<Pin<Box<dyn runtime_raw::TcpStream>>>> {
use futures01::Future;

let tokio_connect = tokio::net::TcpStream::connect(addr);
let connect = tokio_connect.map(|tokio_stream| {
Box::pin(TcpStream { tokio_stream }) as Pin<Box<dyn runtime_raw::TcpStream>>
});
Box::pin(Compat01As03::new(connect))
connect.compat().boxed()
}

fn bind_tcp_listener(
Expand All @@ -89,7 +85,7 @@ impl runtime_raw::Runtime for Tokio {
pub struct TokioCurrentThread;

impl runtime_raw::Runtime for TokioCurrentThread {
fn spawn_obj(&self, fut: FutureObj<'static, ()>) -> Result<(), SpawnError> {
fn spawn_boxed(&self, fut: BoxFuture<'static, ()>) -> Result<(), SpawnError> {
lazy_static! {
static ref TOKIO_RUNTIME: Mutex<tokio::runtime::current_thread::Handle> = {
let (tx, rx) = mpsc::channel();
Expand All @@ -114,24 +110,22 @@ impl runtime_raw::Runtime for TokioCurrentThread {
TOKIO_RUNTIME
.lock()
.unwrap()
.spawn(Compat::new(fut.map(|_| Ok(()))))
.spawn(fut.unit_error().compat())
.unwrap();
Ok(())
}

fn connect_tcp_stream(
&self,
addr: &SocketAddr,
) -> Pin<Box<dyn Future<Output = io::Result<Pin<Box<dyn runtime_raw::TcpStream>>>> + Send>>
{
use futures::compat::Compat01As03;
) -> BoxFuture<'static, io::Result<Pin<Box<dyn runtime_raw::TcpStream>>>> {
use futures01::Future;

let tokio_connect = tokio::net::TcpStream::connect(addr);
let connect = tokio_connect.map(|tokio_stream| {
Box::pin(TcpStream { tokio_stream }) as Pin<Box<dyn runtime_raw::TcpStream>>
});
Box::pin(Compat01As03::new(connect))
connect.compat().boxed()
}

fn bind_tcp_listener(
Expand Down
5 changes: 2 additions & 3 deletions src/net/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use std::io;
use std::net::{SocketAddr, ToSocketAddrs};
use std::pin::Pin;

use futures::future::BoxFuture;
use futures::io::*;
use futures::prelude::*;
use futures::ready;
Expand Down Expand Up @@ -219,9 +220,7 @@ impl AsyncWrite for TcpStream {
pub struct Connect {
addrs: Option<io::Result<VecDeque<SocketAddr>>>,
last_err: Option<io::Error>,
future: Option<
Pin<Box<dyn Future<Output = io::Result<Pin<Box<dyn runtime_raw::TcpStream>>>> + Send>>,
>,
future: Option<BoxFuture<'static, io::Result<Pin<Box<dyn runtime_raw::TcpStream>>>>>,
runtime: &'static dyn runtime_raw::Runtime,
}

Expand Down
3 changes: 1 addition & 2 deletions src/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

use std::pin::Pin;

use futures::future::FutureObj;
use futures::prelude::*;
use futures::task::{Context, Poll};

Expand Down Expand Up @@ -37,7 +36,7 @@ where
};

runtime_raw::current_runtime()
.spawn_obj(FutureObj::from(Box::new(fut)))
.spawn_boxed(fut.boxed())
.expect("cannot spawn a future");

JoinHandle { rx }
Expand Down