From b994e7b72ea17f8be7cafe69dcbbdd7c863cd2a6 Mon Sep 17 00:00:00 2001 From: Eyal Kalderon Date: Wed, 27 Feb 2019 18:08:55 +0800 Subject: [PATCH 1/4] Change Forward to drop sink after completion --- futures-util/src/stream/forward.rs | 5 +++-- futures-util/src/stream/mod.rs | 6 ------ 2 files changed, 3 insertions(+), 8 deletions(-) diff --git a/futures-util/src/stream/forward.rs b/futures-util/src/stream/forward.rs index 0ae2dfe52f..7eea1786cf 100644 --- a/futures-util/src/stream/forward.rs +++ b/futures-util/src/stream/forward.rs @@ -71,7 +71,7 @@ where Si: Sink + Unpin, St: Stream>, { - type Output = Result; + type Output = Result<(), Si::SinkError>; fn poll( mut self: Pin<&mut Self>, @@ -91,7 +91,8 @@ where Poll::Ready(None) => { try_ready!(self.as_mut().sink().as_pin_mut().expect(INVALID_POLL) .poll_close(waker)); - return Poll::Ready(Ok(self.as_mut().sink().take().unwrap())) + let _ = self.as_mut().sink().take().unwrap(); + return Poll::Ready(Ok(())) } Poll::Pending => { try_ready!(self.as_mut().sink().as_pin_mut().expect(INVALID_POLL) diff --git a/futures-util/src/stream/mod.rs b/futures-util/src/stream/mod.rs index 28308246d8..7ca87dd1ed 100644 --- a/futures-util/src/stream/mod.rs +++ b/futures-util/src/stream/mod.rs @@ -976,12 +976,6 @@ pub trait StreamExt: Stream { /// exhausted, sending each item to the sink. It will complete once both the /// stream is exhausted and the sink has received and flushed all items. /// Note that the sink is **not** closed. - /// - /// On completion, the sink is returned. - /// - /// Note that this combinator is only usable with `Unpin` sinks. - /// Sinks that are not `Unpin` will need to be pinned in order to be used - /// with `forward`. fn forward(self, sink: S) -> Forward where S: Sink + Unpin, From 49f41baa4fd37da589691b3362562c6d934ac31e Mon Sep 17 00:00:00 2001 From: Eyal Kalderon Date: Wed, 27 Feb 2019 18:10:34 +0800 Subject: [PATCH 2/4] Update Forward docs, remove Unpin sink requirement --- futures-util/src/stream/forward.rs | 16 +++++----------- 1 file changed, 5 insertions(+), 11 deletions(-) diff --git a/futures-util/src/stream/forward.rs b/futures-util/src/stream/forward.rs index 7eea1786cf..6ebf684dd8 100644 --- a/futures-util/src/stream/forward.rs +++ b/futures-util/src/stream/forward.rs @@ -9,16 +9,10 @@ use pin_utils::{unsafe_pinned, unsafe_unpinned}; const INVALID_POLL: &str = "polled `Forward` after completion"; /// Future for the `Stream::forward` combinator, which sends a stream of values -/// to a sink and then flushes the sink. -/// -/// Note: this is only usable with `Unpin` sinks, so `Sink`s that aren't `Unpin` -/// will need to be pinned in order to be used with this combinator. -// -// This limitation is necessary in order to return the sink after the forwarding -// has completed so that it can be used again. +/// to a sink and then flushes and closes the sink. #[derive(Debug)] #[must_use = "steams do nothing unless polled"] -pub struct Forward { +pub struct Forward { sink: Option, stream: Fuse, buffered_item: Option, @@ -28,7 +22,7 @@ impl Unpin for Forward {} impl Forward where - Si: Sink + Unpin, + Si: Sink, St: Stream>, { unsafe_pinned!(sink: Option); @@ -68,7 +62,7 @@ impl FusedFuture for Forward { impl Future for Forward where - Si: Sink + Unpin, + Si: Sink, St: Stream>, { type Output = Result<(), Si::SinkError>; @@ -91,7 +85,7 @@ where Poll::Ready(None) => { try_ready!(self.as_mut().sink().as_pin_mut().expect(INVALID_POLL) .poll_close(waker)); - let _ = self.as_mut().sink().take().unwrap(); + let _ = self.as_mut().sink().as_pin_mut().take().unwrap(); return Poll::Ready(Ok(())) } Poll::Pending => { From 755804c0ad292e2ebab4531a0c76b28f2c2ea5d3 Mon Sep 17 00:00:00 2001 From: Eyal Kalderon Date: Wed, 27 Feb 2019 18:19:04 +0800 Subject: [PATCH 3/4] Remove Unpin sink bound and update docs of StreamExt::forward() --- futures-util/src/stream/mod.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/futures-util/src/stream/mod.rs b/futures-util/src/stream/mod.rs index 7ca87dd1ed..c3eafdcfc5 100644 --- a/futures-util/src/stream/mod.rs +++ b/futures-util/src/stream/mod.rs @@ -970,15 +970,15 @@ pub trait StreamExt: Stream { } /// A future that completes after the given stream has been fully processed - /// into the sink, including flushing. + /// into the sink and the sink has been flushed and closed. /// /// This future will drive the stream to keep producing items until it is - /// exhausted, sending each item to the sink. It will complete once both the - /// stream is exhausted and the sink has received and flushed all items. - /// Note that the sink is **not** closed. + /// exhausted, sending each item to the sink. It will complete once the + /// stream is exhausted, the sink has received and flushed all items, and + /// the sink is closed. fn forward(self, sink: S) -> Forward where - S: Sink + Unpin, + S: Sink, Self: Stream> + Sized, { Forward::new(self, sink) From 28ab681e2e321b73c1bce31a0cee336f83d7a45c Mon Sep 17 00:00:00 2001 From: Eyal Kalderon Date: Wed, 27 Feb 2019 18:24:23 +0800 Subject: [PATCH 4/4] Use set() over take() to ensure sink is dropped --- futures-util/src/stream/forward.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/futures-util/src/stream/forward.rs b/futures-util/src/stream/forward.rs index 6ebf684dd8..3a4e215853 100644 --- a/futures-util/src/stream/forward.rs +++ b/futures-util/src/stream/forward.rs @@ -85,7 +85,7 @@ where Poll::Ready(None) => { try_ready!(self.as_mut().sink().as_pin_mut().expect(INVALID_POLL) .poll_close(waker)); - let _ = self.as_mut().sink().as_pin_mut().take().unwrap(); + self.as_mut().sink().set(None); return Poll::Ready(Ok(())) } Poll::Pending => {