diff --git a/Cargo.lock b/Cargo.lock index 526cda1a..e940b137 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -34,6 +34,7 @@ dependencies = [ "bzip2", "flate2", "futures", + "futures-channel", "futures-core", "futures-io", "futures-test", @@ -216,9 +217,9 @@ dependencies = [ [[package]] name = "futures-channel" -version = "0.3.19" +version = "0.3.24" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ba3dda0b6588335f360afc675d0564c17a77a2bda81ca178a4b6081bd86c7f0b" +checksum = "30bdd20c28fadd505d0fd6712cdfcb0d4b5648baf45faef7f852afb2399bb050" dependencies = [ "futures-core", "futures-sink", @@ -226,9 +227,9 @@ dependencies = [ [[package]] name = "futures-core" -version = "0.3.19" +version = "0.3.24" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d0c8ff0461b82559810cdccfde3215c3f373807f5e5232b71479bff7bb2583d7" +checksum = "4e5aa3de05362c3fb88de6531e6296e85cde7739cccad4b9dfeeb7f6ebce56bf" [[package]] name = "futures-executor" @@ -243,15 +244,15 @@ dependencies = [ [[package]] name = "futures-io" -version = "0.3.19" +version = "0.3.24" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b1f9d34af5a1aac6fb380f735fe510746c38067c5bf16c7fd250280503c971b2" +checksum = "bbf4d2a7a308fd4578637c0b17c7e1c7ba127b8f6ba00b29f717e9655d85eb68" [[package]] name = "futures-macro" -version = "0.3.19" +version = "0.3.24" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6dbd947adfffb0efc70599b3ddcf7b5597bb5fa9e245eb99f62b3a5f7bb8bd3c" +checksum = "42cd15d1c7456c04dbdf7e88bcd69760d74f3a798d6444e16974b505b0e62f17" dependencies = [ "proc-macro2 1.0.36", "quote 1.0.15", @@ -260,26 +261,25 @@ dependencies = [ [[package]] name = "futures-sink" -version = "0.3.19" +version = "0.3.24" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e3055baccb68d74ff6480350f8d6eb8fcfa3aa11bdc1a1ae3afdd0514617d508" +checksum = "21b20ba5a92e727ba30e72834706623d94ac93a725410b6a6b6fbc1b07f7ba56" [[package]] name = "futures-task" -version = "0.3.19" +version = "0.3.24" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6ee7c6485c30167ce4dfb83ac568a849fe53274c831081476ee13e0dce1aad72" +checksum = "a6508c467c73851293f390476d4491cf4d227dbabcd4170f3bb6044959b294f1" [[package]] name = "futures-test" -version = "0.3.19" +version = "0.3.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4e741bc851e1e90ad08901b329389ae77e02d5e9a0ec61955b80834630fbdc2f" +checksum = "e77baeade98824bc928c21b8ad39918b9d8a06745ebdb6e2c93fb7673fb7968d" dependencies = [ "futures-core", "futures-executor", "futures-io", - "futures-macro", "futures-sink", "futures-task", "futures-util", @@ -289,9 +289,9 @@ dependencies = [ [[package]] name = "futures-util" -version = "0.3.19" +version = "0.3.24" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d9b5cf40b47a271f77a8b1bec03ca09044d99d2372c0de244e66430761127164" +checksum = "44fb6cb1be61cc1d2e43b262516aafcf63b241cffdb1d3fa115f91d9c7b09c90" dependencies = [ "futures-channel", "futures-core", @@ -725,7 +725,7 @@ dependencies = [ "memchr", "pin-project-lite 0.1.12", "slab", - "tokio-macros", + "tokio-macros 0.2.6", ] [[package]] @@ -750,6 +750,7 @@ dependencies = [ "bytes 1.1.0", "memchr", "pin-project-lite 0.2.8", + "tokio-macros 1.8.0", ] [[package]] @@ -763,6 +764,17 @@ dependencies = [ "syn 1.0.86", ] +[[package]] +name = "tokio-macros" +version = "1.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9724f9a975fb987ef7a3cd9be0350edcbe130698af5b8f7a631e23d42d052484" +dependencies = [ + "proc-macro2 1.0.36", + "quote 1.0.15", + "syn 1.0.86", +] + [[package]] name = "tokio-util" version = "0.3.1" diff --git a/Cargo.toml b/Cargo.toml index 8e9b0823..e17897e5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -56,6 +56,7 @@ proptest = "1.0.0" proptest-derive = "0.3.0" rand = "0.8.5" futures = "0.3.5" +futures-channel = "0.3.24" futures-test = "0.3.5" ntest = "0.8.1" bytes-05 = { package = "bytes", version = "0.5.0" } @@ -63,7 +64,7 @@ bytes-06 = { package = "bytes", version = "0.6.0" } bytes = "1.0.0" tokio-02 = { package = "tokio", version = "0.2.21", default-features = false, features = ["io-util", "stream", "macros", "io-std"] } tokio-03 = { package = "tokio", version = "0.3.0", default-features = false, features = ["io-util", "stream"] } -tokio = { version = "1.0.0", default-features = false, features = ["io-util"] } +tokio = { version = "1.0.0", default-features = false, features = ["io-util", "macros", "rt", "time"] } tokio-util-03 = { package = "tokio-util", version = "0.3.0", default-features = false, features = ["codec"] } tokio-util-04 = { package = "tokio-util", version = "0.4.0", default-features = false, features = ["io"] } tokio-util-05 = { package = "tokio-util", version = "0.5.0", default-features = false, features = ["io"] } diff --git a/src/futures/flush.rs b/src/futures/flush.rs new file mode 100644 index 00000000..93653ec5 --- /dev/null +++ b/src/futures/flush.rs @@ -0,0 +1,74 @@ +//! Types related to [`AsyncFlush`](AsyncFlush) to wrap encoders + +use futures_core::Future; +use futures_core::Stream; +use futures_io::AsyncRead; +use pin_project_lite::pin_project; + +use super::bufread::Encoder; +use crate::codec::Encode; + +/// Flushes asynchronously +/// +/// `AsyncRead` and `AsyncBufRead` implementations may not have enough information +/// to know when to flush the data they have in store, so they can implement this +/// trait and let the caller decide when data should be flushed +pub trait AsyncFlush { + /// Attempts to flush in flight data from the `AsyncFlush` into `buf`. + /// + /// On success, returns `Poll::Ready(Ok(()))` and places data in the + /// unfilled portion of `buf`. If no data was read (`buf.filled().len()` is + /// unchanged), it implies that EOF has been reached. + /// + /// If no data is available for reading, the method returns `Poll::Pending` + /// and arranges for the current task (via `cx.waker()`) to receive a + /// notification when the object becomes readable or is closed. + fn poll_flush( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &mut [u8], + ) -> std::task::Poll>; +} + +pin_project! { + /// This structure wraps an `Encoder` implementing [`AsyncRead`](tokio::io::AsyncRead) to + /// allow the caller to flush its buffers. + pub struct FlushableEncoder> { + #[pin] + encoder: E, + #[pin] + receiver: Rx, + } +} + +impl> FlushableEncoder { + /// Creates a new `FlushableEncoder` from an existing `Encoder` and a Stream + /// + /// Whenever a message is received from the stream, the encoder will flushes its buffers + /// and compress them. + pub fn new(encoder: E, receiver: Rx) -> Self { + Self { encoder, receiver } + } +} + +impl> AsyncRead for FlushableEncoder { + fn poll_read( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &mut [u8], + ) -> std::task::Poll> { + let mut this = self.project(); + + match this.encoder.as_mut().poll_read(cx, buf) { + std::task::Poll::Ready(r) => std::task::Poll::Ready(r), + std::task::Poll::Pending => match this.receiver.as_mut().poll_next(cx) { + std::task::Poll::Pending => std::task::Poll::Pending, + std::task::Poll::Ready(_) => match this.encoder.poll_flush(cx, buf) { + std::task::Poll::Ready(Ok(sz)) => std::task::Poll::Ready(Ok(sz)), + std::task::Poll::Ready(Err(e)) => std::task::Poll::Ready(Err(e)), + std::task::Poll::Pending => std::task::Poll::Pending, + }, + }, + } + } +} diff --git a/src/futures/mod.rs b/src/futures/mod.rs index be7f7ed6..e2cc3b8a 100644 --- a/src/futures/mod.rs +++ b/src/futures/mod.rs @@ -1,4 +1,5 @@ //! Implementations for IO traits exported by `futures`. pub mod bufread; +pub mod flush; pub mod write; diff --git a/src/tokio/bufread/generic/encoder.rs b/src/tokio/bufread/generic/encoder.rs index a80fa122..a47143ae 100644 --- a/src/tokio/bufread/generic/encoder.rs +++ b/src/tokio/bufread/generic/encoder.rs @@ -4,7 +4,7 @@ use core::{ }; use std::io::Result; -use crate::{codec::Encode, util::PartialBuffer}; +use crate::{codec::Encode, tokio::flush::AsyncFlush, util::PartialBuffer}; use futures_core::ready; use pin_project_lite::pin_project; use tokio::io::{AsyncBufRead, AsyncRead, ReadBuf}; @@ -115,3 +115,23 @@ impl AsyncRead for Encoder { } } } + +impl AsyncFlush for Encoder { + fn poll_flush( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll> { + let mut output = PartialBuffer::new(buf.initialize_unfilled()); + let mut this = self.project(); + + match this.encoder.flush(&mut output)? { + true => { + let len = output.written().len(); + buf.advance(len); + Poll::Ready(Ok(true)) + } + false => Poll::Ready(Ok(false)), + } + } +} diff --git a/src/tokio/bufread/macros/encoder.rs b/src/tokio/bufread/macros/encoder.rs index 44c8b595..5e4b444b 100644 --- a/src/tokio/bufread/macros/encoder.rs +++ b/src/tokio/bufread/macros/encoder.rs @@ -62,6 +62,16 @@ macro_rules! encoder { } } + impl<$inner: tokio::io::AsyncBufRead> crate::tokio::flush::AsyncFlush for $name<$inner> { + fn poll_flush( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &mut tokio::io::ReadBuf<'_>, + ) -> std::task::Poll> { + self.project().inner.poll_flush(cx, buf) + } + } + const _: () = { fn _assert() { use crate::util::{_assert_send, _assert_sync}; diff --git a/src/tokio/flush.rs b/src/tokio/flush.rs new file mode 100644 index 00000000..3044e2b1 --- /dev/null +++ b/src/tokio/flush.rs @@ -0,0 +1,75 @@ +//! Types related to [`AsyncFlush`](AsyncFlush) to wrap encoders + +use futures_core::Future; +use futures_core::Stream; +use pin_project_lite::pin_project; +use tokio::io::{AsyncBufRead, AsyncRead}; + +use super::bufread::Encoder; +use crate::codec::Encode; + +/// Flushes asynchronously +/// +/// `AsyncRead` and `AsyncBufRead` implementations may not have enough information +/// to know when to flush the data they have in store, so they can implement this +/// trait and let the caller decide when data should be flushed +pub trait AsyncFlush { + /// Attempts to flush in flight data from the `AsyncFlush` into `buf`. + /// + /// On success, returns `Poll::Ready(Ok(()))` and places data in the + /// unfilled portion of `buf`. If no data was read (`buf.filled().len()` is + /// unchanged), it implies that EOF has been reached. + /// + /// If no data is available for reading, the method returns `Poll::Pending` + /// and arranges for the current task (via `cx.waker()`) to receive a + /// notification when the object becomes readable or is closed. + fn poll_flush( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &mut tokio::io::ReadBuf<'_>, + ) -> std::task::Poll>; +} + +pin_project! { + /// This structure wraps an `Encoder` implementing [`AsyncRead`](tokio::io::AsyncRead) to + /// allow the caller to flush its buffers. + pub struct FlushableEncoder> { + #[pin] + encoder: E, + #[pin] + receiver: Rx, + } +} + +impl> FlushableEncoder { + /// Creates a new `FlushableEncoder` from an existing `Encoder` and a Stream + /// + /// Whenever a message is received from the stream, the encoder will flushes its buffers + /// and compress them. + pub fn new(encoder: E, receiver: Rx) -> Self { + Self { encoder, receiver } + } +} + +impl> AsyncRead for FlushableEncoder { + fn poll_read( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &mut tokio::io::ReadBuf<'_>, + ) -> std::task::Poll> { + let mut this = self.project(); + + match this.encoder.as_mut().poll_read(cx, buf) { + std::task::Poll::Ready(r) => std::task::Poll::Ready(r), + std::task::Poll::Pending => match this.receiver.as_mut().poll_next(cx) { + std::task::Poll::Pending => std::task::Poll::Pending, + std::task::Poll::Ready(_) => match this.encoder.poll_flush(cx, buf) { + std::task::Poll::Ready(Ok(true)) => std::task::Poll::Ready(Ok(())), + std::task::Poll::Ready(Ok(false)) => std::task::Poll::Pending, + std::task::Poll::Ready(Err(e)) => std::task::Poll::Ready(Err(e)), + std::task::Poll::Pending => std::task::Poll::Pending, + }, + }, + } + } +} diff --git a/src/tokio/mod.rs b/src/tokio/mod.rs index 8eba9add..fe2960a3 100644 --- a/src/tokio/mod.rs +++ b/src/tokio/mod.rs @@ -1,4 +1,5 @@ //! Implementations for IO traits exported by [`tokio` v1.0](::tokio). pub mod bufread; +pub mod flush; pub mod write; diff --git a/tests/utils/test_cases.rs b/tests/utils/test_cases.rs index fc61d78a..6857c079 100644 --- a/tests/utils/test_cases.rs +++ b/tests/utils/test_cases.rs @@ -438,6 +438,73 @@ macro_rules! io_test_cases { assert_eq!(output, bytes); } + + use crate::utils::algos::$variant::tokio::bufread::Encoder; + + use std::time::Duration; + + use async_compression::tokio::flush::FlushableEncoder; + use futures::SinkExt; + use tokio::{ + io::{AsyncReadExt, AsyncWriteExt}, + time, + }; + + #[tokio::test] + //#[ntest::timeout(1000)] + async fn flushable_encoder() { + let (mut client, server) = tokio::io::duplex(1024); + tokio::task::spawn(async move { + loop { + client + .write_all( + &std::iter::repeat(b'A').take(256).collect::>(), + ) + .await + .unwrap(); + println!("sent data: 256 bytes"); + time::sleep(Duration::from_millis(100)).await; + } + }); + + let (mut tx, rx) = futures_channel::mpsc::channel(1); + let encoder = Encoder::new(tokio::io::BufReader::new(server)); + //if this is commented out, the test will fail + let mut encoder = Box::pin(FlushableEncoder::new(encoder, rx)); + + let mut buf = std::iter::repeat(0u8).take(1024).collect::>(); + + tokio::task::spawn(async move { + loop { + tokio::time::sleep(Duration::from_millis(250)).await; + tx.send(()).await.unwrap(); + } + }); + let start = std::time::Instant::now(); + let mut counter = 0usize; + println!("start"); + loop { + let read = encoder.read(&mut buf); + match time::timeout(Duration::from_secs(5), read).await { + Err(e) => { + panic!("{}ms | timeout: {:?}", start.elapsed().as_millis(), e); + } + + Ok(res) => { + println!( + "{}ms | received data: {:?}", + start.elapsed().as_millis(), + res + ); + + counter += 1; + if counter == 10 { + break; + } + } + } + } + } } } }