Skip to content

Commit 100594a

Browse files
committed
add an enum for IntoAsyncRead::into_inner() to return
This makes code that calls this method look far less mysterious, even if it is more verbose.
1 parent 0b77b4c commit 100594a

File tree

5 files changed

+67
-31
lines changed

5 files changed

+67
-31
lines changed

futures-util/src/stream/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ pub use self::try_stream::{
4545

4646
#[cfg(feature = "io")]
4747
#[cfg(feature = "std")]
48-
pub use self::try_stream::IntoAsyncRead;
48+
pub use self::try_stream::{IntoAsyncRead, IntoAsyncReadParts};
4949

5050
#[cfg_attr(feature = "cfg-target-has-atomic", cfg(target_has_atomic = "ptr"))]
5151
#[cfg(feature = "alloc")]

futures-util/src/stream/try_stream/into_async_read.rs

Lines changed: 36 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,31 @@ enum ReadState<T: AsRef<[u8]>> {
3232
Eof,
3333
}
3434

35+
/// The parts returned by `IntoAsyncInner::into_inner()`.
36+
#[derive(Debug)]
37+
pub enum IntoAsyncReadParts<St>
38+
where
39+
St: TryStream<Error = Error> + Unpin,
40+
St::Ok: AsRef<[u8]>,
41+
{
42+
/// A chunk from the stream is currently buffered.
43+
PartiallyBuffered {
44+
/// The currently buffered chunk.
45+
chunk: St::Ok,
46+
/// The offset into `chunk` that would have been read from next.
47+
offset: usize,
48+
/// The rest of the stream.
49+
stream: St
50+
},
51+
/// No chunks from the stream are currently buffered.
52+
Pending {
53+
/// The rest of the stream.
54+
stream: St
55+
},
56+
/// No chunks remain from the stream.
57+
Eof,
58+
}
59+
3560
impl<St> IntoAsyncRead<St>
3661
where
3762
St: TryStream<Error = Error> + Unpin,
@@ -44,17 +69,18 @@ where
4469
}
4570
}
4671

47-
/// Return the underlying stream along with any stream elements that are currently buffered.
48-
///
49-
/// Returns `(None, stream)` if no stream elements are currently buffered.
50-
///
51-
/// Returns `(Some(element, offset), stream)` if an element is currently buffered, where
52-
/// `offset` is the current offset into `element` that would have been read from next.
53-
pub fn into_inner(self) -> (Option<(St::Ok, usize)>, St) {
72+
/// Return the underlying stream along with the currently-buffered chunk, if one is present.
73+
pub fn into_inner(self) -> IntoAsyncReadParts<St> {
5474
match self.state {
55-
ReadState::Ready { chunk, chunk_start } => (Some((chunk, chunk_start)), self.stream),
56-
ReadState::PendingChunk => (None, self.stream),
57-
ReadState::Eof => (None, self.stream),
75+
ReadState::Ready { chunk, chunk_start } => IntoAsyncReadParts::PartiallyBuffered {
76+
chunk,
77+
offset: chunk_start,
78+
stream: self.stream,
79+
},
80+
ReadState::PendingChunk => IntoAsyncReadParts::Pending {
81+
stream: self.stream,
82+
},
83+
ReadState::Eof => IntoAsyncReadParts::Eof,
5884
}
5985
}
6086
}

futures-util/src/stream/try_stream/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ mod into_async_read;
104104
#[cfg(feature = "io")]
105105
#[cfg(feature = "std")]
106106
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
107-
pub use self::into_async_read::IntoAsyncRead;
107+
pub use self::into_async_read::{IntoAsyncRead, IntoAsyncReadParts};
108108

109109
impl<S: ?Sized + TryStream> TryStreamExt for S {}
110110

futures/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -488,7 +488,7 @@ pub mod stream {
488488
};
489489

490490
#[cfg(feature = "std")]
491-
pub use futures_util::stream::IntoAsyncRead;
491+
pub use futures_util::stream::{IntoAsyncRead, IntoAsyncReadParts};
492492
}
493493

494494
pub mod task {

futures/tests/stream_into_async_read.rs

Lines changed: 28 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use core::pin::Pin;
22
use futures::future;
33
use futures::io::{AsyncRead, AsyncBufRead};
4-
use futures::stream::{self, StreamExt, TryStreamExt};
4+
use futures::stream::{self, IntoAsyncReadParts, StreamExt, TryStreamExt};
55
use futures::task::Poll;
66
use futures_test::{task::noop_context, stream::StreamTestExt};
77

@@ -106,14 +106,21 @@ fn test_into_async_read_into_inner() {
106106
assert_eq!(&buf, &[1, 2, 3]);
107107

108108
// turn the reader into its inner parts, which we can inspect
109-
let (first, rest) = reader.into_inner();
110-
let (mut chunk, offset) = first.expect(".into_inner() called in the middle of a chunk");
111-
assert_eq!(&chunk, &[1, 2, 3, 4, 5]);
112-
assert_eq!(offset, 3);
113-
114-
// package the stream back up by splitting off the chunk according to the offset we got back
115-
let stream = stream::once(future::ready(Ok(chunk.split_off(offset)))).chain(rest);
116-
let mut reader = stream.into_async_read();
109+
let mut reader = if let IntoAsyncReadParts::PartiallyBuffered {
110+
mut chunk,
111+
offset,
112+
stream: rest,
113+
} = reader.into_inner()
114+
{
115+
assert_eq!(&chunk, &[1, 2, 3, 4, 5]);
116+
assert_eq!(offset, 3);
117+
118+
// package the stream back up by splitting off the chunk according to the offset we got back
119+
let stream = stream::once(future::ready(Ok(chunk.split_off(offset)))).chain(rest);
120+
stream.into_async_read()
121+
} else {
122+
panic!("reader should have a partial buffer");
123+
};
117124

118125
// resume reading as normal
119126
assert_read!(reader, &mut buf, 2);
@@ -125,12 +132,12 @@ fn test_into_async_read_into_inner() {
125132
assert_read!(reader, &mut buf, 2);
126133
assert_eq!(&buf[..2], &[4, 5]);
127134

128-
// turn the reader into its inner parts again, this time on a chunk boundary
129-
let (first, rest) = reader.into_inner();
130-
assert!(first.is_none());
131-
132-
// package the stream back up and resume reading as normal
133-
let mut reader = rest.into_async_read();
135+
let mut reader = if let IntoAsyncReadParts::Pending { stream: rest } = reader.into_inner() {
136+
// package the stream back up and resume reading as normal
137+
rest.into_async_read()
138+
} else {
139+
panic!("reader should have no partial buffer on a chunk boundary");
140+
};
134141

135142
assert_read!(reader, &mut buf, 3);
136143
assert_eq!(&buf, &[1, 2, 3]);
@@ -140,7 +147,10 @@ fn test_into_async_read_into_inner() {
140147

141148
assert_read!(reader, &mut buf, 0);
142149

143-
// at the end of the stream, there should be no element buffered in the reader
144-
let (first, _end) = reader.into_inner();
145-
assert!(first.is_none());
150+
// at the end of the stream, we should see Eof
151+
if let IntoAsyncReadParts::Eof = reader.into_inner() {
152+
// succeed
153+
} else {
154+
panic!("reader should be at eof");
155+
}
146156
}

0 commit comments

Comments
 (0)