Skip to content

Commit 4613193

Browse files
taiki-ecramertj
authored andcommitted
Add Stream::size_hint
1 parent 90c83b8 commit 4613193

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

47 files changed

+425
-34
lines changed

futures-core/src/future/mod.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ mod private_try_future {
5757

5858
/// A convenience for futures that return `Result` values that includes
5959
/// a variety of adapters tailored to such futures.
60-
pub trait TryFuture: private_try_future::Sealed {
60+
pub trait TryFuture: Future + private_try_future::Sealed {
6161
/// The type of successful values yielded by this future
6262
type Ok;
6363

futures-core/src/stream.rs

+63-11
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,37 @@ pub trait Stream {
5959
self: Pin<&mut Self>,
6060
cx: &mut Context<'_>,
6161
) -> Poll<Option<Self::Item>>;
62+
63+
/// Returns the bounds on the remaining length of the stream.
64+
///
65+
/// Specifically, `size_hint()` returns a tuple where the first element
66+
/// is the lower bound, and the second element is the upper bound.
67+
///
68+
/// The second half of the tuple that is returned is an [`Option`]`<`[`usize`]`>`.
69+
/// A [`None`] here means that either there is no known upper bound, or the
70+
/// upper bound is larger than [`usize`].
71+
///
72+
/// # Implementation notes
73+
///
74+
/// It is not enforced that a stream implementation yields the declared
75+
/// number of elements. A buggy stream may yield less than the lower bound
76+
/// or more than the upper bound of elements.
77+
///
78+
/// `size_hint()` is primarily intended to be used for optimizations such as
79+
/// reserving space for the elements of the stream, but must not be
80+
/// trusted to e.g., omit bounds checks in unsafe code. An incorrect
81+
/// implementation of `size_hint()` should not lead to memory safety
82+
/// violations.
83+
///
84+
/// That said, the implementation should provide a correct estimation,
85+
/// because otherwise it would be a violation of the trait's protocol.
86+
///
87+
/// The default implementation returns `(0, `[`None`]`)` which is correct for any
88+
/// stream.
89+
#[inline]
90+
fn size_hint(&self) -> (usize, Option<usize>) {
91+
(0, None)
92+
}
6293
}
6394

6495
impl<S: ?Sized + Stream + Unpin> Stream for &mut S {
@@ -70,6 +101,10 @@ impl<S: ?Sized + Stream + Unpin> Stream for &mut S {
70101
) -> Poll<Option<Self::Item>> {
71102
S::poll_next(Pin::new(&mut **self), cx)
72103
}
104+
105+
fn size_hint(&self) -> (usize, Option<usize>) {
106+
(**self).size_hint()
107+
}
73108
}
74109

75110
impl<P> Stream for Pin<P>
@@ -85,6 +120,10 @@ where
85120
) -> Poll<Option<Self::Item>> {
86121
self.get_mut().as_mut().poll_next(cx)
87122
}
123+
124+
fn size_hint(&self) -> (usize, Option<usize>) {
125+
(**self).size_hint()
126+
}
88127
}
89128

90129
/// A stream which tracks whether or not the underlying stream
@@ -126,7 +165,7 @@ mod private_try_stream {
126165

127166
/// A convenience for streams that return `Result` values that includes
128167
/// a variety of adapters tailored to such futures.
129-
pub trait TryStream: private_try_stream::Sealed {
168+
pub trait TryStream: Stream + private_try_stream::Sealed {
130169
/// The type of successful values yielded by this future
131170
type Ok;
132171

@@ -169,10 +208,30 @@ mod if_alloc {
169208
) -> Poll<Option<Self::Item>> {
170209
Pin::new(&mut **self).poll_next(cx)
171210
}
211+
212+
fn size_hint(&self) -> (usize, Option<usize>) {
213+
(**self).size_hint()
214+
}
215+
}
216+
217+
impl<T: Unpin> Stream for alloc::collections::VecDeque<T> {
218+
type Item = T;
219+
220+
fn poll_next(
221+
mut self: Pin<&mut Self>,
222+
_cx: &mut Context<'_>,
223+
) -> Poll<Option<Self::Item>> {
224+
Poll::Ready(self.pop_front())
225+
}
226+
227+
fn size_hint(&self) -> (usize, Option<usize>) {
228+
let len = self.len();
229+
(len, Some(len))
230+
}
172231
}
173232

174233
#[cfg(feature = "std")]
175-
impl<S: Stream> Stream for ::std::panic::AssertUnwindSafe<S> {
234+
impl<S: Stream> Stream for std::panic::AssertUnwindSafe<S> {
176235
type Item = S::Item;
177236

178237
fn poll_next(
@@ -181,16 +240,9 @@ mod if_alloc {
181240
) -> Poll<Option<S::Item>> {
182241
unsafe { self.map_unchecked_mut(|x| &mut x.0) }.poll_next(cx)
183242
}
184-
}
185-
186-
impl<T: Unpin> Stream for ::alloc::collections::VecDeque<T> {
187-
type Item = T;
188243

189-
fn poll_next(
190-
mut self: Pin<&mut Self>,
191-
_cx: &mut Context<'_>,
192-
) -> Poll<Option<Self::Item>> {
193-
Poll::Ready(self.pop_front())
244+
fn size_hint(&self) -> (usize, Option<usize>) {
245+
self.0.size_hint()
194246
}
195247
}
196248

futures-executor/src/local_pool.rs

+5
Original file line numberDiff line numberDiff line change
@@ -329,9 +329,14 @@ impl<S: Stream + Unpin> BlockingStream<S> {
329329

330330
impl<S: Stream + Unpin> Iterator for BlockingStream<S> {
331331
type Item = S::Item;
332+
332333
fn next(&mut self) -> Option<Self::Item> {
333334
LocalPool::new().run_until(self.stream.next())
334335
}
336+
337+
fn size_hint(&self) -> (usize, Option<usize>) {
338+
self.stream.size_hint()
339+
}
335340
}
336341

337342
impl Spawn for LocalSpawner {

futures-test/src/interleave_pending.rs

+4
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,10 @@ impl<St: Stream> Stream for InterleavePending<St> {
104104
Poll::Pending
105105
}
106106
}
107+
108+
fn size_hint(&self) -> (usize, Option<usize>) {
109+
self.inner.size_hint()
110+
}
107111
}
108112

109113
impl<W: AsyncWrite> AsyncWrite for InterleavePending<W> {
+11-10
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use crate::stream::{self, Once};
12
use core::pin::Pin;
23
use futures_core::future::Future;
34
use futures_core::stream::Stream;
@@ -8,29 +9,29 @@ use pin_utils::unsafe_pinned;
89
#[must_use = "streams do nothing unless polled"]
910
#[derive(Debug)]
1011
pub struct IntoStream<Fut: Future> {
11-
future: Option<Fut>
12+
inner: Once<Fut>
1213
}
1314

1415
impl<Fut: Future> IntoStream<Fut> {
15-
unsafe_pinned!(future: Option<Fut>);
16+
unsafe_pinned!(inner: Once<Fut>);
1617

1718
pub(super) fn new(future: Fut) -> IntoStream<Fut> {
1819
IntoStream {
19-
future: Some(future)
20+
inner: stream::once(future)
2021
}
2122
}
2223
}
2324

2425
impl<Fut: Future> Stream for IntoStream<Fut> {
2526
type Item = Fut::Output;
2627

27-
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
28-
let v = match self.as_mut().future().as_pin_mut() {
29-
Some(fut) => ready!(fut.poll(cx)),
30-
None => return Poll::Ready(None),
31-
};
28+
#[inline]
29+
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
30+
self.inner().poll_next(cx)
31+
}
3232

33-
self.as_mut().future().set(None);
34-
Poll::Ready(Some(v))
33+
#[inline]
34+
fn size_hint(&self) -> (usize, Option<usize>) {
35+
self.inner.size_hint()
3536
}
3637
}

futures-util/src/sink/buffer.rs

+4
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,10 @@ impl<S, Item> Stream for Buffer<S, Item> where S: Sink<Item> + Stream {
7676
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>> {
7777
self.sink().poll_next(cx)
7878
}
79+
80+
fn size_hint(&self) -> (usize, Option<usize>) {
81+
self.sink.size_hint()
82+
}
7983
}
8084

8185
impl<S, Item> FusedStream for Buffer<S, Item> where S: Sink<Item> + FusedStream {

futures-util/src/sink/err_into.rs

+4
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,10 @@ impl<S, Item, E> Stream for SinkErrInto<S, Item, E>
7070
) -> Poll<Option<S::Item>> {
7171
self.sink().poll_next(cx)
7272
}
73+
74+
fn size_hint(&self) -> (usize, Option<usize>) {
75+
self.sink.size_hint()
76+
}
7377
}
7478

7579
impl<S, Item, E> FusedStream for SinkErrInto<S, Item, E>

futures-util/src/sink/map_err.rs

+4
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,10 @@ impl<S: Stream, F> Stream for SinkMapErr<S, F> {
9595
) -> Poll<Option<S::Item>> {
9696
self.sink().poll_next(cx)
9797
}
98+
99+
fn size_hint(&self) -> (usize, Option<usize>) {
100+
self.sink.size_hint()
101+
}
98102
}
99103

100104
impl<S: FusedStream, F> FusedStream for SinkMapErr<S, F> {

futures-util/src/sink/with.rs

+4
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,10 @@ impl<S, Item, U, Fut, F> Stream for With<S, Item, U, Fut, F>
9797
) -> Poll<Option<S::Item>> {
9898
self.sink().poll_next(cx)
9999
}
100+
101+
fn size_hint(&self) -> (usize, Option<usize>) {
102+
self.sink.size_hint()
103+
}
100104
}
101105

102106
impl<Si, Item, U, Fut, F, E> With<Si, Item, U, Fut, F>

futures-util/src/sink/with_flat_map.rs

+5
Original file line numberDiff line numberDiff line change
@@ -121,12 +121,17 @@ where
121121
St: Stream<Item = Result<Item, S::Error>>,
122122
{
123123
type Item = S::Item;
124+
124125
fn poll_next(
125126
self: Pin<&mut Self>,
126127
cx: &mut Context<'_>,
127128
) -> Poll<Option<S::Item>> {
128129
self.sink().poll_next(cx)
129130
}
131+
132+
fn size_hint(&self) -> (usize, Option<usize>) {
133+
self.sink.size_hint()
134+
}
130135
}
131136

132137
impl<S, Item, U, St, F> FusedStream for WithFlatMap<S, Item, U, St, F>

futures-util/src/stream/buffer_unordered.rs

+11
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,17 @@ where
127127
Poll::Pending
128128
}
129129
}
130+
131+
fn size_hint(&self) -> (usize, Option<usize>) {
132+
let queue_len = self.in_progress_queue.len();
133+
let (lower, upper) = self.stream.size_hint();
134+
let lower = lower.saturating_add(queue_len);
135+
let upper = match upper {
136+
Some(x) => x.checked_add(queue_len),
137+
None => None,
138+
};
139+
(lower, upper)
140+
}
130141
}
131142

132143
impl<St> FusedStream for BufferUnordered<St>

futures-util/src/stream/buffered.rs

+11
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,17 @@ where
122122
Poll::Pending
123123
}
124124
}
125+
126+
fn size_hint(&self) -> (usize, Option<usize>) {
127+
let queue_len = self.in_progress_queue.len();
128+
let (lower, upper) = self.stream.size_hint();
129+
let lower = lower.saturating_add(queue_len);
130+
let upper = match upper {
131+
Some(x) => x.checked_add(queue_len),
132+
None => None,
133+
};
134+
(lower, upper)
135+
}
125136
}
126137

127138
// Forwarding impl of Sink from the underlying stream

futures-util/src/stream/catch_unwind.rs

+8
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,14 @@ impl<St: Stream + UnwindSafe> Stream for CatchUnwind<St> {
4545
}
4646
}
4747
}
48+
49+
fn size_hint(&self) -> (usize, Option<usize>) {
50+
if self.caught_unwind {
51+
(0, Some(0))
52+
} else {
53+
self.stream.size_hint()
54+
}
55+
}
4856
}
4957

5058
impl<St: FusedStream + UnwindSafe> FusedStream for CatchUnwind<St> {

futures-util/src/stream/chain.rs

+18
Original file line numberDiff line numberDiff line change
@@ -54,4 +54,22 @@ where St1: Stream,
5454
self.as_mut().first().set(None);
5555
self.as_mut().second().poll_next(cx)
5656
}
57+
58+
fn size_hint(&self) -> (usize, Option<usize>) {
59+
if let Some(first) = &self.first {
60+
let (first_lower, first_upper) = first.size_hint();
61+
let (second_lower, second_upper) = self.second.size_hint();
62+
63+
let lower = first_lower.saturating_add(second_lower);
64+
65+
let upper = match (first_upper, second_upper) {
66+
(Some(x), Some(y)) => x.checked_add(y),
67+
_ => None
68+
};
69+
70+
(lower, upper)
71+
} else {
72+
self.second.size_hint()
73+
}
74+
}
5775
}

futures-util/src/stream/chunks.rs

+11
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,17 @@ impl<St: Stream> Stream for Chunks<St> {
105105
}
106106
}
107107
}
108+
109+
fn size_hint(&self) -> (usize, Option<usize>) {
110+
let chunk_len = if self.items.is_empty() { 0 } else { 1 };
111+
let (lower, upper) = self.stream.size_hint();
112+
let lower = lower.saturating_add(chunk_len);
113+
let upper = match upper {
114+
Some(x) => x.checked_add(chunk_len),
115+
None => None,
116+
};
117+
(lower, upper)
118+
}
108119
}
109120

110121
impl<St: FusedStream> FusedStream for Chunks<St> {

futures-util/src/stream/empty.rs

+4
Original file line numberDiff line numberDiff line change
@@ -33,4 +33,8 @@ impl<T> Stream for Empty<T> {
3333
fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
3434
Poll::Ready(None)
3535
}
36+
37+
fn size_hint(&self) -> (usize, Option<usize>) {
38+
(0, Some(0))
39+
}
3640
}

futures-util/src/stream/enumerate.rs

+4
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,10 @@ impl<St: Stream> Stream for Enumerate<St> {
8181
None => Poll::Ready(None),
8282
}
8383
}
84+
85+
fn size_hint(&self) -> (usize, Option<usize>) {
86+
self.stream.size_hint()
87+
}
8488
}
8589

8690
// Forwarding impl of Sink from the underlying stream

futures-util/src/stream/filter.rs

+10
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,16 @@ impl<St, Fut, F> Stream for Filter<St, Fut, F>
132132
}
133133
}
134134
}
135+
136+
fn size_hint(&self) -> (usize, Option<usize>) {
137+
let pending_len = if self.pending_item.is_some() { 1 } else { 0 };
138+
let (_, upper) = self.stream.size_hint();
139+
let upper = match upper {
140+
Some(x) => x.checked_add(pending_len),
141+
None => None,
142+
};
143+
(0, upper) // can't know a lower bound, due to the predicate
144+
}
135145
}
136146

137147
// Forwarding impl of Sink from the underlying stream

0 commit comments

Comments
 (0)