Skip to content

Commit 064fdf0

Browse files
committed
Stream::delay
Signed-off-by: Yoshua Wuyts <[email protected]>
1 parent 30b5ca5 commit 064fdf0

File tree

2 files changed

+76
-0
lines changed

2 files changed

+76
-0
lines changed

src/stream/stream/delay.rs

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
use std::future::Future;
2+
use std::pin::Pin;
3+
use std::time::Duration;
4+
5+
use crate::stream::Stream;
6+
use crate::task::{Context, Poll};
7+
8+
#[doc(hidden)]
9+
#[allow(missing_debug_implementations)]
10+
pub struct Delay<S> {
11+
stream: S,
12+
delay: futures_timer::Delay,
13+
delay_done: bool,
14+
}
15+
16+
impl<S> Delay<S> {
17+
pin_utils::unsafe_pinned!(stream: S);
18+
pin_utils::unsafe_pinned!(delay: futures_timer::Delay);
19+
pin_utils::unsafe_unpinned!(delay_done: bool);
20+
21+
pub(super) fn new(stream: S, dur: Duration) -> Self {
22+
Delay {
23+
stream,
24+
delay: futures_timer::Delay::new(dur),
25+
delay_done: false,
26+
}
27+
}
28+
}
29+
30+
impl<S> Stream for Delay<S>
31+
where
32+
S: Stream,
33+
{
34+
type Item = S::Item;
35+
36+
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
37+
if !self.delay_done {
38+
futures_core::ready!(self.as_mut().delay().poll(cx));
39+
*self.as_mut().delay_done() = true;
40+
}
41+
42+
self.as_mut().stream().poll_next(cx)
43+
}
44+
}

src/stream/stream/mod.rs

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
mod all;
2525
mod any;
2626
mod chain;
27+
mod delay;
2728
mod enumerate;
2829
mod filter;
2930
mod filter_map;
@@ -61,6 +62,7 @@ use try_for_each::TryForEeachFuture;
6162
pub use chain::Chain;
6263
pub use filter::Filter;
6364
pub use fuse::Fuse;
65+
pub use delay::Delay;
6466
pub use inspect::Inspect;
6567
pub use map::Map;
6668
pub use scan::Scan;
@@ -340,6 +342,36 @@ extension_trait! {
340342
Enumerate::new(self)
341343
}
342344

345+
#[doc = r#"
346+
Creates a stream that is delayed before it starts yielding items.
347+
348+
# Examples
349+
350+
```
351+
# fn main() { async_std::task::block_on(async {
352+
#
353+
use async_std::prelude::*;
354+
use async_std::future;
355+
use std::time::Duration;
356+
357+
let p1 = future::ready(1).delay(Duration::from_millis(200));
358+
let p1 = future::ready(2).delay(Duration::from_millis(100));
359+
let p1 = future::ready(3).delay(Duration::from_millis(300));
360+
361+
assert_eq!(future::join!(p1, p2, p3).await, (1, 2, 3));
362+
#
363+
# }) }
364+
```
365+
"#]
366+
#[cfg(any(feature = "unstable", feature = "docs"))]
367+
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
368+
fn delay(self, dur: std::time::Duration) -> Delay<Self>
369+
where
370+
Self: Sized,
371+
{
372+
Delay::new(self, dur)
373+
}
374+
343375
#[doc = r#"
344376
Takes a closure and creates a stream that calls that closure on every element of this stream.
345377

0 commit comments

Comments
 (0)