-
-
Notifications
You must be signed in to change notification settings - Fork 1.7k
fix(http1): poll_loop writes when ready #3952
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
lthiery
wants to merge
2
commits into
hyperium:master
Choose a base branch
from
lthiery:lthiery/wants-write-again
base: master
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
+270
−2
Open
Changes from all commits
Commits
Show all changes
2 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,249 @@ | ||
| use http_body_util::StreamBody; | ||
| use hyper::body::Bytes; | ||
| use hyper::body::Frame; | ||
| use hyper::rt::{Read, ReadBufCursor, Write}; | ||
| use hyper::server::conn::http1; | ||
| use hyper::service::service_fn; | ||
| use hyper::{Response, StatusCode}; | ||
| use pin_project_lite::pin_project; | ||
| use std::convert::Infallible; | ||
| use std::io; | ||
| use std::pin::Pin; | ||
| use std::task::{ready, Context, Poll}; | ||
| use tokio::sync::mpsc; | ||
| use tracing::{error, info}; | ||
|
|
||
| pin_project! { | ||
| #[derive(Debug)] | ||
| pub struct TxReadyStream { | ||
| #[pin] | ||
| read_rx: mpsc::UnboundedReceiver<Vec<u8>>, | ||
| write_tx: mpsc::UnboundedSender<Vec<u8>>, | ||
| read_buffer: Vec<u8>, | ||
| poll_since_write:bool, | ||
| flush_count: usize, | ||
| panic_task: Option<tokio::task::JoinHandle<()>>, | ||
| } | ||
| } | ||
|
|
||
| impl TxReadyStream { | ||
| fn new( | ||
| read_rx: mpsc::UnboundedReceiver<Vec<u8>>, | ||
| write_tx: mpsc::UnboundedSender<Vec<u8>>, | ||
| ) -> Self { | ||
| Self { | ||
| read_rx, | ||
| write_tx, | ||
| read_buffer: Vec::new(), | ||
| poll_since_write: true, | ||
| flush_count: 0, | ||
| panic_task: None, | ||
| } | ||
| } | ||
|
|
||
| /// Create a new pair of connected ReadyStreams. Returns two streams that are connected to each other. | ||
| fn new_pair() -> (Self, Self) { | ||
| let (s1_tx, s2_rx) = mpsc::unbounded_channel(); | ||
| let (s2_tx, s1_rx) = mpsc::unbounded_channel(); | ||
| let s1 = Self::new(s1_rx, s1_tx); | ||
| let s2 = Self::new(s2_rx, s2_tx); | ||
| (s1, s2) | ||
| } | ||
|
|
||
| /// Send data to the other end of the stream (this will be available for reading on the other stream) | ||
| fn send(&self, data: &[u8]) -> Result<(), mpsc::error::SendError<Vec<u8>>> { | ||
| self.write_tx.send(data.to_vec()) | ||
| } | ||
|
|
||
| /// Receive data written to this stream by the other end (async) | ||
| async fn recv(&mut self) -> Option<Vec<u8>> { | ||
| self.read_rx.recv().await | ||
| } | ||
| } | ||
|
|
||
| impl Read for TxReadyStream { | ||
| fn poll_read( | ||
| mut self: Pin<&mut Self>, | ||
| cx: &mut Context<'_>, | ||
| mut buf: ReadBufCursor<'_>, | ||
| ) -> Poll<io::Result<()>> { | ||
| let mut this = self.as_mut().project(); | ||
|
|
||
| // First, try to satisfy the read request from the internal buffer | ||
| if !this.read_buffer.is_empty() { | ||
| let to_read = std::cmp::min(this.read_buffer.len(), buf.remaining()); | ||
| // Copy data from internal buffer to the read buffer | ||
| buf.put_slice(&this.read_buffer[..to_read]); | ||
| // Remove the consumed data from the internal buffer | ||
| this.read_buffer.drain(..to_read); | ||
| return Poll::Ready(Ok(())); | ||
| } | ||
|
|
||
| // If internal buffer is empty, try to get data from the channel | ||
| match this.read_rx.try_recv() { | ||
| Ok(data) => { | ||
| // Copy as much data as we can fit in the buffer | ||
| let to_read = std::cmp::min(data.len(), buf.remaining()); | ||
| buf.put_slice(&data[..to_read]); | ||
|
|
||
| // Store any remaining data in the internal buffer for next time | ||
| if to_read < data.len() { | ||
| let remaining = &data[to_read..]; | ||
| this.read_buffer.extend_from_slice(remaining); | ||
| } | ||
| Poll::Ready(Ok(())) | ||
| } | ||
| Err(mpsc::error::TryRecvError::Empty) => { | ||
| match ready!(this.read_rx.poll_recv(cx)) { | ||
| Some(data) => { | ||
| // Copy as much data as we can fit in the buffer | ||
| let to_read = std::cmp::min(data.len(), buf.remaining()); | ||
| buf.put_slice(&data[..to_read]); | ||
|
|
||
| // Store any remaining data in the internal buffer for next time | ||
| if to_read < data.len() { | ||
| let remaining = &data[to_read..]; | ||
| this.read_buffer.extend_from_slice(remaining); | ||
| } | ||
| Poll::Ready(Ok(())) | ||
| } | ||
| None => Poll::Ready(Ok(())), | ||
| } | ||
| } | ||
| Err(mpsc::error::TryRecvError::Disconnected) => { | ||
| // Channel closed, return EOF | ||
| Poll::Ready(Ok(())) | ||
| } | ||
| } | ||
| } | ||
| } | ||
|
|
||
| impl Write for TxReadyStream { | ||
| fn poll_write( | ||
| mut self: Pin<&mut Self>, | ||
| _cx: &mut Context<'_>, | ||
| buf: &[u8], | ||
| ) -> Poll<io::Result<usize>> { | ||
| if !self.poll_since_write { | ||
| return Poll::Pending; | ||
| } | ||
| self.poll_since_write = false; | ||
| let this = self.project(); | ||
| let buf = Vec::from(&buf[..buf.len()]); | ||
| let len = buf.len(); | ||
|
|
||
| // Send data through the channel - this should always be ready for unbounded channels | ||
| match this.write_tx.send(buf) { | ||
| Ok(_) => { | ||
| // Increment write count | ||
| Poll::Ready(Ok(len)) | ||
| } | ||
| Err(_) => { | ||
| error!("ReadyStream::poll_write failed - channel closed"); | ||
| Poll::Ready(Err(io::Error::new( | ||
| io::ErrorKind::BrokenPipe, | ||
| "Write channel closed", | ||
| ))) | ||
| } | ||
| } | ||
| } | ||
|
|
||
| fn poll_flush(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> { | ||
| self.flush_count += 1; | ||
| // We require two flushes to complete each chunk, simulating a success at the end of the old | ||
| // poll loop. After all chunks are written, we always succeed on flush to allow for finish. | ||
| if self.flush_count % 2 != 0 && self.flush_count < TOTAL_CHUNKS * 2 { | ||
| // Spawn panic task if not already spawned | ||
| if self.panic_task.is_none() { | ||
| let task = tokio::spawn(async { | ||
| tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; | ||
| }); | ||
| self.panic_task = Some(task); | ||
| } | ||
| return Poll::Pending; | ||
| } | ||
|
|
||
| // Abort the panic task if it exists | ||
| if let Some(task) = self.panic_task.take() { | ||
| info!("Task polled to completion. Aborting panic (aka waker stand-in task)."); | ||
| task.abort(); | ||
| } | ||
|
|
||
| self.poll_since_write = true; | ||
| Poll::Ready(Ok(())) | ||
| } | ||
|
|
||
| fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> { | ||
| Poll::Ready(Ok(())) | ||
| } | ||
| } | ||
|
|
||
| fn init_tracing() { | ||
| use std::sync::Once; | ||
| static INIT: Once = Once::new(); | ||
| INIT.call_once(|| { | ||
| tracing_subscriber::fmt() | ||
| .with_max_level(tracing::Level::INFO) | ||
| .with_target(true) | ||
| .with_thread_ids(true) | ||
| .with_thread_names(true) | ||
| .init(); | ||
| }); | ||
| } | ||
|
|
||
| const TOTAL_CHUNKS: usize = 16; | ||
|
|
||
| #[tokio::test(flavor = "multi_thread", worker_threads = 2)] | ||
| async fn body_test() { | ||
| init_tracing(); | ||
| // Create a pair of connected streams | ||
| let (server_stream, mut client_stream) = TxReadyStream::new_pair(); | ||
|
|
||
| let mut http_builder = http1::Builder::new(); | ||
| http_builder.max_buf_size(CHUNK_SIZE); | ||
| const CHUNK_SIZE: usize = 64 * 1024; | ||
| let service = service_fn(|_| async move { | ||
| info!( | ||
| "Creating payload of {} chunks of {} KiB each ({} MiB total)...", | ||
| TOTAL_CHUNKS, | ||
| CHUNK_SIZE / 1024, | ||
| TOTAL_CHUNKS * CHUNK_SIZE / (1024 * 1024) | ||
| ); | ||
| let bytes = Bytes::from(vec![0; CHUNK_SIZE]); | ||
| let data = vec![bytes.clone(); TOTAL_CHUNKS]; | ||
| let stream = futures_util::stream::iter( | ||
| data.into_iter() | ||
| .map(|b| Ok::<_, Infallible>(Frame::data(b))), | ||
| ); | ||
| let body = StreamBody::new(stream); | ||
| info!("Server: Sending data response..."); | ||
| Ok::<_, hyper::Error>( | ||
| Response::builder() | ||
| .status(StatusCode::OK) | ||
| .header("content-type", "application/octet-stream") | ||
| .header("content-length", (TOTAL_CHUNKS * CHUNK_SIZE).to_string()) | ||
| .body(body) | ||
| .unwrap(), | ||
| ) | ||
| }); | ||
|
|
||
| let server_task = tokio::spawn(async move { | ||
| let conn = http_builder.serve_connection(server_stream, service); | ||
| if let Err(e) = conn.await { | ||
| error!("Server connection error: {}", e); | ||
| } | ||
| }); | ||
|
|
||
| let get_request = "GET / HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n"; | ||
| client_stream.send(get_request.as_bytes()).unwrap(); | ||
|
|
||
| info!("Client is reading response..."); | ||
| let mut bytes_received = 0; | ||
| while let Some(chunk) = client_stream.recv().await { | ||
| bytes_received += chunk.len(); | ||
| } | ||
| // Clean up | ||
| server_task.abort(); | ||
|
|
||
| info!(bytes_received, "Client done receiving bytes"); | ||
| } | ||
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think my only comment for this fix is that if possible, a simpler unit test would be easier to understand and maintain in the future. Can that be done?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's a little difficult to simplify because I needed to create a future that would deterministically trigger this edge case.
What's "weird" about this future mock is that it can return Poll::Pending once and then reach completion on a later poll without the waker ever being woken. I don't think that ever happens in Tokio, for example, but it does happen (probabilistically) with the actual futures that this is modeling. And as far as I know, doing so is no violation of the Future contract.
Anyway, I'm not sure I can create the situation with less code or simpler code. That being said, maybe a fixture like this is generalizable for the project? Creating a purely channel-based, in-software "connection" could allow other edge cases to be explored, such as partial reads and writes or different sequences of readiness.
All that being said, I'll still take another critical pass at what I've done here next week and I'll see what I can do to simplify.