Skip to content

Commit 428ce77

Browse files
taiki-ecramertj
authored andcommitted
impl Stream/Sink for FlattenSink/TryFlattenStream
1 parent e173151 commit 428ce77

File tree

6 files changed

+341
-167
lines changed

6 files changed

+341
-167
lines changed
Lines changed: 44 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -1,102 +1,76 @@
1+
use super::FlattenStreamSink;
12
use core::pin::Pin;
23
use futures_core::future::TryFuture;
4+
use futures_core::stream::{FusedStream, Stream, TryStream};
35
use futures_core::task::{Context, Poll};
46
use futures_sink::Sink;
5-
6-
#[derive(Debug)]
7-
enum State<Fut, Si> {
8-
Waiting(Fut),
9-
Ready(Si),
10-
Closed,
11-
}
12-
use self::State::*;
7+
use pin_utils::unsafe_pinned;
138

149
/// Sink for the [`flatten_sink`](super::TryFutureExt::flatten_sink) method.
1510
#[derive(Debug)]
1611
#[must_use = "sinks do nothing unless polled"]
17-
pub struct FlattenSink<Fut, Si>(State<Fut, Si>);
18-
19-
impl<Fut: Unpin, Si: Unpin> Unpin for FlattenSink<Fut, Si> {}
12+
pub struct FlattenSink<Fut, Si>
13+
where
14+
Fut: TryFuture<Ok = Si>,
15+
{
16+
inner: FlattenStreamSink<Fut>,
17+
}
2018

2119
impl<Fut, Si> FlattenSink<Fut, Si>
2220
where
2321
Fut: TryFuture<Ok = Si>,
2422
{
25-
pub(super) fn new(future: Fut) -> FlattenSink<Fut, Si> {
26-
FlattenSink(Waiting(future))
27-
}
23+
unsafe_pinned!(inner: FlattenStreamSink<Fut>);
2824

29-
fn project_pin<'a>(
30-
self: Pin<&'a mut Self>
31-
) -> State<Pin<&'a mut Fut>, Pin<&'a mut Si>> {
32-
unsafe {
33-
match &mut Pin::get_unchecked_mut(self).0 {
34-
Waiting(f) => Waiting(Pin::new_unchecked(f)),
35-
Ready(s) => Ready(Pin::new_unchecked(s)),
36-
Closed => Closed,
37-
}
25+
pub(super) fn new(future: Fut) -> Self {
26+
Self {
27+
inner: FlattenStreamSink::new(future),
3828
}
3929
}
4030
}
4131

32+
impl<Fut, S> FusedStream for FlattenSink<Fut, S>
33+
where
34+
Fut: TryFuture<Ok = S>,
35+
S: FusedStream,
36+
{
37+
fn is_terminated(&self) -> bool {
38+
self.inner.is_terminated()
39+
}
40+
}
41+
42+
impl<Fut, S> Stream for FlattenSink<Fut, S>
43+
where
44+
Fut: TryFuture<Ok = S>,
45+
S: TryStream<Error = Fut::Error>,
46+
{
47+
type Item = Result<S::Ok, Fut::Error>;
48+
49+
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
50+
self.inner().poll_next(cx)
51+
}
52+
}
53+
4254
impl<Fut, Si, Item> Sink<Item> for FlattenSink<Fut, Si>
4355
where
4456
Fut: TryFuture<Ok = Si>,
4557
Si: Sink<Item, SinkError = Fut::Error>,
4658
{
47-
type SinkError = Si::SinkError;
59+
type SinkError = Fut::Error;
4860

49-
fn poll_ready(
50-
mut self: Pin<&mut Self>,
51-
cx: &mut Context<'_>,
52-
) -> Poll<Result<(), Self::SinkError>> {
53-
let resolved_stream = match self.as_mut().project_pin() {
54-
Ready(s) => return s.poll_ready(cx),
55-
Waiting(f) => ready!(f.try_poll(cx))?,
56-
Closed => panic!("poll_ready called after eof"),
57-
};
58-
self.set(FlattenSink(Ready(resolved_stream)));
59-
if let Ready(resolved_stream) = self.project_pin() {
60-
resolved_stream.poll_ready(cx)
61-
} else {
62-
unreachable!()
63-
}
61+
fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::SinkError>> {
62+
self.inner().poll_ready(cx)
6463
}
6564

66-
fn start_send(
67-
self: Pin<&mut Self>,
68-
item: Item,
69-
) -> Result<(), Self::SinkError> {
70-
match self.project_pin() {
71-
Ready(s) => s.start_send(item),
72-
Waiting(_) => panic!("poll_ready not called first"),
73-
Closed => panic!("start_send called after eof"),
74-
}
65+
fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::SinkError> {
66+
self.inner().start_send(item)
7567
}
7668

77-
fn poll_flush(
78-
self: Pin<&mut Self>,
79-
cx: &mut Context<'_>,
80-
) -> Poll<Result<(), Self::SinkError>> {
81-
match self.project_pin() {
82-
Ready(s) => s.poll_flush(cx),
83-
// if sink not yet resolved, nothing written ==> everything flushed
84-
Waiting(_) => Poll::Ready(Ok(())),
85-
Closed => panic!("poll_flush called after eof"),
86-
}
69+
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::SinkError>> {
70+
self.inner().poll_flush(cx)
8771
}
8872

89-
fn poll_close(
90-
mut self: Pin<&mut Self>,
91-
cx: &mut Context<'_>,
92-
) -> Poll<Result<(), Self::SinkError>> {
93-
let res = match self.as_mut().project_pin() {
94-
Ready(s) => s.poll_close(cx),
95-
Waiting(_) | Closed => Poll::Ready(Ok(())),
96-
};
97-
if res.is_ready() {
98-
self.set(FlattenSink(Closed));
99-
}
100-
res
73+
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::SinkError>> {
74+
self.inner().poll_close(cx)
10175
}
10276
}
Lines changed: 179 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,179 @@
1+
use core::fmt;
2+
use core::pin::Pin;
3+
use futures_core::future::TryFuture;
4+
use futures_core::stream::{FusedStream, Stream, TryStream};
5+
use futures_core::task::{Context, Poll};
6+
use futures_sink::Sink;
7+
use pin_utils::unsafe_pinned;
8+
9+
#[must_use = "streams do nothing unless polled"]
10+
pub(crate) struct FlattenStreamSink<Fut>
11+
where
12+
Fut: TryFuture,
13+
{
14+
state: State<Fut, Fut::Ok>,
15+
}
16+
17+
impl<Fut> Unpin for FlattenStreamSink<Fut>
18+
where
19+
Fut: TryFuture + Unpin,
20+
Fut::Ok: Unpin,
21+
{
22+
}
23+
24+
impl<Fut> fmt::Debug for FlattenStreamSink<Fut>
25+
where
26+
Fut: TryFuture + fmt::Debug,
27+
Fut::Ok: fmt::Debug,
28+
{
29+
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
30+
fmt.debug_struct("FlattenStreamSink")
31+
.field("state", &self.state)
32+
.finish()
33+
}
34+
}
35+
36+
impl<Fut> FlattenStreamSink<Fut>
37+
where
38+
Fut: TryFuture,
39+
{
40+
unsafe_pinned!(state: State<Fut, Fut::Ok>);
41+
42+
pub(crate) fn new(future: Fut) -> Self {
43+
Self {
44+
state: State::Future(future),
45+
}
46+
}
47+
}
48+
49+
#[derive(Debug)]
50+
enum State<Fut, S> {
51+
// future is not yet called or called and not ready
52+
Future(Fut),
53+
// future resolved to Stream or Sink
54+
StreamOrSink(S),
55+
// future resolved to error
56+
Done,
57+
}
58+
59+
impl<Fut, S> State<Fut, S> {
60+
fn get_pin_mut<'a>(self: Pin<&'a mut Self>) -> State<Pin<&'a mut Fut>, Pin<&'a mut S>> {
61+
// safety: data is never moved via the resulting &mut reference
62+
match unsafe { Pin::get_unchecked_mut(self) } {
63+
// safety: the future we're re-pinning here will never be moved;
64+
// it will just be polled, then dropped in place
65+
State::Future(f) => State::Future(unsafe { Pin::new_unchecked(f) }),
66+
// safety: the stream we're repinning here will never be moved;
67+
// it will just be polled, then dropped in place
68+
State::StreamOrSink(s) => State::StreamOrSink(unsafe { Pin::new_unchecked(s) }),
69+
State::Done => State::Done,
70+
}
71+
}
72+
}
73+
74+
impl<Fut> State<Fut, Fut::Ok>
75+
where
76+
Fut: TryFuture,
77+
{
78+
fn poll_future(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Fut::Error>> {
79+
if let State::Future(f) = self.as_mut().get_pin_mut() {
80+
match ready!(f.try_poll(cx)) {
81+
Ok(s) => {
82+
// Future resolved to stream.
83+
// We do not return, but poll that
84+
// stream in the next loop iteration.
85+
self.set(State::StreamOrSink(s));
86+
}
87+
Err(e) => {
88+
// Future resolved to error.
89+
// We have neither a pollable stream nor a future.
90+
self.set(State::Done);
91+
return Poll::Ready(Err(e));
92+
}
93+
}
94+
}
95+
Poll::Ready(Ok(()))
96+
}
97+
}
98+
99+
impl<Fut> FusedStream for FlattenStreamSink<Fut>
100+
where
101+
Fut: TryFuture,
102+
Fut::Ok: FusedStream,
103+
{
104+
fn is_terminated(&self) -> bool {
105+
match &self.state {
106+
State::Future(_) => false,
107+
State::StreamOrSink(stream) => stream.is_terminated(),
108+
State::Done => true,
109+
}
110+
}
111+
}
112+
113+
impl<Fut> Stream for FlattenStreamSink<Fut>
114+
where
115+
Fut: TryFuture,
116+
Fut::Ok: TryStream<Error = Fut::Error>,
117+
{
118+
type Item = Result<<Fut::Ok as TryStream>::Ok, Fut::Error>;
119+
120+
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
121+
ready!(self.as_mut().state().poll_future(cx)?);
122+
match self.as_mut().state().get_pin_mut() {
123+
State::StreamOrSink(s) => s.try_poll_next(cx),
124+
State::Done => Poll::Ready(None),
125+
State::Future(_) => unreachable!(),
126+
}
127+
}
128+
}
129+
130+
impl<Fut, Item> Sink<Item> for FlattenStreamSink<Fut>
131+
where
132+
Fut: TryFuture,
133+
Fut::Ok: Sink<Item, SinkError = Fut::Error>,
134+
{
135+
type SinkError = Fut::Error;
136+
137+
fn poll_ready(
138+
mut self: Pin<&mut Self>,
139+
cx: &mut Context<'_>,
140+
) -> Poll<Result<(), Self::SinkError>> {
141+
ready!(self.as_mut().state().poll_future(cx)?);
142+
match self.as_mut().state().get_pin_mut() {
143+
State::StreamOrSink(s) => s.poll_ready(cx),
144+
State::Done => panic!("poll_ready called after eof"),
145+
State::Future(_) => unreachable!(),
146+
}
147+
}
148+
149+
fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::SinkError> {
150+
match self.state().get_pin_mut() {
151+
State::StreamOrSink(s) => s.start_send(item),
152+
State::Future(_) => panic!("poll_ready not called first"),
153+
State::Done => panic!("start_send called after eof"),
154+
}
155+
}
156+
157+
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::SinkError>> {
158+
match self.state().get_pin_mut() {
159+
State::StreamOrSink(s) => s.poll_flush(cx),
160+
// if sink not yet resolved, nothing written ==> everything flushed
161+
State::Future(_) => Poll::Ready(Ok(())),
162+
State::Done => panic!("poll_flush called after eof"),
163+
}
164+
}
165+
166+
fn poll_close(
167+
mut self: Pin<&mut Self>,
168+
cx: &mut Context<'_>,
169+
) -> Poll<Result<(), Self::SinkError>> {
170+
let res = match self.as_mut().state().get_pin_mut() {
171+
State::StreamOrSink(s) => s.poll_close(cx),
172+
State::Future(_) | State::Done => Poll::Ready(Ok(())),
173+
};
174+
if res.is_ready() {
175+
self.as_mut().state().set(State::Done);
176+
}
177+
res
178+
}
179+
}

futures-util/src/try_future/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,9 @@ mod unwrap_or_else;
6565
pub use self::unwrap_or_else::UnwrapOrElse;
6666

6767
// Implementation details
68+
mod flatten_stream_sink;
69+
pub(crate) use self::flatten_stream_sink::FlattenStreamSink;
70+
6871
mod try_chain;
6972
pub(crate) use self::try_chain::{TryChain, TryChainAction};
7073

0 commit comments

Comments
 (0)