Skip to content

Commit 806f228

Browse files
Nemo157cramertj
authored andcommitted
Add InterleavePending adaptor for AsyncWrite
1 parent 2645682 commit 806f228

File tree

3 files changed

+168
-0
lines changed

3 files changed

+168
-0
lines changed

futures-test/src/io/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,3 +3,5 @@
33
pub mod read;
44
pub use read::AsyncReadTestExt;
55

6+
pub mod write;
7+
pub use write::AsyncWriteTestExt;
Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
use futures_io::{self as io, AsyncWrite};
2+
use pin_utils::{unsafe_pinned, unsafe_unpinned};
3+
use std::{
4+
marker::Unpin,
5+
pin::Pin,
6+
task::{Context, Poll},
7+
};
8+
9+
/// Writer for the [`interleave_pending_write`](super::AsyncWriteTestExt::interleave_pending_write)
10+
/// method.
11+
#[derive(Debug)]
12+
pub struct InterleavePendingWrite<W: AsyncWrite> {
13+
writer: W,
14+
pended: bool,
15+
}
16+
17+
impl<W: AsyncWrite + Unpin> Unpin for InterleavePendingWrite<W> {}
18+
19+
impl<W: AsyncWrite> InterleavePendingWrite<W> {
20+
unsafe_pinned!(writer: W);
21+
unsafe_unpinned!(pended: bool);
22+
23+
pub(crate) fn new(writer: W) -> Self {
24+
Self {
25+
writer,
26+
pended: false,
27+
}
28+
}
29+
30+
/// Acquires a reference to the underlying writer that this adaptor is wrapping.
31+
pub fn get_ref(&self) -> &W {
32+
&self.writer
33+
}
34+
35+
/// Acquires a mutable reference to the underlying writer that this adaptor is wrapping.
36+
pub fn get_mut(&mut self) -> &mut W {
37+
&mut self.writer
38+
}
39+
40+
/// Acquires a pinned mutable reference to the underlying writer that this adaptor is wrapping.
41+
pub fn get_pin_mut<'a>(self: Pin<&'a mut Self>) -> Pin<&'a mut W> {
42+
self.project().0
43+
}
44+
45+
/// Consumes this adaptor returning the underlying writer.
46+
pub fn into_inner(self) -> W {
47+
self.writer
48+
}
49+
50+
fn project<'a>(self: Pin<&'a mut Self>) -> (Pin<&'a mut W>, &'a mut bool) {
51+
unsafe {
52+
let this = self.get_unchecked_mut();
53+
(Pin::new_unchecked(&mut this.writer), &mut this.pended)
54+
}
55+
}
56+
}
57+
58+
impl<W: AsyncWrite> AsyncWrite for InterleavePendingWrite<W> {
59+
fn poll_write(
60+
self: Pin<&mut Self>,
61+
cx: &mut Context<'_>,
62+
buf: &[u8],
63+
) -> Poll<io::Result<usize>> {
64+
let (writer, pended) = self.project();
65+
if *pended {
66+
let next = writer.poll_write(cx, buf);
67+
if next.is_ready() {
68+
*pended = false;
69+
}
70+
next
71+
} else {
72+
cx.waker().wake_by_ref();
73+
*pended = true;
74+
Poll::Pending
75+
}
76+
}
77+
78+
fn poll_flush(
79+
self: Pin<&mut Self>,
80+
cx: &mut Context<'_>,
81+
) -> Poll<io::Result<()>> {
82+
let (writer, pended) = self.project();
83+
if *pended {
84+
let next = writer.poll_flush(cx);
85+
if next.is_ready() {
86+
*pended = false;
87+
}
88+
next
89+
} else {
90+
cx.waker().wake_by_ref();
91+
*pended = true;
92+
Poll::Pending
93+
}
94+
}
95+
96+
fn poll_close(
97+
self: Pin<&mut Self>,
98+
cx: &mut Context<'_>,
99+
) -> Poll<io::Result<()>> {
100+
let (writer, pended) = self.project();
101+
if *pended {
102+
let next = writer.poll_close(cx);
103+
if next.is_ready() {
104+
*pended = false;
105+
}
106+
next
107+
} else {
108+
cx.waker().wake_by_ref();
109+
*pended = true;
110+
Poll::Pending
111+
}
112+
}
113+
}

futures-test/src/io/write/mod.rs

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
//! Additional combinators for testing async writers.
2+
3+
use futures_io::AsyncWrite;
4+
5+
mod interleave_pending_write;
6+
pub use self::interleave_pending_write::InterleavePendingWrite;
7+
8+
/// Additional combinators for testing async writers.
9+
pub trait AsyncWriteTestExt: AsyncWrite {
10+
/// Introduces an extra [`Poll::Pending`](futures_core::task::Poll::Pending)
11+
/// in between each operation on the writer.
12+
///
13+
/// # Examples
14+
///
15+
/// ```
16+
/// #![feature(async_await)]
17+
/// use futures::task::Poll;
18+
/// use futures::io::AsyncWrite;
19+
/// use futures_test::task::noop_context;
20+
/// use futures_test::io::AsyncWriteTestExt;
21+
/// use pin_utils::pin_mut;
22+
///
23+
/// let writer = std::io::Cursor::new([0u8; 4]).interleave_pending_write();
24+
/// pin_mut!(writer);
25+
///
26+
/// let mut cx = noop_context();
27+
///
28+
/// assert_eq!(writer.as_mut().poll_write(&mut cx, &[1, 2])?, Poll::Pending);
29+
/// assert_eq!(writer.as_mut().poll_write(&mut cx, &[1, 2])?, Poll::Ready(2));
30+
/// assert_eq!(writer.get_ref().get_ref(), &[1, 2, 0, 0]);
31+
/// assert_eq!(writer.as_mut().poll_write(&mut cx, &[3, 4])?, Poll::Pending);
32+
/// assert_eq!(writer.as_mut().poll_write(&mut cx, &[3, 4])?, Poll::Ready(2));
33+
/// assert_eq!(writer.get_ref().get_ref(), &[1, 2, 3, 4]);
34+
/// assert_eq!(writer.as_mut().poll_write(&mut cx, &[5, 6])?, Poll::Pending);
35+
/// assert_eq!(writer.as_mut().poll_write(&mut cx, &[5, 6])?, Poll::Ready(0));
36+
///
37+
/// assert_eq!(writer.as_mut().poll_flush(&mut cx)?, Poll::Pending);
38+
/// assert_eq!(writer.as_mut().poll_flush(&mut cx)?, Poll::Ready(()));
39+
///
40+
/// assert_eq!(writer.as_mut().poll_close(&mut cx)?, Poll::Pending);
41+
/// assert_eq!(writer.as_mut().poll_close(&mut cx)?, Poll::Ready(()));
42+
///
43+
/// # Ok::<(), std::io::Error>(())
44+
/// ```
45+
fn interleave_pending_write(self) -> InterleavePendingWrite<Self>
46+
where
47+
Self: Sized,
48+
{
49+
InterleavePendingWrite::new(self)
50+
}
51+
}
52+
53+
impl<W> AsyncWriteTestExt for W where W: AsyncWrite {}

0 commit comments

Comments
 (0)