Skip to content

Commit 4d0e1d4

Browse files
committed
feat: implement last helper method on StreamExt
The `last` function consumes the stream and returns the last element.
1 parent de9274e commit 4d0e1d4

4 files changed

Lines changed: 118 additions & 1 deletion

File tree

futures-util/src/stream/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ pub use futures_core::stream::{FusedStream, Stream, TryStream};
1919
mod stream;
2020
pub use self::stream::{
2121
All, Any, Chain, Collect, Concat, Count, Cycle, Enumerate, Filter, FilterMap, FlatMap, Flatten,
22-
Fold, ForEach, Fuse, Inspect, Map, Next, NextIf, NextIfEq, Peek, PeekMut, Peekable, Scan,
22+
Fold, ForEach, Fuse, Inspect, Last, Map, Next, NextIf, NextIfEq, Peek, PeekMut, Peekable, Scan,
2323
SelectNextSome, Skip, SkipWhile, StreamExt, StreamFuture, Take, TakeUntil, TakeWhile, Then,
2424
TryFold, TryForEach, Unzip, Zip,
2525
};
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
use core::fmt;
2+
use core::pin::Pin;
3+
use futures_core::future::{FusedFuture, Future};
4+
use futures_core::ready;
5+
use futures_core::stream::{FusedStream, Stream};
6+
use futures_core::task::{Context, Poll};
7+
use pin_project_lite::pin_project;
8+
9+
pin_project! {
10+
/// Future for the [`last`](super::StreamExt::last) method.
11+
#[must_use = "futures do nothing unless you `.await` or poll them"]
12+
pub struct Last<St: Stream> {
13+
#[pin]
14+
stream: St,
15+
last: Option<St::Item>,
16+
done: bool,
17+
}
18+
}
19+
20+
impl<St> fmt::Debug for Last<St>
21+
where
22+
St: Stream + fmt::Debug,
23+
{
24+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
25+
f.debug_struct("Last").field("stream", &self.stream).finish()
26+
}
27+
}
28+
29+
impl<St> Last<St>
30+
where
31+
St: Stream,
32+
{
33+
pub(super) fn new(stream: St) -> Self {
34+
Self { stream, last: None, done: false }
35+
}
36+
}
37+
38+
impl<St> FusedFuture for Last<St>
39+
where
40+
St: FusedStream,
41+
{
42+
fn is_terminated(&self) -> bool {
43+
self.done
44+
}
45+
}
46+
47+
impl<St> Future for Last<St>
48+
where
49+
St: Stream,
50+
{
51+
type Output = Option<St::Item>;
52+
53+
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
54+
let mut this = self.project();
55+
56+
if *this.done {
57+
panic!("Last polled after completion");
58+
}
59+
60+
Poll::Ready(loop {
61+
match ready!(this.stream.as_mut().poll_next(cx)) {
62+
Some(item) => *this.last = Some(item),
63+
None => {
64+
*this.done = true;
65+
break this.last.take();
66+
}
67+
}
68+
})
69+
}
70+
}

futures-util/src/stream/stream/mod.rs

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,9 @@ pub use self::any::Any;
6868
mod all;
6969
pub use self::all::All;
7070

71+
mod last;
72+
pub use self::last::Last;
73+
7174
#[cfg(feature = "sink")]
7275
mod forward;
7376

@@ -716,6 +719,32 @@ pub trait StreamExt: Stream {
716719
assert_future::<bool, _>(All::new(self, f))
717720
}
718721

722+
/// Returns the last element of the stream, or `None` if the stream is empty.
723+
///
724+
/// This function will consume the entire stream to return the last item.
725+
///
726+
/// # Examples
727+
///
728+
/// ```
729+
/// # futures::executor::block_on(async {
730+
/// use futures::stream::{self, StreamExt};
731+
///
732+
/// let number_stream = stream::iter(1..=5);
733+
/// let last_number = number_stream.last().await;
734+
/// assert_eq!(last_number, Some(5));
735+
///
736+
/// let empty_stream = stream::iter(Vec::<i32>::new());
737+
/// let last_number = empty_stream.last().await;
738+
/// assert_eq!(last_number, None);
739+
/// # });
740+
/// ```
741+
fn last(self) -> Last<Self>
742+
where
743+
Self: Sized,
744+
{
745+
assert_future::<Option<Self::Item>, _>(Last::new(self))
746+
}
747+
719748
/// Flattens a stream of streams into just one continuous stream.
720749
///
721750
/// # Examples

futures/tests/stream.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -592,3 +592,21 @@ fn any() {
592592
assert!(!any);
593593
});
594594
}
595+
596+
#[test]
597+
fn last() {
598+
block_on(async {
599+
let empty: [u8; 0] = [];
600+
let st = stream::iter(empty);
601+
let last = st.last().await;
602+
assert_eq!(last, None);
603+
604+
let st = stream::iter([1]);
605+
let last = st.last().await;
606+
assert_eq!(last, Some(1));
607+
608+
let st = stream::iter([1, 2, 3, 4, 5]);
609+
let last = st.last().await;
610+
assert_eq!(last, Some(5));
611+
});
612+
}

0 commit comments

Comments
 (0)