diff --git a/Cargo.toml b/Cargo.toml index d3008b43..92cae673 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,7 +16,7 @@ edition = "2018" futures-preview = "0.3.0-alpha.15" runtime-attributes = { path = "runtime-attributes", version = "0.3.0-alpha.3" } runtime-raw = { path = "runtime-raw", version = "0.3.0-alpha.2" } -runtime-native = { path = "runtime-native", version = "0.3.0-alpha.2" } +# runtime-native = { path = "runtime-native", version = "0.3.0-alpha.2" } [dev-dependencies] failure = "0.1.5" @@ -40,3 +40,8 @@ members = [ "runtime-raw", "runtime-tokio", ] + +[patch.crates-io.futures-preview] +git = "https://github.com/taiki-e/futures-rs" +branch = "async-stream" +features = ["async-stream", "nightly"] diff --git a/examples/guessing.rs b/examples/guessing.rs index 7d1dbfd2..e6bd978f 100644 --- a/examples/guessing.rs +++ b/examples/guessing.rs @@ -61,9 +61,10 @@ async fn main() -> Result<(), failure::Error> { let mut listener = TcpListener::bind("127.0.0.1:8080")?; println!("Listening on {}", &listener.local_addr()?); - let mut incoming = listener.incoming(); - while let Some(stream) = await!(incoming.next()) { - runtime::spawn(play(stream?)); - } + let incoming = listener.incoming().map_err(|e| e.into()); + await!(incoming.try_for_each_concurrent(!0, async move |stream| { + await!(runtime::spawn(play(stream)))?; + Ok::<(), failure::Error>(()) + }))?; Ok(()) } diff --git a/examples/tcp-echo.rs b/examples/tcp-echo.rs index 300ae5a4..4a7d0efc 100644 --- a/examples/tcp-echo.rs +++ b/examples/tcp-echo.rs @@ -3,27 +3,24 @@ //! Run the server and connect to it with `nc 127.0.0.1 8080`. //! The server will wait for you to enter lines of text and then echo them back. -#![feature(async_await, await_macro)] +#![feature(async_await, await_macro, stmt_expr_attributes, proc_macro_hygiene)] use futures::prelude::*; use runtime::net::TcpListener; +use runtime::for_await; -#[runtime::main] +#[runtime::main(runtime_tokio::Tokio)] async fn main() -> std::io::Result<()> { let mut listener = TcpListener::bind("127.0.0.1:8080")?; println!("Listening on {}", listener.local_addr()?); - // accept connections and process them in parallel - let mut incoming = listener.incoming(); - while let Some(stream) = await!(incoming.next()) { - runtime::spawn(async move { - let stream = stream?; - println!("Accepting from: {}", stream.peer_addr()?); + #[for_await(try_parallel)] + for stream in listener.incoming() { + println!("Accepting from: {}", stream.peer_addr()?); - let (reader, writer) = &mut stream.split(); - await!(reader.copy_into(writer))?; - Ok::<(), std::io::Error>(()) - }); + let (reader, writer) = &mut stream.split(); + await!(reader.copy_into(writer))?; + Ok::<(), std::io::Error>(()) } Ok(()) } diff --git a/examples/tcp-proxy.rs b/examples/tcp-proxy.rs index 1e132106..e2209b78 100644 --- a/examples/tcp-proxy.rs +++ b/examples/tcp-proxy.rs @@ -11,28 +11,26 @@ async fn main() -> std::io::Result<()> { let mut listener = TcpListener::bind("127.0.0.1:8081")?; println!("Listening on {}", listener.local_addr()?); - // accept connections and process them serially - let mut incoming = listener.incoming(); - while let Some(client) = await!(incoming.next()) { - let handle = runtime::spawn(async move { - let client = client?; - let server = await!(TcpStream::connect("127.0.0.1:8080"))?; - println!( - "Proxying {} to {}", - client.peer_addr()?, - server.peer_addr()? - ); + // accept connections and process them in parallel + await!(listener + .incoming() + .try_for_each_concurrent(!0, async move |client| { + await!(runtime::spawn(async move { + let server = await!(TcpStream::connect("127.0.0.1:8080"))?; + println!( + "Proxying {} to {}", + client.peer_addr()?, + server.peer_addr()? + ); - let (cr, cw) = &mut client.split(); - let (sr, sw) = &mut server.split(); - let a = cr.copy_into(sw); - let b = sr.copy_into(cw); - try_join!(a, b)?; + let (cr, cw) = &mut client.split(); + let (sr, sw) = &mut server.split(); + let a = cr.copy_into(sw); + let b = sr.copy_into(cw); + try_join!(a, b)?; - Ok::<(), std::io::Error>(()) - }); - - await!(handle)?; - } + Ok::<(), std::io::Error>(()) + })) + }))?; Ok(()) } diff --git a/runtime-attributes/Cargo.toml b/runtime-attributes/Cargo.toml index 4e8228fd..db43070d 100644 --- a/runtime-attributes/Cargo.toml +++ b/runtime-attributes/Cargo.toml @@ -16,7 +16,7 @@ edition = "2018" proc-macro = true [dependencies] -syn = { version = "0.15.33", features = ["full"] } +syn = { version = "0.15.34", features = ["full", "extra-traits"] } proc-macro2 = { version = "0.4.29", features = ["nightly"] } quote = "0.6.12" diff --git a/runtime-attributes/src/lib.rs b/runtime-attributes/src/lib.rs index bd3e9ba0..4d22d803 100644 --- a/runtime-attributes/src/lib.rs +++ b/runtime-attributes/src/lib.rs @@ -169,3 +169,69 @@ pub fn bench(attr: TokenStream, item: TokenStream) -> TokenStream { result.into() } + +/// Create an async loop. +/// +/// # Examples +/// +/// ```ignore +/// #![feature(async_await, await_macro)] +/// +/// use futures::prelude::*; +/// use futures::stream; +/// use runtime::for_await; +/// +/// #[runtime::main] +/// async fn main() { +/// // Print items in a series +/// #[for_await] +/// for value in stream::iter(1..=5) { +/// println!("{}", value); +/// } +/// +/// // Print items in a series +/// #[for_await(serial)] +/// for value in stream::iter(1..=5) { +/// println!("{}", value); +/// } +/// +/// // Print items in parallel, spawning each iteration on the threadpool +/// #[for_await(try_parallel)] +/// for value in stream::iter(1..=5) { +/// println!("{}", value); +/// } +/// } +/// ``` +#[proc_macro_attribute] +pub fn for_await(attr: TokenStream, item: TokenStream) -> TokenStream { + let attr_value = if attr.is_empty() { + syn::parse_str("serial").unwrap() + } else { + syn::parse_macro_input!(attr as syn::Expr) + }; + + let input = syn::parse_macro_input!(item as syn::ExprForLoop); + let attrs = &input.attrs; + let pat = &input.pat; + let expr = &input.expr; + let body_block = &input.body; + + match &*format!("{:?}", attr_value) { + "serial" => quote! { + #(#attrs)* + #[futures::for_await] + for #pat in #expr #body_block + } + .into(), + "try_parallel" => quote! { + let stream = #expr.map_err(|e| e.into()); + await!(stream.try_for_each_concurrent(None, async move |#expr| { + await!(runtime::spawn(async move #body_block)) + }))?; + } + .into(), + _ => TokenStream::from(quote_spanned! { + input.span() => compile_error!(r##"#[for_await] takes an optional argument of either "serial" or "try_parallel"##); + }), + } +} diff --git a/src/lib.rs b/src/lib.rs index 28db38c6..285d9d76 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -101,7 +101,7 @@ pub mod task; pub use task::spawn; #[doc(inline)] -pub use runtime_attributes::{bench, test}; +pub use runtime_attributes::{bench, for_await, test}; #[doc(inline)] #[cfg(not(test))] // NOTE: exporting main breaks tests, we should file an issue. @@ -110,5 +110,5 @@ pub use runtime_attributes::main; #[doc(hidden)] pub use runtime_raw as raw; -#[doc(hidden)] -pub use runtime_native as native; +// #[doc(hidden)] +// pub use runtime_native as native; diff --git a/src/net/tcp.rs b/src/net/tcp.rs index a315decf..3e772c09 100644 --- a/src/net/tcp.rs +++ b/src/net/tcp.rs @@ -175,14 +175,6 @@ impl AsyncRead for TcpStream { ) -> Poll> { self.inner.as_mut().poll_read(cx, buf) } - - fn poll_vectored_read( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - vec: &mut [&mut IoVec], - ) -> Poll> { - self.inner.as_mut().poll_vectored_read(cx, vec) - } } impl AsyncWrite for TcpStream { @@ -201,14 +193,6 @@ impl AsyncWrite for TcpStream { fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { self.inner.as_mut().poll_close(cx) } - - fn poll_vectored_write( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - vec: &[&IoVec], - ) -> Poll> { - self.inner.as_mut().poll_vectored_write(cx, vec) - } } /// The future returned by [`TcpStream::connect`].