Skip to content

Commit 9a04fa3

Browse files
Merge pull request #1141 from cramertj/map-ok
Add TryStream::{map_ok, map_err}
2 parents 23b0625 + ca564ad commit 9a04fa3

3 files changed

Lines changed: 146 additions & 0 deletions

File tree

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
use core::marker::Unpin;
2+
use core::mem::PinMut;
3+
use futures_core::stream::{Stream, TryStream};
4+
use futures_core::task::{self, Poll};
5+
6+
/// Stream for the [`map_err`](super::TryStreamExt::map_err) combinator.
7+
#[derive(Debug)]
8+
#[must_use = "streams do nothing unless polled"]
9+
pub struct MapErr<St, F> {
10+
stream: St,
11+
f: F,
12+
}
13+
14+
impl<St, F> MapErr<St, F> {
15+
unsafe_pinned!(stream: St);
16+
unsafe_unpinned!(f: F);
17+
18+
/// Creates a new MapErr.
19+
pub(super) fn new(stream: St, f: F) -> Self {
20+
MapErr { stream, f }
21+
}
22+
}
23+
24+
impl<St: Unpin, F> Unpin for MapErr<St, F> {}
25+
26+
impl<St, F, E> Stream for MapErr<St, F>
27+
where
28+
St: TryStream,
29+
F: FnMut(St::Error) -> E,
30+
{
31+
type Item = Result<St::Ok, E>;
32+
33+
#[allow(redundant_closure)] // https://github.com/rust-lang-nursery/rust-clippy/issues/1439
34+
fn poll_next(
35+
mut self: PinMut<Self>,
36+
cx: &mut task::Context,
37+
) -> Poll<Option<Self::Item>> {
38+
match self.stream().try_poll_next(cx) {
39+
Poll::Pending => Poll::Pending,
40+
Poll::Ready(opt) =>
41+
Poll::Ready(opt.map(|res| res.map_err(|e| self.f()(e)))),
42+
}
43+
}
44+
}
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
use core::marker::Unpin;
2+
use core::mem::PinMut;
3+
use futures_core::stream::{Stream, TryStream};
4+
use futures_core::task::{self, Poll};
5+
6+
/// Stream for the [`map_ok`](super::TryStreamExt::map_ok) combinator.
7+
#[derive(Debug)]
8+
#[must_use = "streams do nothing unless polled"]
9+
pub struct MapOk<St, F> {
10+
stream: St,
11+
f: F,
12+
}
13+
14+
impl<St, F> MapOk<St, F> {
15+
unsafe_pinned!(stream: St);
16+
unsafe_unpinned!(f: F);
17+
18+
/// Creates a new MapOk.
19+
pub(super) fn new(stream: St, f: F) -> Self {
20+
MapOk { stream, f }
21+
}
22+
}
23+
24+
impl<St: Unpin, F> Unpin for MapOk<St, F> {}
25+
26+
impl<St, F, T> Stream for MapOk<St, F>
27+
where
28+
St: TryStream,
29+
F: FnMut(St::Ok) -> T,
30+
{
31+
type Item = Result<T, St::Error>;
32+
33+
#[allow(redundant_closure)] // https://github.com/rust-lang-nursery/rust-clippy/issues/1439
34+
fn poll_next(
35+
mut self: PinMut<Self>,
36+
cx: &mut task::Context,
37+
) -> Poll<Option<Self::Item>> {
38+
match self.stream().try_poll_next(cx) {
39+
Poll::Pending => Poll::Pending,
40+
Poll::Ready(opt) =>
41+
Poll::Ready(opt.map(|res| res.map(|x| self.f()(x)))),
42+
}
43+
}
44+
}

futures-util/src/try_stream/mod.rs

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,12 @@ use futures_core::stream::TryStream;
1010
mod err_into;
1111
pub use self::err_into::ErrInto;
1212

13+
mod map_ok;
14+
pub use self::map_ok::MapOk;
15+
16+
mod map_err;
17+
pub use self::map_err::MapErr;
18+
1319
mod try_next;
1420
pub use self::try_next::TryNext;
1521

@@ -51,6 +57,58 @@ pub trait TryStreamExt: TryStream {
5157
ErrInto::new(self)
5258
}
5359

60+
/// Wraps the current stream in a new stream which maps the success value
61+
/// using the provided closure.
62+
///
63+
/// # Examples
64+
///
65+
/// ```
66+
/// #![feature(async_await, await_macro)]
67+
/// # futures::executor::block_on(async {
68+
/// use futures::{stream, TryStreamExt};
69+
///
70+
/// let mut stream =
71+
/// stream::iter(vec![Ok(5), Err(0)])
72+
/// .map_ok(|x| x + 2);
73+
///
74+
/// assert_eq!(await!(stream.try_next()), Ok(Some(7)));
75+
/// assert_eq!(await!(stream.try_next()), Err(0));
76+
/// # })
77+
/// ```
78+
fn map_ok<T, F>(self, f: F) -> MapOk<Self, F>
79+
where
80+
Self: Sized,
81+
F: FnMut(Self::Ok) -> T,
82+
{
83+
MapOk::new(self, f)
84+
}
85+
86+
/// Wraps the current stream in a new stream which maps the error value
87+
/// using the provided closure.
88+
///
89+
/// # Examples
90+
///
91+
/// ```
92+
/// #![feature(async_await, await_macro)]
93+
/// # futures::executor::block_on(async {
94+
/// use futures::{stream, TryStreamExt};
95+
///
96+
/// let mut stream =
97+
/// stream::iter(vec![Ok(5), Err(0)])
98+
/// .map_err(|x| x + 2);
99+
///
100+
/// assert_eq!(await!(stream.try_next()), Ok(Some(5)));
101+
/// assert_eq!(await!(stream.try_next()), Err(2));
102+
/// # })
103+
/// ```
104+
fn map_err<E, F>(self, f: F) -> MapErr<Self, F>
105+
where
106+
Self: Sized,
107+
F: FnMut(Self::Error) -> E,
108+
{
109+
MapErr::new(self, f)
110+
}
111+
54112
/// Creates a future that attempts to resolve the next item in the stream.
55113
/// If an error is encountered before the next item, the error is returned
56114
/// instead.

0 commit comments

Comments
 (0)