Skip to content

Commit b9d507d

Browse files
authored
Merge branch 'master' into 0.4-stream-try-fold-for-each
2 parents 5817ede + 5f8d523 commit b9d507d

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

69 files changed

+1455
-1894
lines changed

.editorconfig

+6
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
[*.rs]
2+
end_of_line = lf
3+
insert_final_newline = true
4+
charset = utf-8
5+
indent_style = space
6+
indent_size = 4

futures-channel/src/mpsc/mod.rs

+16-6
Original file line numberDiff line numberDiff line change
@@ -1021,8 +1021,10 @@ impl<T> Receiver<T> {
10211021
/// only when you've otherwise arranged to be notified when the channel is
10221022
/// no longer empty.
10231023
///
1024-
/// This function will panic if called after `try_next` or `poll_next` has
1025-
/// returned `None`.
1024+
/// This function returns:
1025+
/// * `Ok(Some(t))` when message is fetched
1026+
/// * `Ok(None)` when channel is closed and no messages left in the queue
1027+
/// * `Err(e)` when there are no messages available, but channel is not yet closed
10261028
pub fn try_next(&mut self) -> Result<Option<T>, TryRecvError> {
10271029
match self.next_message() {
10281030
Poll::Ready(msg) => {
@@ -1033,7 +1035,10 @@ impl<T> Receiver<T> {
10331035
}
10341036

10351037
fn next_message(&mut self) -> Poll<Option<T>> {
1036-
let inner = self.inner.as_mut().expect("Receiver::next_message called after `None`");
1038+
let inner = match self.inner.as_mut() {
1039+
None => return Poll::Ready(None),
1040+
Some(inner) => inner,
1041+
};
10371042
// Pop off a message
10381043
match unsafe { inner.message_queue.pop_spin() } {
10391044
Some(msg) => {
@@ -1169,8 +1174,10 @@ impl<T> UnboundedReceiver<T> {
11691174
/// only when you've otherwise arranged to be notified when the channel is
11701175
/// no longer empty.
11711176
///
1172-
/// This function will panic if called after `try_next` or `poll_next` has
1173-
/// returned `None`.
1177+
/// This function returns:
1178+
/// * `Ok(Some(t))` when message is fetched
1179+
/// * `Ok(None)` when channel is closed and no messages left in the queue
1180+
/// * `Err(e)` when there are no messages available, but channel is not yet closed
11741181
pub fn try_next(&mut self) -> Result<Option<T>, TryRecvError> {
11751182
match self.next_message() {
11761183
Poll::Ready(msg) => {
@@ -1181,7 +1188,10 @@ impl<T> UnboundedReceiver<T> {
11811188
}
11821189

11831190
fn next_message(&mut self) -> Poll<Option<T>> {
1184-
let inner = self.inner.as_mut().expect("Receiver::next_message called after `None`");
1191+
let inner = match self.inner.as_mut() {
1192+
None => return Poll::Ready(None),
1193+
Some(inner) => inner,
1194+
};
11851195
// Pop off a message
11861196
match unsafe { inner.message_queue.pop_spin() } {
11871197
Some(msg) => {

futures-channel/tests/mpsc-close.rs

+22
Original file line numberDiff line numberDiff line change
@@ -276,3 +276,25 @@ fn stress_try_send_as_receiver_closes() {
276276
bg.join()
277277
.expect("background thread join");
278278
}
279+
280+
#[test]
281+
fn unbounded_try_next_after_none() {
282+
let (tx, mut rx) = mpsc::unbounded::<String>();
283+
// Drop the sender, close the channel.
284+
drop(tx);
285+
// Receive the end of channel.
286+
assert_eq!(Ok(None), rx.try_next().map_err(|_| ()));
287+
// None received, check we can call `try_next` again.
288+
assert_eq!(Ok(None), rx.try_next().map_err(|_| ()));
289+
}
290+
291+
#[test]
292+
fn bounded_try_next_after_none() {
293+
let (tx, mut rx) = mpsc::channel::<String>(17);
294+
// Drop the sender, close the channel.
295+
drop(tx);
296+
// Receive the end of channel.
297+
assert_eq!(Ok(None), rx.try_next().map_err(|_| ()));
298+
// None received, check we can call `try_next` again.
299+
assert_eq!(Ok(None), rx.try_next().map_err(|_| ()));
300+
}

futures-util/src/future/join.rs

-117
Original file line numberDiff line numberDiff line change
@@ -76,15 +76,6 @@ macro_rules! generate {
7676
generate! {
7777
/// Future for the [`join`](join()) function.
7878
(Join, <Fut1, Fut2>),
79-
80-
/// Future for the [`join3`] function.
81-
(Join3, <Fut1, Fut2, Fut3>),
82-
83-
/// Future for the [`join4`] function.
84-
(Join4, <Fut1, Fut2, Fut3, Fut4>),
85-
86-
/// Future for the [`join5`] function.
87-
(Join5, <Fut1, Fut2, Fut3, Fut4, Fut5>),
8879
}
8980

9081
/// Joins the result of two futures, waiting for them both to complete.
@@ -116,111 +107,3 @@ where
116107
let f = Join::new(future1, future2);
117108
assert_future::<(Fut1::Output, Fut2::Output), _>(f)
118109
}
119-
120-
/// Same as [`join`](join()), but with more futures.
121-
///
122-
/// # Examples
123-
///
124-
/// ```
125-
/// # futures::executor::block_on(async {
126-
/// use futures::future;
127-
///
128-
/// let a = async { 1 };
129-
/// let b = async { 2 };
130-
/// let c = async { 3 };
131-
/// let tuple = future::join3(a, b, c);
132-
///
133-
/// assert_eq!(tuple.await, (1, 2, 3));
134-
/// # });
135-
/// ```
136-
pub fn join3<Fut1, Fut2, Fut3>(
137-
future1: Fut1,
138-
future2: Fut2,
139-
future3: Fut3,
140-
) -> Join3<Fut1, Fut2, Fut3>
141-
where
142-
Fut1: Future,
143-
Fut2: Future,
144-
Fut3: Future,
145-
{
146-
let f = Join3::new(future1, future2, future3);
147-
assert_future::<(Fut1::Output, Fut2::Output, Fut3::Output), _>(f)
148-
}
149-
150-
/// Same as [`join`](join()), but with more futures.
151-
///
152-
/// # Examples
153-
///
154-
/// ```
155-
/// # futures::executor::block_on(async {
156-
/// use futures::future;
157-
///
158-
/// let a = async { 1 };
159-
/// let b = async { 2 };
160-
/// let c = async { 3 };
161-
/// let d = async { 4 };
162-
/// let tuple = future::join4(a, b, c, d);
163-
///
164-
/// assert_eq!(tuple.await, (1, 2, 3, 4));
165-
/// # });
166-
/// ```
167-
pub fn join4<Fut1, Fut2, Fut3, Fut4>(
168-
future1: Fut1,
169-
future2: Fut2,
170-
future3: Fut3,
171-
future4: Fut4,
172-
) -> Join4<Fut1, Fut2, Fut3, Fut4>
173-
where
174-
Fut1: Future,
175-
Fut2: Future,
176-
Fut3: Future,
177-
Fut4: Future,
178-
{
179-
let f = Join4::new(future1, future2, future3, future4);
180-
assert_future::<(Fut1::Output, Fut2::Output, Fut3::Output, Fut4::Output), _>(f)
181-
}
182-
183-
/// Same as [`join`](join()), but with more futures.
184-
///
185-
/// # Examples
186-
///
187-
/// ```
188-
/// # futures::executor::block_on(async {
189-
/// use futures::future;
190-
///
191-
/// let a = async { 1 };
192-
/// let b = async { 2 };
193-
/// let c = async { 3 };
194-
/// let d = async { 4 };
195-
/// let e = async { 5 };
196-
/// let tuple = future::join5(a, b, c, d, e);
197-
///
198-
/// assert_eq!(tuple.await, (1, 2, 3, 4, 5));
199-
/// # });
200-
/// ```
201-
pub fn join5<Fut1, Fut2, Fut3, Fut4, Fut5>(
202-
future1: Fut1,
203-
future2: Fut2,
204-
future3: Fut3,
205-
future4: Fut4,
206-
future5: Fut5,
207-
) -> Join5<Fut1, Fut2, Fut3, Fut4, Fut5>
208-
where
209-
Fut1: Future,
210-
Fut2: Future,
211-
Fut3: Future,
212-
Fut4: Future,
213-
Fut5: Future,
214-
{
215-
let f = Join5::new(future1, future2, future3, future4, future5);
216-
assert_future::<
217-
(
218-
Fut1::Output,
219-
Fut2::Output,
220-
Fut3::Output,
221-
Fut4::Output,
222-
Fut5::Output,
223-
),
224-
_,
225-
>(f)
226-
}

futures-util/src/future/mod.rs

+2-4
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ mod ready;
7272
pub use self::ready::{err, ok, ready, Ready};
7373

7474
mod join;
75-
pub use self::join::{join, join3, join4, join5, Join, Join3, Join4, Join5};
75+
pub use self::join::{join, Join};
7676

7777
#[cfg(feature = "alloc")]
7878
mod join_all;
@@ -88,9 +88,7 @@ mod select_all;
8888
pub use self::select_all::{select_all, SelectAll};
8989

9090
mod try_join;
91-
pub use self::try_join::{
92-
try_join, try_join3, try_join4, try_join5, TryJoin, TryJoin3, TryJoin4, TryJoin5,
93-
};
91+
pub use self::try_join::{try_join, TryJoin};
9492

9593
#[cfg(feature = "alloc")]
9694
mod try_join_all;

futures-util/src/future/select_all.rs

+7
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,13 @@ pub fn select_all<I>(iter: I) -> SelectAll<I::Item>
4242
assert_future::<(<I::Item as Future>::Output, usize, Vec<I::Item>), _>(ret)
4343
}
4444

45+
impl<Fut> SelectAll<Fut> {
46+
/// Consumes this combinator, returning the underlying futures.
47+
pub fn into_inner(self) -> Vec<Fut> {
48+
self.inner
49+
}
50+
}
51+
4552
impl<Fut: Future + Unpin> Future for SelectAll<Fut> {
4653
type Output = (Fut::Output, usize, Vec<Fut>);
4754

futures-util/src/future/try_join.rs

-111
Original file line numberDiff line numberDiff line change
@@ -92,15 +92,6 @@ macro_rules! generate {
9292
generate! {
9393
/// Future for the [`try_join`](try_join()) function.
9494
(TryJoin, <Fut1, Fut2>),
95-
96-
/// Future for the [`try_join3`] function.
97-
(TryJoin3, <Fut1, Fut2, Fut3>),
98-
99-
/// Future for the [`try_join4`] function.
100-
(TryJoin4, <Fut1, Fut2, Fut3, Fut4>),
101-
102-
/// Future for the [`try_join5`] function.
103-
(TryJoin5, <Fut1, Fut2, Fut3, Fut4, Fut5>),
10495
}
10596

10697
/// Joins the result of two futures, waiting for them both to complete or
@@ -152,105 +143,3 @@ where
152143
{
153144
assert_future::<Result<(Fut1::Ok, Fut2::Ok), Fut1::Error>, _>(TryJoin::new(future1, future2))
154145
}
155-
156-
/// Same as [`try_join`](try_join()), but with more futures.
157-
///
158-
/// # Examples
159-
///
160-
/// ```
161-
/// # futures::executor::block_on(async {
162-
/// use futures::future;
163-
///
164-
/// let a = future::ready(Ok::<i32, i32>(1));
165-
/// let b = future::ready(Ok::<i32, i32>(2));
166-
/// let c = future::ready(Ok::<i32, i32>(3));
167-
/// let tuple = future::try_join3(a, b, c);
168-
///
169-
/// assert_eq!(tuple.await, Ok((1, 2, 3)));
170-
/// # });
171-
/// ```
172-
pub fn try_join3<Fut1, Fut2, Fut3>(
173-
future1: Fut1,
174-
future2: Fut2,
175-
future3: Fut3,
176-
) -> TryJoin3<Fut1, Fut2, Fut3>
177-
where
178-
Fut1: TryFuture,
179-
Fut2: TryFuture<Error = Fut1::Error>,
180-
Fut3: TryFuture<Error = Fut1::Error>,
181-
{
182-
assert_future::<Result<(Fut1::Ok, Fut2::Ok, Fut3::Ok), Fut1::Error>, _>(TryJoin3::new(
183-
future1, future2, future3,
184-
))
185-
}
186-
187-
/// Same as [`try_join`](try_join()), but with more futures.
188-
///
189-
/// # Examples
190-
///
191-
/// ```
192-
/// # futures::executor::block_on(async {
193-
/// use futures::future;
194-
///
195-
/// let a = future::ready(Ok::<i32, i32>(1));
196-
/// let b = future::ready(Ok::<i32, i32>(2));
197-
/// let c = future::ready(Ok::<i32, i32>(3));
198-
/// let d = future::ready(Ok::<i32, i32>(4));
199-
/// let tuple = future::try_join4(a, b, c, d);
200-
///
201-
/// assert_eq!(tuple.await, Ok((1, 2, 3, 4)));
202-
/// # });
203-
/// ```
204-
pub fn try_join4<Fut1, Fut2, Fut3, Fut4>(
205-
future1: Fut1,
206-
future2: Fut2,
207-
future3: Fut3,
208-
future4: Fut4,
209-
) -> TryJoin4<Fut1, Fut2, Fut3, Fut4>
210-
where
211-
Fut1: TryFuture,
212-
Fut2: TryFuture<Error = Fut1::Error>,
213-
Fut3: TryFuture<Error = Fut1::Error>,
214-
Fut4: TryFuture<Error = Fut1::Error>,
215-
{
216-
assert_future::<Result<(Fut1::Ok, Fut2::Ok, Fut3::Ok, Fut4::Ok), Fut1::Error>, _>(
217-
TryJoin4::new(future1, future2, future3, future4),
218-
)
219-
}
220-
221-
/// Same as [`try_join`](try_join()), but with more futures.
222-
///
223-
/// # Examples
224-
///
225-
/// ```
226-
/// # futures::executor::block_on(async {
227-
/// use futures::future;
228-
///
229-
/// let a = future::ready(Ok::<i32, i32>(1));
230-
/// let b = future::ready(Ok::<i32, i32>(2));
231-
/// let c = future::ready(Ok::<i32, i32>(3));
232-
/// let d = future::ready(Ok::<i32, i32>(4));
233-
/// let e = future::ready(Ok::<i32, i32>(5));
234-
/// let tuple = future::try_join5(a, b, c, d, e);
235-
///
236-
/// assert_eq!(tuple.await, Ok((1, 2, 3, 4, 5)));
237-
/// # });
238-
/// ```
239-
pub fn try_join5<Fut1, Fut2, Fut3, Fut4, Fut5>(
240-
future1: Fut1,
241-
future2: Fut2,
242-
future3: Fut3,
243-
future4: Fut4,
244-
future5: Fut5,
245-
) -> TryJoin5<Fut1, Fut2, Fut3, Fut4, Fut5>
246-
where
247-
Fut1: TryFuture,
248-
Fut2: TryFuture<Error = Fut1::Error>,
249-
Fut3: TryFuture<Error = Fut1::Error>,
250-
Fut4: TryFuture<Error = Fut1::Error>,
251-
Fut5: TryFuture<Error = Fut1::Error>,
252-
{
253-
assert_future::<Result<(Fut1::Ok, Fut2::Ok, Fut3::Ok, Fut4::Ok, Fut5::Ok), Fut1::Error>, _>(
254-
TryJoin5::new(future1, future2, future3, future4, future5),
255-
)
256-
}

futures-util/src/io/mod.rs

+11
Original file line numberDiff line numberDiff line change
@@ -601,6 +601,17 @@ pub trait AsyncSeekExt: AsyncSeek {
601601
{
602602
assert_future::<Result<u64>, _>(Seek::new(self, pos))
603603
}
604+
605+
/// Creates a future which will return the current seek position from the
606+
/// start of the stream.
607+
///
608+
/// This is equivalent to `self.seek(SeekFrom::Current(0))`.
609+
fn stream_position(&mut self) -> Seek<'_, Self>
610+
where
611+
Self: Unpin,
612+
{
613+
self.seek(SeekFrom::Current(0))
614+
}
604615
}
605616

606617
impl<S: AsyncSeek + ?Sized> AsyncSeekExt for S {}

0 commit comments

Comments
 (0)