From 0b77b4c81d53e91765c79fa6b02fccb7a90a0af4 Mon Sep 17 00:00:00 2001 From: "Adam C. Foltzer" Date: Fri, 6 Dec 2019 14:44:38 -0800 Subject: [PATCH 1/2] Add `IntoAsyncRead::into_inner()` method and test This is useful when you don't necessarily want to consume the entire stream as a reader, but still want to retain the stream for other uses. --- .../src/stream/try_stream/into_async_read.rs | 14 +++++ futures/tests/stream_into_async_read.rs | 52 ++++++++++++++++++- 2 files changed, 65 insertions(+), 1 deletion(-) diff --git a/futures-util/src/stream/try_stream/into_async_read.rs b/futures-util/src/stream/try_stream/into_async_read.rs index 71bbfd1c72..a1a25db04a 100644 --- a/futures-util/src/stream/try_stream/into_async_read.rs +++ b/futures-util/src/stream/try_stream/into_async_read.rs @@ -43,6 +43,20 @@ where state: ReadState::PendingChunk, } } + + /// Return the underlying stream along with any stream elements that are currently buffered. + /// + /// Returns `(None, stream)` if no stream elements are currently buffered. + /// + /// Returns `(Some(element, offset), stream)` if an element is currently buffered, where + /// `offset` is the current offset into `element` that would have been read from next. + pub fn into_inner(self) -> (Option<(St::Ok, usize)>, St) { + match self.state { + ReadState::Ready { chunk, chunk_start } => (Some((chunk, chunk_start)), self.stream), + ReadState::PendingChunk => (None, self.stream), + ReadState::Eof => (None, self.stream), + } + } } impl AsyncRead for IntoAsyncRead diff --git a/futures/tests/stream_into_async_read.rs b/futures/tests/stream_into_async_read.rs index c528af03f0..4038b6ba33 100644 --- a/futures/tests/stream_into_async_read.rs +++ b/futures/tests/stream_into_async_read.rs @@ -1,6 +1,7 @@ use core::pin::Pin; +use futures::future; use futures::io::{AsyncRead, AsyncBufRead}; -use futures::stream::{self, TryStreamExt}; +use futures::stream::{self, StreamExt, TryStreamExt}; use futures::task::Poll; use futures_test::{task::noop_context, stream::StreamTestExt}; @@ -94,3 +95,52 @@ fn test_into_async_bufread() -> std::io::Result<()> { Ok(()) } + +#[test] +fn test_into_async_read_into_inner() { + let stream = stream::iter((1..=3).flat_map(|_| vec![Ok(vec![]), Ok(vec![1, 2, 3, 4, 5])])); + let mut reader = stream.interleave_pending().into_async_read(); + let mut buf = vec![0; 3]; + + assert_read!(reader, &mut buf, 3); + assert_eq!(&buf, &[1, 2, 3]); + + // turn the reader into its inner parts, which we can inspect + let (first, rest) = reader.into_inner(); + let (mut chunk, offset) = first.expect(".into_inner() called in the middle of a chunk"); + assert_eq!(&chunk, &[1, 2, 3, 4, 5]); + assert_eq!(offset, 3); + + // package the stream back up by splitting off the chunk according to the offset we got back + let stream = stream::once(future::ready(Ok(chunk.split_off(offset)))).chain(rest); + let mut reader = stream.into_async_read(); + + // resume reading as normal + assert_read!(reader, &mut buf, 2); + assert_eq!(&buf[..2], &[4, 5]); + + assert_read!(reader, &mut buf, 3); + assert_eq!(&buf, &[1, 2, 3]); + + assert_read!(reader, &mut buf, 2); + assert_eq!(&buf[..2], &[4, 5]); + + // turn the reader into its inner parts again, this time on a chunk boundary + let (first, rest) = reader.into_inner(); + assert!(first.is_none()); + + // package the stream back up and resume reading as normal + let mut reader = rest.into_async_read(); + + assert_read!(reader, &mut buf, 3); + assert_eq!(&buf, &[1, 2, 3]); + + assert_read!(reader, &mut buf, 2); + assert_eq!(&buf[..2], &[4, 5]); + + assert_read!(reader, &mut buf, 0); + + // at the end of the stream, there should be no element buffered in the reader + let (first, _end) = reader.into_inner(); + assert!(first.is_none()); +} From 100594a73f1f4b11d58a534aec12e3eebef78a0b Mon Sep 17 00:00:00 2001 From: "Adam C. Foltzer" Date: Fri, 6 Dec 2019 15:56:22 -0800 Subject: [PATCH 2/2] add an enum for `IntoAsyncRead::into_inner()` to return This makes code that calls this method look far less mysterious, even if it is more verbose. --- futures-util/src/stream/mod.rs | 2 +- .../src/stream/try_stream/into_async_read.rs | 46 +++++++++++++++---- futures-util/src/stream/try_stream/mod.rs | 2 +- futures/src/lib.rs | 2 +- futures/tests/stream_into_async_read.rs | 46 +++++++++++-------- 5 files changed, 67 insertions(+), 31 deletions(-) diff --git a/futures-util/src/stream/mod.rs b/futures-util/src/stream/mod.rs index 67bb39962a..6863dde746 100644 --- a/futures-util/src/stream/mod.rs +++ b/futures-util/src/stream/mod.rs @@ -45,7 +45,7 @@ pub use self::try_stream::{ #[cfg(feature = "io")] #[cfg(feature = "std")] -pub use self::try_stream::IntoAsyncRead; +pub use self::try_stream::{IntoAsyncRead, IntoAsyncReadParts}; #[cfg_attr(feature = "cfg-target-has-atomic", cfg(target_has_atomic = "ptr"))] #[cfg(feature = "alloc")] diff --git a/futures-util/src/stream/try_stream/into_async_read.rs b/futures-util/src/stream/try_stream/into_async_read.rs index a1a25db04a..1b3a888b02 100644 --- a/futures-util/src/stream/try_stream/into_async_read.rs +++ b/futures-util/src/stream/try_stream/into_async_read.rs @@ -32,6 +32,31 @@ enum ReadState> { Eof, } +/// The parts returned by `IntoAsyncInner::into_inner()`. +#[derive(Debug)] +pub enum IntoAsyncReadParts +where + St: TryStream + Unpin, + St::Ok: AsRef<[u8]>, +{ + /// A chunk from the stream is currently buffered. + PartiallyBuffered { + /// The currently buffered chunk. + chunk: St::Ok, + /// The offset into `chunk` that would have been read from next. + offset: usize, + /// The rest of the stream. + stream: St + }, + /// No chunks from the stream are currently buffered. + Pending { + /// The rest of the stream. + stream: St + }, + /// No chunks remain from the stream. + Eof, +} + impl IntoAsyncRead where St: TryStream + Unpin, @@ -44,17 +69,18 @@ where } } - /// Return the underlying stream along with any stream elements that are currently buffered. - /// - /// Returns `(None, stream)` if no stream elements are currently buffered. - /// - /// Returns `(Some(element, offset), stream)` if an element is currently buffered, where - /// `offset` is the current offset into `element` that would have been read from next. - pub fn into_inner(self) -> (Option<(St::Ok, usize)>, St) { + /// Return the underlying stream along with the currently-buffered chunk, if one is present. + pub fn into_inner(self) -> IntoAsyncReadParts { match self.state { - ReadState::Ready { chunk, chunk_start } => (Some((chunk, chunk_start)), self.stream), - ReadState::PendingChunk => (None, self.stream), - ReadState::Eof => (None, self.stream), + ReadState::Ready { chunk, chunk_start } => IntoAsyncReadParts::PartiallyBuffered { + chunk, + offset: chunk_start, + stream: self.stream, + }, + ReadState::PendingChunk => IntoAsyncReadParts::Pending { + stream: self.stream, + }, + ReadState::Eof => IntoAsyncReadParts::Eof, } } } diff --git a/futures-util/src/stream/try_stream/mod.rs b/futures-util/src/stream/try_stream/mod.rs index 6a7ced4f8c..6946e9b6cc 100644 --- a/futures-util/src/stream/try_stream/mod.rs +++ b/futures-util/src/stream/try_stream/mod.rs @@ -104,7 +104,7 @@ mod into_async_read; #[cfg(feature = "io")] #[cfg(feature = "std")] #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 -pub use self::into_async_read::IntoAsyncRead; +pub use self::into_async_read::{IntoAsyncRead, IntoAsyncReadParts}; impl TryStreamExt for S {} diff --git a/futures/src/lib.rs b/futures/src/lib.rs index 84e2a3f9e3..d723e58311 100644 --- a/futures/src/lib.rs +++ b/futures/src/lib.rs @@ -488,7 +488,7 @@ pub mod stream { }; #[cfg(feature = "std")] - pub use futures_util::stream::IntoAsyncRead; + pub use futures_util::stream::{IntoAsyncRead, IntoAsyncReadParts}; } pub mod task { diff --git a/futures/tests/stream_into_async_read.rs b/futures/tests/stream_into_async_read.rs index 4038b6ba33..f8bb8a03b6 100644 --- a/futures/tests/stream_into_async_read.rs +++ b/futures/tests/stream_into_async_read.rs @@ -1,7 +1,7 @@ use core::pin::Pin; use futures::future; use futures::io::{AsyncRead, AsyncBufRead}; -use futures::stream::{self, StreamExt, TryStreamExt}; +use futures::stream::{self, IntoAsyncReadParts, StreamExt, TryStreamExt}; use futures::task::Poll; use futures_test::{task::noop_context, stream::StreamTestExt}; @@ -106,14 +106,21 @@ fn test_into_async_read_into_inner() { assert_eq!(&buf, &[1, 2, 3]); // turn the reader into its inner parts, which we can inspect - let (first, rest) = reader.into_inner(); - let (mut chunk, offset) = first.expect(".into_inner() called in the middle of a chunk"); - assert_eq!(&chunk, &[1, 2, 3, 4, 5]); - assert_eq!(offset, 3); - - // package the stream back up by splitting off the chunk according to the offset we got back - let stream = stream::once(future::ready(Ok(chunk.split_off(offset)))).chain(rest); - let mut reader = stream.into_async_read(); + let mut reader = if let IntoAsyncReadParts::PartiallyBuffered { + mut chunk, + offset, + stream: rest, + } = reader.into_inner() + { + assert_eq!(&chunk, &[1, 2, 3, 4, 5]); + assert_eq!(offset, 3); + + // package the stream back up by splitting off the chunk according to the offset we got back + let stream = stream::once(future::ready(Ok(chunk.split_off(offset)))).chain(rest); + stream.into_async_read() + } else { + panic!("reader should have a partial buffer"); + }; // resume reading as normal assert_read!(reader, &mut buf, 2); @@ -125,12 +132,12 @@ fn test_into_async_read_into_inner() { assert_read!(reader, &mut buf, 2); assert_eq!(&buf[..2], &[4, 5]); - // turn the reader into its inner parts again, this time on a chunk boundary - let (first, rest) = reader.into_inner(); - assert!(first.is_none()); - - // package the stream back up and resume reading as normal - let mut reader = rest.into_async_read(); + let mut reader = if let IntoAsyncReadParts::Pending { stream: rest } = reader.into_inner() { + // package the stream back up and resume reading as normal + rest.into_async_read() + } else { + panic!("reader should have no partial buffer on a chunk boundary"); + }; assert_read!(reader, &mut buf, 3); assert_eq!(&buf, &[1, 2, 3]); @@ -140,7 +147,10 @@ fn test_into_async_read_into_inner() { assert_read!(reader, &mut buf, 0); - // at the end of the stream, there should be no element buffered in the reader - let (first, _end) = reader.into_inner(); - assert!(first.is_none()); + // at the end of the stream, we should see Eof + if let IntoAsyncReadParts::Eof = reader.into_inner() { + // succeed + } else { + panic!("reader should be at eof"); + } }