Skip to content

Commit 363ebe4

Browse files
taiki-ecramertj
authored andcommitted
Add StreamExt::enumerate
1 parent a5d8d15 commit 363ebe4

File tree

3 files changed

+113
-3
lines changed

3 files changed

+113
-3
lines changed

futures-util/src/stream/enumerate.rs

+61
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
use core::pin::Pin;
2+
use futures_core::stream::{FusedStream, Stream};
3+
use futures_core::task::{Context, Poll};
4+
use futures_sink::Sink;
5+
use pin_utils::{unsafe_pinned, unsafe_unpinned};
6+
7+
/// Stream for the [`enumerate`](super::EnumerateExt::enumerate) method.
8+
#[derive(Debug)]
9+
#[must_use = "streams do nothing unless polled"]
10+
pub struct Enumerate<St: Stream> {
11+
stream: St,
12+
count: usize,
13+
}
14+
15+
impl<St: Stream + Unpin> Unpin for Enumerate<St> {}
16+
17+
impl<St: Stream> Enumerate<St> {
18+
unsafe_pinned!(stream: St);
19+
unsafe_unpinned!(count: usize);
20+
21+
pub(super) fn new(stream: St) -> Enumerate<St> {
22+
Enumerate {
23+
stream,
24+
count: 0,
25+
}
26+
}
27+
}
28+
29+
impl<St: Stream + FusedStream> FusedStream for Enumerate<St> {
30+
fn is_terminated(&self) -> bool {
31+
self.stream.is_terminated()
32+
}
33+
}
34+
35+
impl<St: Stream> Stream for Enumerate<St> {
36+
type Item = (usize, St::Item);
37+
38+
fn poll_next(
39+
mut self: Pin<&mut Self>,
40+
cx: &mut Context<'_>,
41+
) -> Poll<Option<Self::Item>> {
42+
match ready!(self.as_mut().stream().poll_next(cx)) {
43+
Some(item) => {
44+
let count = self.count;
45+
*self.as_mut().count() += 1;
46+
Poll::Ready(Some((count, item)))
47+
}
48+
None => Poll::Ready(None),
49+
}
50+
}
51+
}
52+
53+
// Forwarding impl of Sink from the underlying stream
54+
impl<S, Item> Sink<Item> for Enumerate<S>
55+
where
56+
S: Stream + Sink<Item>,
57+
{
58+
type SinkError = S::SinkError;
59+
60+
delegate_sink!(stream, Item);
61+
}

futures-util/src/stream/mod.rs

+48
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,9 @@ pub use self::concat::Concat;
3030
mod empty;
3131
pub use self::empty::{empty, Empty};
3232

33+
mod enumerate;
34+
pub use self::enumerate::Enumerate;
35+
3336
mod filter;
3437
pub use self::filter::Filter;
3538

@@ -242,6 +245,51 @@ pub trait StreamExt: Stream {
242245
Map::new(self, f)
243246
}
244247

248+
/// Creates a stream which gives the current iteration count as well as
249+
/// the next value.
250+
///
251+
/// The stream returned yields pairs `(i, val)`, where `i` is the
252+
/// current index of iteration and `val` is the value returned by the
253+
/// stream.
254+
///
255+
/// `enumerate()` keeps its count as a [`usize`]. If you want to count by a
256+
/// different sized integer, the [`zip`](StreamExt::zip) function provides similar
257+
/// functionality.
258+
///
259+
/// # Overflow Behavior
260+
///
261+
/// The method does no guarding against overflows, so enumerating more than
262+
/// [`usize::max_value()`] elements either produces the wrong result or panics. If
263+
/// debug assertions are enabled, a panic is guaranteed.
264+
///
265+
/// # Panics
266+
///
267+
/// The returned stream might panic if the to-be-returned index would
268+
/// overflow a [`usize`].
269+
///
270+
/// # Examples
271+
///
272+
/// ```
273+
/// #![feature(async_await, await_macro, futures_api)]
274+
/// # futures::executor::block_on(async {
275+
/// use futures::stream::{self, StreamExt};
276+
///
277+
/// let stream = stream::iter(vec!['a', 'b', 'c']);
278+
///
279+
/// let mut stream = stream.enumerate();
280+
///
281+
/// assert_eq!(await!(stream.next()), Some((0, 'a')));
282+
/// assert_eq!(await!(stream.next()), Some((1, 'b')));
283+
/// assert_eq!(await!(stream.next()), Some((2, 'c')));
284+
/// assert_eq!(await!(stream.next()), None);
285+
/// # });
286+
/// ```
287+
fn enumerate(self) -> Enumerate<Self>
288+
where Self: Sized,
289+
{
290+
Enumerate::new(self)
291+
}
292+
245293
/// Filters the values produced by this stream according to the provided
246294
/// asynchronous predicate.
247295
///

futures/src/lib.rs

+4-3
Original file line numberDiff line numberDiff line change
@@ -345,9 +345,10 @@ pub mod stream {
345345
unfold, Unfold,
346346

347347
StreamExt,
348-
Chain, Collect, Concat, Filter, FilterMap, Flatten, Fold, Forward,
349-
ForEach, Fuse, StreamFuture, Inspect, Map, Next, SelectNextSome,
350-
Peekable, Select, Skip, SkipWhile, Take, TakeWhile, Then, Zip
348+
Chain, Collect, Concat, Enumerate, Filter, FilterMap, Flatten, Fold,
349+
Forward, ForEach, Fuse, StreamFuture, Inspect, Map, Next,
350+
SelectNextSome, Peekable, Select, Skip, SkipWhile, Take, TakeWhile,
351+
Then, Zip
351352
};
352353

353354
#[cfg(feature = "alloc")]

0 commit comments

Comments
 (0)