Skip to content

Commit 06e2d98

Browse files
Thomasdezeeuwcramertj
authored andcommitted
Use one centralised InterleavePending type for futures-test
1 parent 8ae07e7 commit 06e2d98

File tree

7 files changed

+53
-75
lines changed

7 files changed

+53
-75
lines changed

futures-test/src/io/interleave_pending.rs renamed to futures-test/src/interleave_pending.rs

Lines changed: 48 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,59 +1,85 @@
1+
use futures_core::stream::Stream;
12
use futures_io::{self as io, AsyncBufRead, AsyncRead, AsyncWrite};
23
use pin_utils::{unsafe_pinned, unsafe_unpinned};
34
use std::{
45
pin::Pin,
56
task::{Context, Poll},
67
};
78

8-
/// I/O wrapper for interleaving [`Poll::Pending`] in calls to read or write.
9+
/// Wrapper that interleaves [`Poll::Pending`] in calls to poll.
910
///
10-
/// See the [`interleave_pending`] and [`interleave_pending_write`] methods.
11-
///
12-
/// [`interleave_pending`]: super::AsyncReadTestExt::interleave_pending
13-
/// [`interleave_pending_write`]: super::AsyncWriteTestExt::interleave_pending_write
11+
/// See the `interleave_pending` methods on:
12+
/// * [`FutureTestExt`](crate::future::FutureTestExt::interleave_pending)
13+
/// * [`StreamTestExt`](crate::stream::StreamTestExt::interleave_pending)
14+
/// * [`AsyncReadTestExt`](crate::io::AsyncReadTestExt::interleave_pending)
15+
/// * [`AsyncWriteTestExt`](crate::io::AsyncWriteTestExt::interleave_pending_write)
1416
#[derive(Debug)]
15-
pub struct InterleavePending<Io> {
16-
io: Io,
17+
pub struct InterleavePending<T> {
18+
inner: T,
1719
pended: bool,
1820
}
1921

20-
impl<Io: Unpin> Unpin for InterleavePending<Io> {}
22+
impl<T: Unpin> Unpin for InterleavePending<T> {}
2123

22-
impl<Io> InterleavePending<Io> {
23-
unsafe_pinned!(io: Io);
24+
impl<T> InterleavePending<T> {
25+
unsafe_pinned!(inner: T);
2426
unsafe_unpinned!(pended: bool);
2527

26-
pub(crate) fn new(io: Io) -> Self {
27-
Self { io, pended: false }
28+
pub(crate) fn new(inner: T) -> Self {
29+
Self {
30+
inner,
31+
pended: false,
32+
}
2833
}
2934

3035
/// Acquires a reference to the underlying I/O object that this adaptor is
3136
/// wrapping.
32-
pub fn get_ref(&self) -> &Io {
33-
&self.io
37+
pub fn get_ref(&self) -> &T {
38+
&self.inner
3439
}
3540

3641
/// Acquires a mutable reference to the underlying I/O object that this
3742
/// adaptor is wrapping.
38-
pub fn get_mut(&mut self) -> &mut Io {
39-
&mut self.io
43+
pub fn get_mut(&mut self) -> &mut T {
44+
&mut self.inner
4045
}
4146

4247
/// Acquires a pinned mutable reference to the underlying I/O object that
4348
/// this adaptor is wrapping.
44-
pub fn get_pin_mut<'a>(self: Pin<&'a mut Self>) -> Pin<&'a mut Io> {
49+
pub fn get_pin_mut<'a>(self: Pin<&'a mut Self>) -> Pin<&'a mut T> {
4550
self.project().0
4651
}
4752

4853
/// Consumes this adaptor returning the underlying I/O object.
49-
pub fn into_inner(self) -> Io {
50-
self.io
54+
pub fn into_inner(self) -> T {
55+
self.inner
5156
}
5257

53-
fn project<'a>(self: Pin<&'a mut Self>) -> (Pin<&'a mut Io>, &'a mut bool) {
58+
fn project<'a>(self: Pin<&'a mut Self>) -> (Pin<&'a mut T>, &'a mut bool) {
5459
unsafe {
5560
let this = self.get_unchecked_mut();
56-
(Pin::new_unchecked(&mut this.io), &mut this.pended)
61+
(Pin::new_unchecked(&mut this.inner), &mut this.pended)
62+
}
63+
}
64+
}
65+
66+
impl<St: Stream> Stream for InterleavePending<St> {
67+
type Item = St::Item;
68+
69+
fn poll_next(
70+
mut self: Pin<&mut Self>,
71+
cx: &mut Context<'_>,
72+
) -> Poll<Option<Self::Item>> {
73+
if *self.as_mut().pended() {
74+
let next = self.as_mut().inner().poll_next(cx);
75+
if next.is_ready() {
76+
*self.pended() = false;
77+
}
78+
next
79+
} else {
80+
cx.waker().wake_by_ref();
81+
*self.pended() = true;
82+
Poll::Pending
5783
}
5884
}
5985
}
@@ -156,6 +182,6 @@ impl<R: AsyncBufRead> AsyncBufRead for InterleavePending<R> {
156182
}
157183

158184
fn consume(self: Pin<&mut Self>, amount: usize) {
159-
self.io().consume(amount)
185+
self.inner().consume(amount)
160186
}
161187
}

futures-test/src/io/mod.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
//! Additional combinators for testing async IO.
22
3-
mod interleave_pending;
43
mod limited;
54

65
pub mod read;

futures-test/src/io/read/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,8 @@
22
33
use futures_io::AsyncRead;
44

5-
pub use super::interleave_pending::InterleavePending;
65
pub use super::limited::Limited;
6+
pub use crate::interleave_pending::InterleavePending;
77

88
/// Additional combinators for testing async readers.
99
pub trait AsyncReadTestExt: AsyncRead {

futures-test/src/io/write/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,8 @@
22
33
use futures_io::AsyncWrite;
44

5-
pub use super::interleave_pending::InterleavePending;
65
pub use super::limited::Limited;
6+
pub use crate::interleave_pending::InterleavePending;
77

88
/// Additional combinators for testing async writers.
99
pub trait AsyncWriteTestExt: AsyncWrite {

futures-test/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,3 +34,5 @@ pub mod stream;
3434

3535
#[cfg(feature = "std")]
3636
pub mod io;
37+
38+
mod interleave_pending;

futures-test/src/stream/interleave_pending.rs

Lines changed: 0 additions & 48 deletions
This file was deleted.

futures-test/src/stream/mod.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,7 @@
22
33
use futures_core::stream::Stream;
44

5-
mod interleave_pending;
6-
pub use self::interleave_pending::InterleavePending;
5+
pub use crate::interleave_pending::InterleavePending;
76

87
/// Additional combinators for testing streams.
98
pub trait StreamTestExt: Stream {

0 commit comments

Comments
 (0)