Skip to content

Commit 0c8db16

Browse files
committed
Add core::stream::Stream
This patch adds the `core::stream` submodule and implements `core::stream::Stream` in accordance with RFC2996. Add feedback from @camelid
1 parent bbc01bb commit 0c8db16

File tree

8 files changed

+347
-0
lines changed

8 files changed

+347
-0
lines changed

library/alloc/src/boxed.rs

+14
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,7 @@ use core::ops::{
149149
};
150150
use core::pin::Pin;
151151
use core::ptr::{self, Unique};
152+
use core::stream::Stream;
152153
use core::task::{Context, Poll};
153154

154155
use crate::alloc::{handle_alloc_error, AllocError, Allocator, Global, Layout, WriteCloneIntoRaw};
@@ -1618,3 +1619,16 @@ where
16181619
F::poll(Pin::new(&mut *self), cx)
16191620
}
16201621
}
1622+
1623+
#[unstable(feature = "async_stream", issue = "79024")]
1624+
impl<S: ?Sized + Stream + Unpin> Stream for Box<S> {
1625+
type Item = S::Item;
1626+
1627+
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1628+
Pin::new(&mut **self).poll_next(cx)
1629+
}
1630+
1631+
fn size_hint(&self) -> (usize, Option<usize>) {
1632+
(**self).size_hint()
1633+
}
1634+
}

library/alloc/src/lib.rs

+1
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@
8282
#![feature(array_windows)]
8383
#![feature(allow_internal_unstable)]
8484
#![feature(arbitrary_self_types)]
85+
#![feature(async_stream)]
8586
#![feature(box_patterns)]
8687
#![feature(box_syntax)]
8788
#![feature(cfg_sanitize)]

library/core/src/lib.rs

+2
Original file line numberDiff line numberDiff line change
@@ -254,6 +254,8 @@ pub mod panicking;
254254
pub mod pin;
255255
pub mod raw;
256256
pub mod result;
257+
#[unstable(feature = "async_stream", issue = "79024")]
258+
pub mod stream;
257259
pub mod sync;
258260

259261
pub mod fmt;

library/core/src/stream/mod.rs

+154
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,154 @@
1+
//! Composable asynchronous iteration.
2+
//!
3+
//! If futures are asynchronous values, then streams are asynchronous
4+
//! iterators. If you've found yourself with an asynchronous collection of some kind,
5+
//! and needed to perform an operation on the elements of said collection,
6+
//! you'll quickly run into 'streams'. Streams are heavily used in idiomatic
7+
//! asynchronous Rust code, so it's worth becoming familiar with them.
8+
//!
9+
//! Before explaining more, let's talk about how this module is structured:
10+
//!
11+
//! # Organization
12+
//!
13+
//! This module is largely organized by type:
14+
//!
15+
//! * [Traits] are the core portion: these traits define what kind of streams
16+
//! exist and what you can do with them. The methods of these traits are worth
17+
//! putting some extra study time into.
18+
//! * Functions provide some helpful ways to create some basic streams.
19+
//! * [Structs] are often the return types of the various methods on this
20+
//! module's traits. You'll usually want to look at the method that creates
21+
//! the `struct`, rather than the `struct` itself. For more detail about why,
22+
//! see '[Implementing Stream](#implementing-stream)'.
23+
//!
24+
//! [Traits]: #traits
25+
//! [Structs]: #structs
26+
//!
27+
//! That's it! Let's dig into streams.
28+
//!
29+
//! # Stream
30+
//!
31+
//! The heart and soul of this module is the [`Stream`] trait. The core of
32+
//! [`Stream`] looks like this:
33+
//!
34+
//! ```
35+
//! # use core::task::{Context, Poll};
36+
//! # use core::pin::Pin;
37+
//! trait Stream {
38+
//! type Item;
39+
//! fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
40+
//! }
41+
//! ```
42+
//!
43+
//! Unlike `Iterator`, `Stream` makes a distinction between the [`poll_next`]
44+
//! method which is used when implementing a `Stream`, and the [`next`] method
45+
//! which is used when consuming a stream. Consumers of `Stream` only need to
46+
//! consider [`next`], which when called, returns a future which yields
47+
//! yields [`Option`][`<Item>`].
48+
//!
49+
//! The future returned by [`next`] will yield `Some(Item)` as long as there are
50+
//! elements, and once they've all been exhausted, will yield `None` to indicate
51+
//! that iteration is finished. If we're waiting on something asynchronous to
52+
//! resolve, the future will wait until the stream is ready to yield again.
53+
//!
54+
//! Individual streams may choose to resume iteration, and so calling [`next`]
55+
//! again may or may not eventually yield `Some(Item)` again at some point.
56+
//!
57+
//! [`Stream`]'s full definition includes a number of other methods as well,
58+
//! but they are default methods, built on top of [`poll_next`], and so you get
59+
//! them for free.
60+
//!
61+
//! [`Poll`]: super::task::Poll
62+
//! [`poll_next`]: Stream::poll_next
63+
//! [`next`]: Stream::next
64+
//! [`<Item>`]: Stream::Item
65+
//!
66+
//! # Implementing Stream
67+
//!
68+
//! Creating a stream of your own involves two steps: creating a `struct` to
69+
//! hold the stream's state, and then implementing [`Stream`] for that
70+
//! `struct`.
71+
//!
72+
//! Let's make a stream named `Counter` which counts from `1` to `5`:
73+
//!
74+
//! ```no_run
75+
//! #![feature(async_stream)]
76+
//! # use core::stream::Stream;
77+
//! # use core::task::{Context, Poll};
78+
//! # use core::pin::Pin;
79+
//!
80+
//! // First, the struct:
81+
//!
82+
//! /// A stream which counts from one to five
83+
//! struct Counter {
84+
//! count: usize,
85+
//! }
86+
//!
87+
//! // we want our count to start at one, so let's add a new() method to help.
88+
//! // This isn't strictly necessary, but is convenient. Note that we start
89+
//! // `count` at zero, we'll see why in `poll_next()`'s implementation below.
90+
//! impl Counter {
91+
//! fn new() -> Counter {
92+
//! Counter { count: 0 }
93+
//! }
94+
//! }
95+
//!
96+
//! // Then, we implement `Stream` for our `Counter`:
97+
//!
98+
//! impl Stream for Counter {
99+
//! // we will be counting with usize
100+
//! type Item = usize;
101+
//!
102+
//! // poll_next() is the only required method
103+
//! fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
104+
//! // Increment our count. This is why we started at zero.
105+
//! self.count += 1;
106+
//!
107+
//! // Check to see if we've finished counting or not.
108+
//! if self.count < 6 {
109+
//! Poll::Ready(Some(self.count))
110+
//! } else {
111+
//! Poll::Ready(None)
112+
//! }
113+
//! }
114+
//! }
115+
//!
116+
//! // And now we can use it!
117+
//! # async fn run() {
118+
//! #
119+
//! let mut counter = Counter::new();
120+
//!
121+
//! let x = counter.next().await.unwrap();
122+
//! println!("{}", x);
123+
//!
124+
//! let x = counter.next().await.unwrap();
125+
//! println!("{}", x);
126+
//!
127+
//! let x = counter.next().await.unwrap();
128+
//! println!("{}", x);
129+
//!
130+
//! let x = counter.next().await.unwrap();
131+
//! println!("{}", x);
132+
//!
133+
//! let x = counter.next().await.unwrap();
134+
//! println!("{}", x);
135+
//! #
136+
//! }
137+
//! ```
138+
//!
139+
//! This will print `1` through `5`, each on their own line.
140+
//!
141+
//! # Laziness
142+
//!
143+
//! Streams are *lazy*. This means that just creating a stream doesn't _do_ a
144+
//! whole lot. Nothing really happens until you call [`next`]. This is sometimes a
145+
//! source of confusion when creating a stream solely for its side effects. The
146+
//! compiler will warn us about this kind of behavior:
147+
//!
148+
//! ```text
149+
//! warning: unused result that must be used: streams do nothing unless polled
150+
//! ```
151+
152+
mod stream;
153+
154+
pub use stream::{Next, Stream};

library/core/src/stream/stream/mod.rs

+129
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
mod next;
2+
3+
pub use next::Next;
4+
5+
use crate::ops::DerefMut;
6+
use crate::pin::Pin;
7+
use crate::task::{Context, Poll};
8+
9+
/// An interface for dealing with asynchronous iterators.
10+
///
11+
/// This is the main stream trait. For more about the concept of streams
12+
/// generally, please see the [module-level documentation]. In particular, you
13+
/// may want to know how to [implement `Stream`][impl].
14+
///
15+
/// [module-level documentation]: index.html
16+
/// [impl]: index.html#implementing-stream
17+
#[unstable(feature = "async_stream", issue = "79024")]
18+
#[must_use = "streams do nothing unless polled"]
19+
pub trait Stream {
20+
/// The type of items yielded by the stream.
21+
type Item;
22+
23+
/// Attempt to pull out the next value of this stream, registering the
24+
/// current task for wakeup if the value is not yet available, and returning
25+
/// `None` if the stream is exhausted.
26+
///
27+
/// # Return value
28+
///
29+
/// There are several possible return values, each indicating a distinct
30+
/// stream state:
31+
///
32+
/// - `Poll::Pending` means that this stream's next value is not ready
33+
/// yet. Implementations will ensure that the current task will be notified
34+
/// when the next value may be ready.
35+
///
36+
/// - `Poll::Ready(Some(val))` means that the stream has successfully
37+
/// produced a value, `val`, and may produce further values on subsequent
38+
/// `poll_next` calls.
39+
///
40+
/// - `Poll::Ready(None)` means that the stream has terminated, and
41+
/// `poll_next` should not be invoked again.
42+
///
43+
/// # Panics
44+
///
45+
/// Once a stream has finished (returned `Ready(None)` from `poll_next`), calling its
46+
/// `poll_next` method again may panic, block forever, or cause other kinds of
47+
/// problems; the `Stream` trait places no requirements on the effects of
48+
/// such a call. However, as the `poll_next` method is not marked `unsafe`,
49+
/// Rust's usual rules apply: calls must never cause undefined behavior
50+
/// (memory corruption, incorrect use of `unsafe` functions, or the like),
51+
/// regardless of the stream's state.
52+
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
53+
54+
/// Returns the bounds on the remaining length of the stream.
55+
///
56+
/// Specifically, `size_hint()` returns a tuple where the first element
57+
/// is the lower bound, and the second element is the upper bound.
58+
///
59+
/// The second half of the tuple that is returned is an [`Option`]`<`[`usize`]`>`.
60+
/// A [`None`] here means that either there is no known upper bound, or the
61+
/// upper bound is larger than [`usize`].
62+
///
63+
/// # Implementation notes
64+
///
65+
/// It is not enforced that a stream implementation yields the declared
66+
/// number of elements. A buggy stream may yield less than the lower bound
67+
/// or more than the upper bound of elements.
68+
///
69+
/// `size_hint()` is primarily intended to be used for optimizations such as
70+
/// reserving space for the elements of the stream, but must not be
71+
/// trusted to e.g., omit bounds checks in unsafe code. An incorrect
72+
/// implementation of `size_hint()` should not lead to memory safety
73+
/// violations.
74+
///
75+
/// That said, the implementation should provide a correct estimation,
76+
/// because otherwise it would be a violation of the trait's protocol.
77+
///
78+
/// The default implementation returns `(0, `[`None`]`)` which is correct for any
79+
/// stream.
80+
#[inline]
81+
fn size_hint(&self) -> (usize, Option<usize>) {
82+
(0, None)
83+
}
84+
85+
/// Advances the stream and returns a future which yields the next value.
86+
///
87+
/// The returned future yields [`None`] when iteration is finished.
88+
/// Individual stream implementations may choose to resume iteration, and so
89+
/// calling `next()` again may or may not eventually start yielding
90+
/// [`Some(Item)`] again at some point.
91+
///
92+
/// [`Some(Item)`]: Some
93+
fn next(&mut self) -> Next<'_, Self>
94+
where
95+
Self: Unpin,
96+
{
97+
Next::new(self)
98+
}
99+
}
100+
101+
#[unstable(feature = "async_stream", issue = "79024")]
102+
impl<S: ?Sized + Stream + Unpin> Stream for &mut S {
103+
type Item = S::Item;
104+
105+
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
106+
S::poll_next(Pin::new(&mut **self), cx)
107+
}
108+
109+
fn size_hint(&self) -> (usize, Option<usize>) {
110+
(**self).size_hint()
111+
}
112+
}
113+
114+
#[unstable(feature = "async_stream", issue = "79024")]
115+
impl<P> Stream for Pin<P>
116+
where
117+
P: DerefMut + Unpin,
118+
P::Target: Stream,
119+
{
120+
type Item = <P::Target as Stream>::Item;
121+
122+
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
123+
self.get_mut().as_mut().poll_next(cx)
124+
}
125+
126+
fn size_hint(&self) -> (usize, Option<usize>) {
127+
(**self).size_hint()
128+
}
129+
}
+30
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
use crate::future::Future;
2+
use crate::pin::Pin;
3+
use crate::stream::Stream;
4+
use crate::task::{Context, Poll};
5+
6+
/// A future which advances the stream and returns the next value.
7+
///
8+
/// This `struct` is created by [`Stream::next`]. See its documentation for more.
9+
#[unstable(feature = "async_stream", issue = "79024")]
10+
#[derive(Debug)]
11+
#[must_use = "futures do nothing unless you `.await` or poll them"]
12+
pub struct Next<'a, S: ?Sized> {
13+
stream: &'a mut S,
14+
}
15+
16+
impl<'a, S: ?Sized> Next<'a, S> {
17+
/// Create a new instance of `Next`.
18+
pub(crate) fn new(stream: &'a mut S) -> Self {
19+
Self { stream }
20+
}
21+
}
22+
23+
#[unstable(feature = "async_stream", issue = "79024")]
24+
impl<S: Stream + Unpin + ?Sized> Future for Next<'_, S> {
25+
type Output = Option<S::Item>;
26+
27+
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
28+
Pin::new(&mut *self.stream).poll_next(cx)
29+
}
30+
}

library/std/src/lib.rs

+3
Original file line numberDiff line numberDiff line change
@@ -224,6 +224,7 @@
224224
#![feature(allocator_internals)]
225225
#![feature(allow_internal_unsafe)]
226226
#![feature(allow_internal_unstable)]
227+
#![feature(async_stream)]
227228
#![feature(arbitrary_self_types)]
228229
#![feature(array_error_internals)]
229230
#![feature(asm)]
@@ -448,6 +449,8 @@ pub use core::ptr;
448449
pub use core::raw;
449450
#[stable(feature = "rust1", since = "1.0.0")]
450451
pub use core::result;
452+
#[unstable(feature = "async_stream", issue = "79024")]
453+
pub use core::stream;
451454
#[stable(feature = "i128", since = "1.26.0")]
452455
#[allow(deprecated, deprecated_in_future)]
453456
pub use core::u128;

0 commit comments

Comments
 (0)