From 2e6290c517be566938de6a6bc63fd989ec018bd9 Mon Sep 17 00:00:00 2001 From: Wim Looman Date: Sat, 27 Apr 2019 16:58:13 +0200 Subject: [PATCH] Update for futures v0.3.0-alpha.15 Replace (most) usage of FutureObj with BoxFuture and use BoxFuture to simplify some function signatures. Enable the async-await feature for the tests. --- Cargo.toml | 3 ++- runtime-native/Cargo.toml | 2 +- runtime-native/src/lib.rs | 12 +++++------- runtime-raw/Cargo.toml | 2 +- runtime-raw/src/lib.rs | 9 ++++----- runtime-tokio/Cargo.toml | 2 +- runtime-tokio/src/lib.rs | 26 ++++++++++---------------- src/net/tcp.rs | 5 ++--- src/task.rs | 3 +-- 9 files changed, 27 insertions(+), 37 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index ec6f2c8a..a35abdee 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,7 +13,7 @@ categories = ["asynchronous", "network-programming", "filesystem", "concurrency" edition = "2018" [dependencies] -futures-preview = { version = "0.3.0-alpha.14" } +futures-preview = { version = "0.3.0-alpha.15" } runtime-attributes = { path = "runtime-attributes", version = "0.3.0-alpha.2" } runtime-raw = { path = "runtime-raw", version = "0.3.0-alpha.1" } runtime-native = { path = "runtime-native", version = "0.3.0-alpha.1" } @@ -21,6 +21,7 @@ runtime-native = { path = "runtime-native", version = "0.3.0-alpha.1" } [dev-dependencies] failure = "0.1.5" futures01 = { package = "futures", version = "0.1" } +futures-preview = { version = "0.3.0-alpha.15", features = ["nightly", "async-await"] } juliex = "0.3.0-alpha.2" mio = "0.6.16" rand = "0.6.5" diff --git a/runtime-native/Cargo.toml b/runtime-native/Cargo.toml index 900f44d9..c91c4b3b 100644 --- a/runtime-native/Cargo.toml +++ b/runtime-native/Cargo.toml @@ -14,7 +14,7 @@ edition = "2018" [dependencies] async-datagram = "2.0.0" -futures-preview = "0.3.0-alpha.13" +futures-preview = "0.3.0-alpha.15" lazy_static = "1.0.0" romio = "0.3.0-alpha.5" runtime-raw = { path = "../runtime-raw", version = "0.3.0-alpha.1" } diff --git a/runtime-native/src/lib.rs b/runtime-native/src/lib.rs index 39306aa5..0b0ecf9b 100644 --- a/runtime-native/src/lib.rs +++ b/runtime-native/src/lib.rs @@ -11,10 +11,9 @@ )] use futures::prelude::*; -use futures::{future::FutureObj, task::SpawnError}; +use futures::{future::BoxFuture, task::SpawnError}; use lazy_static::lazy_static; -use std::future::Future; use std::io; use std::net::SocketAddr; use std::pin::Pin; @@ -38,23 +37,22 @@ lazy_static! { pub struct Native; impl runtime_raw::Runtime for Native { - fn spawn_obj(&self, fut: FutureObj<'static, ()>) -> Result<(), SpawnError> { - JULIEX_THREADPOOL.spawn(fut); + fn spawn_boxed(&self, fut: BoxFuture<'static, ()>) -> Result<(), SpawnError> { + JULIEX_THREADPOOL.spawn_obj(fut.into()); Ok(()) } fn connect_tcp_stream( &self, addr: &SocketAddr, - ) -> Pin>>> + Send>> - { + ) -> BoxFuture<'static, io::Result>>> { let romio_connect = romio::TcpStream::connect(addr); let connect = romio_connect.map(|res| { res.map(|romio_stream| { Box::pin(TcpStream { romio_stream }) as Pin> }) }); - Box::pin(connect) + connect.boxed() } fn bind_tcp_listener( diff --git a/runtime-raw/Cargo.toml b/runtime-raw/Cargo.toml index df7f0c75..cbe437bb 100644 --- a/runtime-raw/Cargo.toml +++ b/runtime-raw/Cargo.toml @@ -13,4 +13,4 @@ categories = ["asynchronous", "network-programming", "filesystem", "concurrency" edition = "2018" [dependencies] -futures-preview = "0.3.0-alpha.13" +futures-preview = "0.3.0-alpha.15" diff --git a/runtime-raw/src/lib.rs b/runtime-raw/src/lib.rs index 1a75fdf8..0be99f67 100644 --- a/runtime-raw/src/lib.rs +++ b/runtime-raw/src/lib.rs @@ -15,7 +15,7 @@ )] use futures::executor; -use futures::future::FutureObj; +use futures::future::BoxFuture; use futures::prelude::*; use futures::task::SpawnError; @@ -65,8 +65,7 @@ where let _ = tx.send(t); }; - rt.spawn_obj(FutureObj::from(Box::new(fut))) - .expect("cannot spawn a future"); + rt.spawn_boxed(fut.boxed()).expect("cannot spawn a future"); executor::block_on(rx).expect("the main future has panicked") } @@ -74,7 +73,7 @@ where /// The runtime trait. pub trait Runtime: Send + Sync + 'static { /// Spawn a new future. - fn spawn_obj(&self, fut: FutureObj<'static, ()>) -> Result<(), SpawnError>; + fn spawn_boxed(&self, fut: BoxFuture<'static, ()>) -> Result<(), SpawnError>; /// Create a new `TcpStream`. /// @@ -83,7 +82,7 @@ pub trait Runtime: Send + Sync + 'static { fn connect_tcp_stream( &self, addr: &SocketAddr, - ) -> Pin>>> + Send>>; + ) -> BoxFuture<'static, io::Result>>>; /// Create a new `TcpListener`. /// diff --git a/runtime-tokio/Cargo.toml b/runtime-tokio/Cargo.toml index e51ea79d..b6c2814d 100644 --- a/runtime-tokio/Cargo.toml +++ b/runtime-tokio/Cargo.toml @@ -13,7 +13,7 @@ categories = ["asynchronous", "network-programming", "filesystem", "concurrency" edition = "2018" [dependencies] -futures-preview = { version = "0.3.0-alpha.13", features = ["compat", "io-compat"] } +futures-preview = { version = "0.3.0-alpha.15", features = ["compat", "io-compat"] } futures01 = { package = "futures", version = "0.1" } lazy_static = "1.0.0" mio = "0.6.16" diff --git a/runtime-tokio/src/lib.rs b/runtime-tokio/src/lib.rs index 3d38bdcd..122b737e 100644 --- a/runtime-tokio/src/lib.rs +++ b/runtime-tokio/src/lib.rs @@ -11,8 +11,8 @@ )] use futures::{ - compat::Compat, - future::{Future, FutureExt, FutureObj}, + compat::Future01CompatExt, + future::{BoxFuture, FutureExt, TryFutureExt}, task::SpawnError, }; use lazy_static::lazy_static; @@ -34,7 +34,7 @@ use udp::UdpSocket; pub struct Tokio; impl runtime_raw::Runtime for Tokio { - fn spawn_obj(&self, fut: FutureObj<'static, ()>) -> Result<(), SpawnError> { + fn spawn_boxed(&self, fut: BoxFuture<'static, ()>) -> Result<(), SpawnError> { lazy_static! { static ref TOKIO_RUNTIME: tokio::runtime::Runtime = { tokio::runtime::Builder::new() @@ -46,25 +46,21 @@ impl runtime_raw::Runtime for Tokio { }; } - TOKIO_RUNTIME - .executor() - .spawn(Compat::new(fut.map(|_| Ok(())))); + TOKIO_RUNTIME.executor().spawn(fut.unit_error().compat()); Ok(()) } fn connect_tcp_stream( &self, addr: &SocketAddr, - ) -> Pin>>> + Send>> - { - use futures::compat::Compat01As03; + ) -> BoxFuture<'static, io::Result>>> { use futures01::Future; let tokio_connect = tokio::net::TcpStream::connect(addr); let connect = tokio_connect.map(|tokio_stream| { Box::pin(TcpStream { tokio_stream }) as Pin> }); - Box::pin(Compat01As03::new(connect)) + connect.compat().boxed() } fn bind_tcp_listener( @@ -89,7 +85,7 @@ impl runtime_raw::Runtime for Tokio { pub struct TokioCurrentThread; impl runtime_raw::Runtime for TokioCurrentThread { - fn spawn_obj(&self, fut: FutureObj<'static, ()>) -> Result<(), SpawnError> { + fn spawn_boxed(&self, fut: BoxFuture<'static, ()>) -> Result<(), SpawnError> { lazy_static! { static ref TOKIO_RUNTIME: Mutex = { let (tx, rx) = mpsc::channel(); @@ -114,7 +110,7 @@ impl runtime_raw::Runtime for TokioCurrentThread { TOKIO_RUNTIME .lock() .unwrap() - .spawn(Compat::new(fut.map(|_| Ok(())))) + .spawn(fut.unit_error().compat()) .unwrap(); Ok(()) } @@ -122,16 +118,14 @@ impl runtime_raw::Runtime for TokioCurrentThread { fn connect_tcp_stream( &self, addr: &SocketAddr, - ) -> Pin>>> + Send>> - { - use futures::compat::Compat01As03; + ) -> BoxFuture<'static, io::Result>>> { use futures01::Future; let tokio_connect = tokio::net::TcpStream::connect(addr); let connect = tokio_connect.map(|tokio_stream| { Box::pin(TcpStream { tokio_stream }) as Pin> }); - Box::pin(Compat01As03::new(connect)) + connect.compat().boxed() } fn bind_tcp_listener( diff --git a/src/net/tcp.rs b/src/net/tcp.rs index 3402f5f3..a315decf 100644 --- a/src/net/tcp.rs +++ b/src/net/tcp.rs @@ -21,6 +21,7 @@ use std::io; use std::net::{SocketAddr, ToSocketAddrs}; use std::pin::Pin; +use futures::future::BoxFuture; use futures::io::*; use futures::prelude::*; use futures::ready; @@ -219,9 +220,7 @@ impl AsyncWrite for TcpStream { pub struct Connect { addrs: Option>>, last_err: Option, - future: Option< - Pin>>> + Send>>, - >, + future: Option>>>>, runtime: &'static dyn runtime_raw::Runtime, } diff --git a/src/task.rs b/src/task.rs index 6a750cb3..ea4d32e3 100644 --- a/src/task.rs +++ b/src/task.rs @@ -2,7 +2,6 @@ use std::pin::Pin; -use futures::future::FutureObj; use futures::prelude::*; use futures::task::{Context, Poll}; @@ -37,7 +36,7 @@ where }; runtime_raw::current_runtime() - .spawn_obj(FutureObj::from(Box::new(fut))) + .spawn_boxed(fut.boxed()) .expect("cannot spawn a future"); JoinHandle { rx }