Skip to content

Add IntoAsyncRead::into_inner() method and test #1997

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion futures-util/src/stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ pub use self::try_stream::{

#[cfg(feature = "io")]
#[cfg(feature = "std")]
pub use self::try_stream::IntoAsyncRead;
pub use self::try_stream::{IntoAsyncRead, IntoAsyncReadParts};

#[cfg_attr(feature = "cfg-target-has-atomic", cfg(target_has_atomic = "ptr"))]
#[cfg(feature = "alloc")]
Expand Down
40 changes: 40 additions & 0 deletions futures-util/src/stream/try_stream/into_async_read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,31 @@ enum ReadState<T: AsRef<[u8]>> {
Eof,
}

/// The parts returned by `IntoAsyncInner::into_inner()`.
#[derive(Debug)]
pub enum IntoAsyncReadParts<St>
where
St: TryStream<Error = Error> + Unpin,
St::Ok: AsRef<[u8]>,
{
/// A chunk from the stream is currently buffered.
PartiallyBuffered {
/// The currently buffered chunk.
chunk: St::Ok,
/// The offset into `chunk` that would have been read from next.
offset: usize,
/// The rest of the stream.
stream: St
},
/// No chunks from the stream are currently buffered.
Pending {
/// The rest of the stream.
stream: St
},
/// No chunks remain from the stream.
Eof,
}

impl<St> IntoAsyncRead<St>
where
St: TryStream<Error = Error> + Unpin,
Expand All @@ -43,6 +68,21 @@ where
state: ReadState::PendingChunk,
}
}

/// Return the underlying stream along with the currently-buffered chunk, if one is present.
pub fn into_inner(self) -> IntoAsyncReadParts<St> {
match self.state {
ReadState::Ready { chunk, chunk_start } => IntoAsyncReadParts::PartiallyBuffered {
chunk,
offset: chunk_start,
stream: self.stream,
},
ReadState::PendingChunk => IntoAsyncReadParts::Pending {
stream: self.stream,
},
ReadState::Eof => IntoAsyncReadParts::Eof,
}
}
}

impl<St> AsyncRead for IntoAsyncRead<St>
Expand Down
2 changes: 1 addition & 1 deletion futures-util/src/stream/try_stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ mod into_async_read;
#[cfg(feature = "io")]
#[cfg(feature = "std")]
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::into_async_read::IntoAsyncRead;
pub use self::into_async_read::{IntoAsyncRead, IntoAsyncReadParts};

impl<S: ?Sized + TryStream> TryStreamExt for S {}

Expand Down
2 changes: 1 addition & 1 deletion futures/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -488,7 +488,7 @@ pub mod stream {
};

#[cfg(feature = "std")]
pub use futures_util::stream::IntoAsyncRead;
pub use futures_util::stream::{IntoAsyncRead, IntoAsyncReadParts};
}

pub mod task {
Expand Down
62 changes: 61 additions & 1 deletion futures/tests/stream_into_async_read.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use core::pin::Pin;
use futures::future;
use futures::io::{AsyncRead, AsyncBufRead};
use futures::stream::{self, TryStreamExt};
use futures::stream::{self, IntoAsyncReadParts, StreamExt, TryStreamExt};
use futures::task::Poll;
use futures_test::{task::noop_context, stream::StreamTestExt};

Expand Down Expand Up @@ -94,3 +95,62 @@ fn test_into_async_bufread() -> std::io::Result<()> {

Ok(())
}

#[test]
fn test_into_async_read_into_inner() {
let stream = stream::iter((1..=3).flat_map(|_| vec![Ok(vec![]), Ok(vec![1, 2, 3, 4, 5])]));
let mut reader = stream.interleave_pending().into_async_read();
let mut buf = vec![0; 3];

assert_read!(reader, &mut buf, 3);
assert_eq!(&buf, &[1, 2, 3]);

// turn the reader into its inner parts, which we can inspect
let mut reader = if let IntoAsyncReadParts::PartiallyBuffered {
mut chunk,
offset,
stream: rest,
} = reader.into_inner()
{
assert_eq!(&chunk, &[1, 2, 3, 4, 5]);
assert_eq!(offset, 3);

// package the stream back up by splitting off the chunk according to the offset we got back
let stream = stream::once(future::ready(Ok(chunk.split_off(offset)))).chain(rest);
stream.into_async_read()
} else {
panic!("reader should have a partial buffer");
};

// resume reading as normal
assert_read!(reader, &mut buf, 2);
assert_eq!(&buf[..2], &[4, 5]);

assert_read!(reader, &mut buf, 3);
assert_eq!(&buf, &[1, 2, 3]);

assert_read!(reader, &mut buf, 2);
assert_eq!(&buf[..2], &[4, 5]);

let mut reader = if let IntoAsyncReadParts::Pending { stream: rest } = reader.into_inner() {
// package the stream back up and resume reading as normal
rest.into_async_read()
} else {
panic!("reader should have no partial buffer on a chunk boundary");
};

assert_read!(reader, &mut buf, 3);
assert_eq!(&buf, &[1, 2, 3]);

assert_read!(reader, &mut buf, 2);
assert_eq!(&buf[..2], &[4, 5]);

assert_read!(reader, &mut buf, 0);

// at the end of the stream, we should see Eof
if let IntoAsyncReadParts::Eof = reader.into_inner() {
// succeed
} else {
panic!("reader should be at eof");
}
}