From cb2cab39e946183877b7b9d8036bf78312fe7b0e Mon Sep 17 00:00:00 2001 From: Oliver Bunting <72926894+ollie-etl@users.noreply.github.com> Date: Tue, 13 Feb 2024 10:46:03 +0000 Subject: [PATCH 1/8] Replace BufResult with BufError --- src/lib.rs | 35 ++++++++++++++++++++++++++++++----- 1 file changed, 30 insertions(+), 5 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index d1cc6e02..47aaecba 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -84,6 +84,8 @@ pub use runtime::spawn; pub use runtime::Runtime; use crate::runtime::driver::op::Op; +use std::error::Error; +use std::fmt::{Debug, Display}; use std::future::Future; /// Starts an `io_uring` enabled Tokio runtime. @@ -237,8 +239,7 @@ impl Builder { /// /// This type is used as a return value for asynchronous `io-uring` methods that /// require passing ownership of a buffer to the runtime. When the operation -/// completes, the buffer is returned whether or not the operation completed -/// successfully. +/// completes, the buffer is returned both in the success tuple and as part of the error. /// /// # Examples /// @@ -254,8 +255,7 @@ impl Builder { /// // Read some data, the buffer is passed by ownership and /// // submitted to the kernel. When the operation completes, /// // we get the buffer back. -/// let (res, buf) = file.read_at(buf, 0).await; -/// let n = res?; +/// let (n, buf) = file.read_at(buf, 0).await?; /// /// // Display the contents /// println!("{:?}", &buf[..n]); @@ -264,7 +264,32 @@ impl Builder { /// }) /// } /// ``` -pub type BufResult = (std::io::Result, B); +/// A specialized `Error` type for `io-uring` operations with buffers. +#[derive(Debug)] +pub struct BufError(pub std::io::Error, pub B); + +impl Display for BufError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + Debug::fmt(&self.0, f) + } +} + +impl Error for BufError { + fn source(&self) -> Option<&(dyn Error + 'static)> { + Some(&self.0) + } +} + +impl BufError { + /// Applies a function to the contained buffer, returning a new `BufError`. + pub fn map(self, f: F) -> BufError + where + F: FnOnce(B) -> U, + { + BufError(self.0, f(self.1)) + } +} + /// The simplest possible operation. Just posts a completion event, nothing else. /// From d84e764655c6c1633137e33831aaba445107e9cd Mon Sep 17 00:00:00 2001 From: Oliver Bunting <72926894+ollie-etl@users.noreply.github.com> Date: Tue, 13 Feb 2024 12:03:08 +0000 Subject: [PATCH 2/8] Change Result types --- examples/cat.rs | 4 +- examples/mix.rs | 7 +- examples/tcp_listener.rs | 6 +- examples/tcp_listener_fixed_buffers.rs | 7 +- examples/tcp_stream.rs | 7 +- examples/udp_socket.rs | 7 +- examples/unix_listener.rs | 7 +- examples/unix_stream.rs | 7 +- examples/wrk-bench.rs | 3 +- src/fs/file.rs | 127 +++++++++++-------------- src/io/read.rs | 9 +- src/io/read_fixed.rs | 13 ++- src/io/readv.rs | 9 +- src/io/recv_from.rs | 9 +- src/io/recvmsg.rs | 9 +- src/io/send_to.rs | 9 +- src/io/send_zc.rs | 9 +- src/io/socket.rs | 71 +++++++------- src/io/write.rs | 9 +- src/io/write_fixed.rs | 9 +- src/io/writev.rs | 9 +- src/io/writev_all.rs | 6 +- src/lib.rs | 42 ++++++-- src/net/tcp/stream.rs | 12 +-- src/net/udp.rs | 16 ++-- src/net/unix/stream.rs | 12 +-- tests/driver.rs | 2 - tests/fixed_buf.rs | 26 +++-- tests/fs_file.rs | 33 +++---- 29 files changed, 253 insertions(+), 243 deletions(-) diff --git a/examples/cat.rs b/examples/cat.rs index d41b7ab1..824f988b 100644 --- a/examples/cat.rs +++ b/examples/cat.rs @@ -29,9 +29,7 @@ fn main() { loop { // Read a chunk - let (res, b) = file.read_at(buf, pos).await; - let n = res.unwrap(); - + let (n, b) = file.read_at(buf, pos).await.unwrap(); if n == 0 { break; } diff --git a/examples/mix.rs b/examples/mix.rs index 4e094019..a0b99995 100644 --- a/examples/mix.rs +++ b/examples/mix.rs @@ -34,15 +34,14 @@ fn main() { loop { // Read a chunk - let (res, b) = file.read_at(buf, pos).await; - let n = res.unwrap(); + let (n, b) = file.read_at(buf, pos).await.unwrap(); if n == 0 { break; } - let (res, b) = socket.write(b).submit().await; - pos += res.unwrap() as u64; + let (n, b) = socket.write(b).submit().await.unwrap(); + pos += n as u64; buf = b; } diff --git a/examples/tcp_listener.rs b/examples/tcp_listener.rs index 918503ca..45175f16 100644 --- a/examples/tcp_listener.rs +++ b/examples/tcp_listener.rs @@ -29,16 +29,14 @@ fn main() { let mut buf = vec![0u8; 4096]; loop { - let (result, nbuf) = stream.read(buf).await; + let (read, nbuf) = stream.read(buf).await.unwrap(); buf = nbuf; - let read = result.unwrap(); if read == 0 { println!("{} closed, {} total ping-ponged", socket_addr, n); break; } - let (res, slice) = stream.write_all(buf.slice(..read)).await; - let _ = res.unwrap(); + let (_, slice) = stream.write_all(buf.slice(..read)).await.unwrap(); buf = slice.into_inner(); println!("{} all {} bytes ping-ponged", socket_addr, read); n += read; diff --git a/examples/tcp_listener_fixed_buffers.rs b/examples/tcp_listener_fixed_buffers.rs index 69db2c8e..4be6f741 100644 --- a/examples/tcp_listener_fixed_buffers.rs +++ b/examples/tcp_listener_fixed_buffers.rs @@ -79,17 +79,14 @@ async fn echo_handler( // Each time through the loop, use fbuf and then get it back for the next // iteration. - let (result, fbuf1) = stream.read_fixed(fbuf).await; + let (read, fbuf1) = stream.read_fixed(fbuf).await.unwrap(); fbuf = { - let read = result.unwrap(); if read == 0 { break; } assert_eq!(4096, fbuf1.len()); // To prove a point. - let (res, nslice) = stream.write_fixed_all(fbuf1.slice(..read)).await; - - let _ = res.unwrap(); + let (_, nslice) = stream.write_fixed_all(fbuf1.slice(..read)).await.unwrap(); println!("peer {} all {} bytes ping-ponged", peer, read); n += read; diff --git a/examples/tcp_stream.rs b/examples/tcp_stream.rs index 4983ee4c..9b3d4a3a 100644 --- a/examples/tcp_stream.rs +++ b/examples/tcp_stream.rs @@ -15,11 +15,10 @@ fn main() { let stream = TcpStream::connect(socket_addr).await.unwrap(); let buf = vec![1u8; 128]; - let (result, buf) = stream.write(buf).submit().await; - println!("written: {}", result.unwrap()); + let (n, buf) = stream.write(buf).submit().await.unwrap(); + println!("written: {}", n); - let (result, buf) = stream.read(buf).await; - let read = result.unwrap(); + let (read, buf) = stream.read(buf).await.unwrap(); println!("read: {:?}", &buf[..read]); }); } diff --git a/examples/udp_socket.rs b/examples/udp_socket.rs index 01eb9b32..ddbf2138 100644 --- a/examples/udp_socket.rs +++ b/examples/udp_socket.rs @@ -15,12 +15,11 @@ fn main() { let buf = vec![0u8; 128]; - let (result, mut buf) = socket.recv_from(buf).await; - let (read, socket_addr) = result.unwrap(); + let ((read, socket_addr), mut buf) = socket.recv_from(buf).await.unwrap(); buf.resize(read, 0); println!("received from {}: {:?}", socket_addr, &buf[..]); - let (result, _buf) = socket.send_to(buf, socket_addr).await; - println!("sent to {}: {}", socket_addr, result.unwrap()); + let (n, _buf) = socket.send_to(buf, socket_addr).await.unwrap(); + println!("sent to {}: {}", socket_addr, n); }); } diff --git a/examples/unix_listener.rs b/examples/unix_listener.rs index 9e10496d..54261db8 100644 --- a/examples/unix_listener.rs +++ b/examples/unix_listener.rs @@ -20,11 +20,10 @@ fn main() { tokio_uring::spawn(async move { let buf = vec![1u8; 128]; - let (result, buf) = stream.write(buf).submit().await; - println!("written to {}: {}", &socket_addr, result.unwrap()); + let (n, buf) = stream.write(buf).submit().await.unwrap(); + println!("written to {}: {}", &socket_addr, n); - let (result, buf) = stream.read(buf).await; - let read = result.unwrap(); + let (read, buf) = stream.read(buf).await.unwrap(); println!("read from {}: {:?}", &socket_addr, &buf[..read]); }); } diff --git a/examples/unix_stream.rs b/examples/unix_stream.rs index 7caf06f9..5618f4f1 100644 --- a/examples/unix_stream.rs +++ b/examples/unix_stream.rs @@ -15,11 +15,10 @@ fn main() { let stream = UnixStream::connect(socket_addr).await.unwrap(); let buf = vec![1u8; 128]; - let (result, buf) = stream.write(buf).submit().await; - println!("written: {}", result.unwrap()); + let (n, buf) = stream.write(buf).submit().await.unwrap(); + println!("written: {}", n); - let (result, buf) = stream.read(buf).await; - let read = result.unwrap(); + let (read, buf) = stream.read(buf).await.unwrap(); println!("read: {:?}", &buf[..read]); }); } diff --git a/examples/wrk-bench.rs b/examples/wrk-bench.rs index 222df76a..9a9047e9 100644 --- a/examples/wrk-bench.rs +++ b/examples/wrk-bench.rs @@ -21,8 +21,7 @@ fn main() -> io::Result<()> { let (stream, _) = listener.accept().await?; tokio_uring::spawn(async move { - let (result, _) = stream.write(RESPONSE).submit().await; - + let result = stream.write(RESPONSE).submit().await; if let Err(err) = result { eprintln!("Client connection failed: {}", err); } diff --git a/src/fs/file.rs b/src/fs/file.rs index 9cd47f21..68dab6a1 100644 --- a/src/fs/file.rs +++ b/src/fs/file.rs @@ -4,6 +4,7 @@ use crate::fs::OpenOptions; use crate::io::SharedFd; use crate::runtime::driver::op::Op; +use crate::sealed::MapResultBuf; use crate::{UnsubmittedOneshot, UnsubmittedWrite}; use std::fmt; use std::io; @@ -176,7 +177,7 @@ impl File { /// }) /// } /// ``` - pub async fn read_at(&self, buf: T, pos: u64) -> crate::BufResult { + pub async fn read_at(&self, buf: T, pos: u64) -> crate::Result { // Submit the read operation let op = Op::read_at(&self.fd, buf, pos).unwrap(); op.await @@ -231,7 +232,7 @@ impl File { &self, bufs: Vec, pos: u64, - ) -> crate::BufResult> { + ) -> crate::Result> { // Submit the read operation let op = Op::readv_at(&self.fd, bufs, pos).unwrap(); op.await @@ -288,7 +289,7 @@ impl File { &self, buf: Vec, pos: u64, - ) -> crate::BufResult> { + ) -> crate::Result> { let op = Op::writev_at(&self.fd, buf, pos).unwrap(); op.await } @@ -341,7 +342,7 @@ impl File { &self, buf: Vec, pos: Option, // Use None for files that can't seek - ) -> crate::BufResult> { + ) -> crate::Result> { let op = crate::io::writev_at_all(&self.fd, buf, pos); op.await } @@ -392,43 +393,37 @@ impl File { /// ``` /// /// [`ErrorKind::UnexpectedEof`]: std::io::ErrorKind::UnexpectedEof - pub async fn read_exact_at(&self, buf: T, pos: u64) -> crate::BufResult<(), T> + pub async fn read_exact_at(&self, buf: T, pos: u64) -> crate::Result<(), T> where T: BoundedBufMut, { let orig_bounds = buf.bounds(); - let (res, buf) = self.read_exact_slice_at(buf.slice_full(), pos).await; - (res, T::from_buf_bounds(buf, orig_bounds)) + self.read_exact_slice_at(buf.slice_full(), pos) + .await + .map_buf(|buf| T::from_buf_bounds(buf, orig_bounds)) } async fn read_exact_slice_at( &self, mut buf: Slice, mut pos: u64, - ) -> crate::BufResult<(), T> { + ) -> crate::Result<(), T> { if pos.checked_add(buf.bytes_total() as u64).is_none() { - return ( - Err(io::Error::new( - io::ErrorKind::InvalidInput, - "buffer too large for file", - )), + return Err(crate::Error( + io::Error::new(io::ErrorKind::InvalidInput, "buffer too large for file"), buf.into_inner(), - ); + )); } while buf.bytes_total() != 0 { - let (res, slice) = self.read_at(buf, pos).await; - match res { - Ok(0) => { - return ( - Err(io::Error::new( - io::ErrorKind::UnexpectedEof, - "failed to fill whole buffer", - )), + match self.read_at(buf, pos).await { + Ok((0, slice)) => { + return Err(crate::Error( + io::Error::new(io::ErrorKind::UnexpectedEof, "failed to fill whole buffer"), slice.into_inner(), - ) + )) } - Ok(n) => { + Ok((n, slice)) => { pos += n as u64; buf = slice.slice(n..); } @@ -437,11 +432,11 @@ impl File { // crate's design ensures we are not calling the 'wait' option // in the ENTER syscall. Only an Enter with 'wait' can generate // an EINTR according to the io_uring man pages. - Err(e) => return (Err(e), slice.into_inner()), + Err(e) => return Err(e.map(|slice| slice.into_inner())), }; } - (Ok(()), buf.into_inner()) + Ok(((), buf.into_inner())) } /// Like [`read_at`], but using a pre-mapped buffer @@ -484,7 +479,7 @@ impl File { /// }) ///# } /// ``` - pub async fn read_fixed_at(&self, buf: T, pos: u64) -> crate::BufResult + pub async fn read_fixed_at(&self, buf: T, pos: u64) -> crate::Result where T: BoundedBufMut, { @@ -584,43 +579,37 @@ impl File { /// ``` /// /// [`write_at`]: File::write_at - pub async fn write_all_at(&self, buf: T, pos: u64) -> crate::BufResult<(), T> + pub async fn write_all_at(&self, buf: T, pos: u64) -> crate::Result<(), T> where T: BoundedBuf, { let orig_bounds = buf.bounds(); - let (res, buf) = self.write_all_slice_at(buf.slice_full(), pos).await; - (res, T::from_buf_bounds(buf, orig_bounds)) + self.write_all_slice_at(buf.slice_full(), pos) + .await + .map_buf(|buf| T::from_buf_bounds(buf, orig_bounds)) } async fn write_all_slice_at( &self, mut buf: Slice, mut pos: u64, - ) -> crate::BufResult<(), T> { + ) -> crate::Result<(), T> { if pos.checked_add(buf.bytes_init() as u64).is_none() { - return ( - Err(io::Error::new( - io::ErrorKind::InvalidInput, - "buffer too large for file", - )), + return Err(crate::Error( + io::Error::new(io::ErrorKind::InvalidInput, "buffer too large for file"), buf.into_inner(), - ); + )); } while buf.bytes_init() != 0 { - let (res, slice) = self.write_at(buf, pos).submit().await; - match res { - Ok(0) => { - return ( - Err(io::Error::new( - io::ErrorKind::WriteZero, - "failed to write whole buffer", - )), + match self.write_at(buf, pos).submit().await { + Ok((0, slice)) => { + return Err(crate::Error( + io::Error::new(io::ErrorKind::WriteZero, "failed to write whole buffer"), slice.into_inner(), - ) + )) } - Ok(n) => { + Ok((n, slice)) => { pos += n as u64; buf = slice.slice(n..); } @@ -629,11 +618,11 @@ impl File { // crate's design ensures we are not calling the 'wait' option // in the ENTER syscall. Only an Enter with 'wait' can generate // an EINTR according to the io_uring man pages. - Err(e) => return (Err(e), slice.into_inner()), + Err(e) => return Err(e.map(|slice| slice.into_inner())), }; } - (Ok(()), buf.into_inner()) + Ok(((), buf.into_inner())) } /// Like [`write_at`], but using a pre-mapped buffer @@ -677,7 +666,7 @@ impl File { /// }) ///# } /// ``` - pub async fn write_fixed_at(&self, buf: T, pos: u64) -> crate::BufResult + pub async fn write_fixed_at(&self, buf: T, pos: u64) -> crate::Result where T: BoundedBuf, { @@ -704,43 +693,37 @@ impl File { /// This function will return the first error that [`write_fixed_at`] returns. /// /// [`write_fixed_at`]: Self::write_fixed_at - pub async fn write_fixed_all_at(&self, buf: T, pos: u64) -> crate::BufResult<(), T> + pub async fn write_fixed_all_at(&self, buf: T, pos: u64) -> crate::Result<(), T> where T: BoundedBuf, { let orig_bounds = buf.bounds(); - let (res, buf) = self.write_fixed_all_at_slice(buf.slice_full(), pos).await; - (res, T::from_buf_bounds(buf, orig_bounds)) + self.write_fixed_all_at_slice(buf.slice_full(), pos) + .await + .map_buf(|buf| T::from_buf_bounds(buf, orig_bounds)) } async fn write_fixed_all_at_slice( &self, mut buf: Slice, mut pos: u64, - ) -> crate::BufResult<(), FixedBuf> { + ) -> crate::Result<(), FixedBuf> { if pos.checked_add(buf.bytes_init() as u64).is_none() { - return ( - Err(io::Error::new( - io::ErrorKind::InvalidInput, - "buffer too large for file", - )), + return Err(crate::Error( + io::Error::new(io::ErrorKind::InvalidInput, "buffer too large for file"), buf.into_inner(), - ); + )); } while buf.bytes_init() != 0 { - let (res, slice) = self.write_fixed_at(buf, pos).await; - match res { - Ok(0) => { - return ( - Err(io::Error::new( - io::ErrorKind::WriteZero, - "failed to write whole buffer", - )), + match self.write_fixed_at(buf, pos).await { + Ok((0, slice)) => { + return Err(crate::Error( + io::Error::new(io::ErrorKind::WriteZero, "failed to write whole buffer"), slice.into_inner(), - ) + )) } - Ok(n) => { + Ok((n, slice)) => { pos += n as u64; buf = slice.slice(n..); } @@ -749,11 +732,11 @@ impl File { // crate's design ensures we are not calling the 'wait' option // in the ENTER syscall. Only an Enter with 'wait' can generate // an EINTR according to the io_uring man pages. - Err(e) => return (Err(e), slice.into_inner()), + Err(e) => return Err(e.map(|slice| slice.into_inner())), }; } - (Ok(()), buf.into_inner()) + Ok(((), buf.into_inner())) } /// Attempts to sync all OS-internal metadata to disk. diff --git a/src/io/read.rs b/src/io/read.rs index c3395b40..37b11a3a 100644 --- a/src/io/read.rs +++ b/src/io/read.rs @@ -1,6 +1,6 @@ use crate::buf::BoundedBufMut; use crate::io::SharedFd; -use crate::BufResult; +use crate::Result; use crate::runtime::driver::op::{Completable, CqeResult, Op}; use crate::runtime::CONTEXT; @@ -43,7 +43,7 @@ impl Completable for Read where T: BoundedBufMut, { - type Output = BufResult; + type Output = Result; fn complete(self, cqe: CqeResult) -> Self::Output { // Convert the operation result to `usize` @@ -59,6 +59,9 @@ where } } - (res, buf) + match res { + Ok(n) => Ok((n, buf)), + Err(e) => Err(crate::Error(e, buf)), + } } } diff --git a/src/io/read_fixed.rs b/src/io/read_fixed.rs index 3cb96cdb..fa5b159d 100644 --- a/src/io/read_fixed.rs +++ b/src/io/read_fixed.rs @@ -2,7 +2,7 @@ use crate::buf::fixed::FixedBuf; use crate::buf::BoundedBufMut; use crate::io::SharedFd; use crate::runtime::driver::op::{self, Completable, Op}; -use crate::BufResult; +use crate::Result; use crate::runtime::CONTEXT; use std::io; @@ -52,13 +52,13 @@ impl Completable for ReadFixed where T: BoundedBufMut, { - type Output = BufResult; + type Output = Result; fn complete(self, cqe: op::CqeResult) -> Self::Output { - // Convert the operation result to `usize` - let res = cqe.result.map(|v| v as usize); // Recover the buffer let mut buf = self.buf; + // Convert the operation result to `usize` + let res = cqe.result.map(|v| v as usize); // If the operation was successful, advance the initialized cursor. if let Ok(n) = res { @@ -68,6 +68,9 @@ where } } - (res, buf) + match res { + Ok(n) => Ok((n, buf)), + Err(e) => Err(crate::Error(e, buf)), + } } } diff --git a/src/io/readv.rs b/src/io/readv.rs index ff71dc79..859f9c42 100644 --- a/src/io/readv.rs +++ b/src/io/readv.rs @@ -1,5 +1,5 @@ use crate::buf::BoundedBufMut; -use crate::BufResult; +use crate::Result; use crate::io::SharedFd; use crate::runtime::driver::op::{Completable, CqeResult, Op}; @@ -62,7 +62,7 @@ impl Completable for Readv where T: BoundedBufMut, { - type Output = BufResult>; + type Output = Result>; fn complete(self, cqe: CqeResult) -> Self::Output { // Convert the operation result to `usize` @@ -87,6 +87,9 @@ where assert_eq!(count, 0); } - (res, bufs) + match res { + Ok(n) => Ok((n, bufs)), + Err(e) => Err(crate::Error(e, bufs)), + } } } diff --git a/src/io/recv_from.rs b/src/io/recv_from.rs index e9b360ca..c9336271 100644 --- a/src/io/recv_from.rs +++ b/src/io/recv_from.rs @@ -1,6 +1,6 @@ use crate::runtime::driver::op::{Completable, CqeResult, Op}; use crate::runtime::CONTEXT; -use crate::{buf::BoundedBufMut, io::SharedFd, BufResult}; +use crate::{buf::BoundedBufMut, io::SharedFd, Result}; use socket2::SockAddr; use std::{ io::IoSliceMut, @@ -57,7 +57,7 @@ impl Completable for RecvFrom where T: BoundedBufMut, { - type Output = BufResult<(usize, SocketAddr), T>; + type Output = Result<(usize, SocketAddr), T>; fn complete(self, cqe: CqeResult) -> Self::Output { // Convert the operation result to `usize` @@ -78,6 +78,9 @@ where (n, socket_addr) }); - (res, buf) + match res { + Ok(n) => Ok((n, buf)), + Err(e) => Err(crate::Error(e, buf)), + } } } diff --git a/src/io/recvmsg.rs b/src/io/recvmsg.rs index 3cae2e50..67175328 100644 --- a/src/io/recvmsg.rs +++ b/src/io/recvmsg.rs @@ -1,6 +1,6 @@ use crate::runtime::driver::op::{Completable, CqeResult, Op}; use crate::runtime::CONTEXT; -use crate::{buf::BoundedBufMut, io::SharedFd, BufResult}; +use crate::{buf::BoundedBufMut, io::SharedFd, Result}; use socket2::SockAddr; use std::{ io::IoSliceMut, @@ -61,7 +61,7 @@ impl Completable for RecvMsg where T: BoundedBufMut, { - type Output = BufResult<(usize, SocketAddr), Vec>; + type Output = Result<(usize, SocketAddr), Vec>; fn complete(self, cqe: CqeResult) -> Self::Output { // Convert the operation result to `usize` @@ -92,6 +92,9 @@ where (n, socket_addr) }); - (res, bufs) + match res { + Ok(n) => Ok((n, bufs)), + Err(e) => Err(crate::Error(e, bufs)), + } } } diff --git a/src/io/send_to.rs b/src/io/send_to.rs index 8895f5fa..805980ae 100644 --- a/src/io/send_to.rs +++ b/src/io/send_to.rs @@ -2,7 +2,7 @@ use crate::buf::BoundedBuf; use crate::io::SharedFd; use crate::runtime::driver::op::{Completable, CqeResult, Op}; use crate::runtime::CONTEXT; -use crate::BufResult; +use crate::Result; use socket2::SockAddr; use std::io::IoSlice; use std::{boxed::Box, io, net::SocketAddr}; @@ -70,7 +70,7 @@ impl Op> { } impl Completable for SendTo { - type Output = BufResult; + type Output = Result; fn complete(self, cqe: CqeResult) -> Self::Output { // Convert the operation result to `usize` @@ -78,6 +78,9 @@ impl Completable for SendTo { // Recover the buffer let buf = self.buf; - (res, buf) + match res { + Ok(n) => Ok((n, buf)), + Err(e) => Err(crate::Error(e, buf)), + } } } diff --git a/src/io/send_zc.rs b/src/io/send_zc.rs index df37722b..2602567b 100644 --- a/src/io/send_zc.rs +++ b/src/io/send_zc.rs @@ -1,6 +1,6 @@ use crate::runtime::driver::op::{Completable, CqeResult, MultiCQEFuture, Op, Updateable}; use crate::runtime::CONTEXT; -use crate::{buf::BoundedBuf, io::SharedFd, BufResult}; +use crate::{buf::BoundedBuf, io::SharedFd, Result}; use std::io; pub(crate) struct SendZc { @@ -39,14 +39,17 @@ impl Op, MultiCQEFuture> { } impl Completable for SendZc { - type Output = BufResult; + type Output = Result; fn complete(self, cqe: CqeResult) -> Self::Output { // Convert the operation result to `usize` let res = cqe.result.map(|v| self.bytes + v as usize); // Recover the buffer let buf = self.buf; - (res, buf) + match res { + Ok(n) => Ok((n, buf)), + Err(e) => Err(crate::Error(e, buf)), + } } } diff --git a/src/io/socket.rs b/src/io/socket.rs index dda1bb36..5cc2071e 100644 --- a/src/io/socket.rs +++ b/src/io/socket.rs @@ -47,26 +47,28 @@ impl Socket { UnsubmittedOneshot::write_at(&self.fd, buf, 0) } - pub async fn write_all(&self, buf: T) -> crate::BufResult<(), T> { + pub async fn write_all(&self, buf: T) -> crate::Result<(), T> { let orig_bounds = buf.bounds(); - let (res, buf) = self.write_all_slice(buf.slice_full()).await; - (res, T::from_buf_bounds(buf, orig_bounds)) + match self.write_all_slice(buf.slice_full()).await { + Ok((x, buf)) => Ok((x, T::from_buf_bounds(buf, orig_bounds))), + Err(e) => Err(e.map(|buf| T::from_buf_bounds(buf, orig_bounds))), + } } - async fn write_all_slice(&self, mut buf: Slice) -> crate::BufResult<(), T> { + async fn write_all_slice(&self, mut buf: Slice) -> crate::Result<(), T> { while buf.bytes_init() != 0 { let res = self.write(buf).submit().await; match res { - (Ok(0), slice) => { - return ( - Err(std::io::Error::new( + Ok((0, slice)) => { + return Err(crate::Error( + std::io::Error::new( std::io::ErrorKind::WriteZero, "failed to write whole buffer", - )), + ), slice.into_inner(), - ) + )) } - (Ok(n), slice) => { + Ok((n, slice)) => { buf = slice.slice(n..); } @@ -74,14 +76,14 @@ impl Socket { // crate's design ensures we are not calling the 'wait' option // in the ENTER syscall. Only an Enter with 'wait' can generate // an EINTR according to the io_uring man pages. - (Err(e), slice) => return (Err(e), slice.into_inner()), + Err(e) => return Err(e.map(|slice| slice.into_inner())), } } - (Ok(()), buf.into_inner()) + Ok(((), buf.into_inner())) } - pub(crate) async fn write_fixed(&self, buf: T) -> crate::BufResult + pub(crate) async fn write_fixed(&self, buf: T) -> crate::Result where T: BoundedBuf, { @@ -89,32 +91,31 @@ impl Socket { op.await } - pub(crate) async fn write_fixed_all(&self, buf: T) -> crate::BufResult<(), T> + pub(crate) async fn write_fixed_all(&self, buf: T) -> crate::Result<(), T> where T: BoundedBuf, { let orig_bounds = buf.bounds(); - let (res, buf) = self.write_fixed_all_slice(buf.slice_full()).await; - (res, T::from_buf_bounds(buf, orig_bounds)) + match self.write_fixed_all_slice(buf.slice_full()).await { + Ok((r, buf)) => Ok((r, T::from_buf_bounds(buf, orig_bounds))), + Err(e) => Err(e.map(|buf| T::from_buf_bounds(buf, orig_bounds))), + } } - async fn write_fixed_all_slice( - &self, - mut buf: Slice, - ) -> crate::BufResult<(), FixedBuf> { + async fn write_fixed_all_slice(&self, mut buf: Slice) -> crate::Result<(), FixedBuf> { while buf.bytes_init() != 0 { let res = self.write_fixed(buf).await; match res { - (Ok(0), slice) => { - return ( - Err(std::io::Error::new( + Ok((0, slice)) => { + return Err(crate::Error( + std::io::Error::new( std::io::ErrorKind::WriteZero, "failed to write whole buffer", - )), + ), slice.into_inner(), - ) + )) } - (Ok(n), slice) => { + Ok((n, slice)) => { buf = slice.slice(n..); } @@ -122,14 +123,14 @@ impl Socket { // crate's design ensures we are not calling the 'wait' option // in the ENTER syscall. Only an Enter with 'wait' can generate // an EINTR according to the io_uring man pages. - (Err(e), slice) => return (Err(e), slice.into_inner()), + Err(e) => return Err(e.map(|slice| slice.into_inner())), } } - (Ok(()), buf.into_inner()) + Ok(((), buf.into_inner())) } - pub async fn writev(&self, buf: Vec) -> crate::BufResult> { + pub async fn writev(&self, buf: Vec) -> crate::Result> { let op = Op::writev_at(&self.fd, buf, 0).unwrap(); op.await } @@ -138,12 +139,12 @@ impl Socket { &self, buf: T, socket_addr: Option, - ) -> crate::BufResult { + ) -> crate::Result { let op = Op::send_to(&self.fd, buf, socket_addr).unwrap(); op.await } - pub(crate) async fn send_zc(&self, buf: T) -> crate::BufResult { + pub(crate) async fn send_zc(&self, buf: T) -> crate::Result { let op = Op::send_zc(&self.fd, buf).unwrap(); op.await } @@ -168,12 +169,12 @@ impl Socket { op.await } - pub(crate) async fn read(&self, buf: T) -> crate::BufResult { + pub(crate) async fn read(&self, buf: T) -> crate::Result { let op = Op::read_at(&self.fd, buf, 0).unwrap(); op.await } - pub(crate) async fn read_fixed(&self, buf: T) -> crate::BufResult + pub(crate) async fn read_fixed(&self, buf: T) -> crate::Result where T: BoundedBufMut, { @@ -184,7 +185,7 @@ impl Socket { pub(crate) async fn recv_from( &self, buf: T, - ) -> crate::BufResult<(usize, SocketAddr), T> { + ) -> crate::Result<(usize, SocketAddr), T> { let op = Op::recv_from(&self.fd, buf).unwrap(); op.await } @@ -192,7 +193,7 @@ impl Socket { pub(crate) async fn recvmsg( &self, buf: Vec, - ) -> crate::BufResult<(usize, SocketAddr), Vec> { + ) -> crate::Result<(usize, SocketAddr), Vec> { let op = Op::recvmsg(&self.fd, buf).unwrap(); op.await } diff --git a/src/io/write.rs b/src/io/write.rs index 6c607f75..f903f4af 100644 --- a/src/io/write.rs +++ b/src/io/write.rs @@ -1,4 +1,4 @@ -use crate::{buf::BoundedBuf, io::SharedFd, BufResult, OneshotOutputTransform, UnsubmittedOneshot}; +use crate::{buf::BoundedBuf, io::SharedFd, OneshotOutputTransform, Result, UnsubmittedOneshot}; use io_uring::cqueue::Entry; use std::io; use std::marker::PhantomData; @@ -21,7 +21,7 @@ pub struct WriteTransform { } impl OneshotOutputTransform for WriteTransform { - type Output = BufResult; + type Output = Result; type StoredData = WriteData; fn transform_oneshot_output(self, data: Self::StoredData, cqe: Entry) -> Self::Output { @@ -31,7 +31,10 @@ impl OneshotOutputTransform for WriteTransform { Err(io::Error::from_raw_os_error(-cqe.result())) }; - (res, data.buf) + match res { + Ok(n) => Ok((n, data.buf)), + Err(e) => Err(crate::Error(e, data.buf)), + } } } diff --git a/src/io/write_fixed.rs b/src/io/write_fixed.rs index 1d2c3e38..acf40f30 100644 --- a/src/io/write_fixed.rs +++ b/src/io/write_fixed.rs @@ -2,7 +2,7 @@ use crate::buf::fixed::FixedBuf; use crate::buf::BoundedBuf; use crate::io::SharedFd; use crate::runtime::driver::op::{self, Completable, Op}; -use crate::BufResult; +use crate::Result; use crate::runtime::CONTEXT; use std::io; @@ -48,7 +48,7 @@ where } impl Completable for WriteFixed { - type Output = BufResult; + type Output = Result; fn complete(self, cqe: op::CqeResult) -> Self::Output { // Convert the operation result to `usize` @@ -56,6 +56,9 @@ impl Completable for WriteFixed { // Recover the buffer let buf = self.buf; - (res, buf) + match res { + Ok(n) => Ok((n, buf)), + Err(e) => Err(crate::Error(e, buf)), + } } } diff --git a/src/io/writev.rs b/src/io/writev.rs index 86236ebc..33b34abe 100644 --- a/src/io/writev.rs +++ b/src/io/writev.rs @@ -1,6 +1,6 @@ use crate::runtime::driver::op::{Completable, CqeResult, Op}; use crate::runtime::CONTEXT; -use crate::{buf::BoundedBuf, io::SharedFd, BufResult}; +use crate::{buf::BoundedBuf, io::SharedFd, Result}; use libc::iovec; use std::io; @@ -58,7 +58,7 @@ impl Completable for Writev where T: BoundedBuf, { - type Output = BufResult>; + type Output = Result>; fn complete(self, cqe: CqeResult) -> Self::Output { // Convert the operation result to `usize` @@ -66,6 +66,9 @@ where // Recover the buffer let buf = self.bufs; - (res, buf) + match res { + Ok(n) => Ok((n, buf)), + Err(e) => Err(crate::Error(e, buf)), + } } } diff --git a/src/io/writev_all.rs b/src/io/writev_all.rs index ef5b9d40..1182532c 100644 --- a/src/io/writev_all.rs +++ b/src/io/writev_all.rs @@ -17,7 +17,7 @@ pub(crate) async fn writev_at_all( fd: &SharedFd, mut bufs: Vec, offset: Option, -) -> crate::BufResult> { +) -> crate::Result> { // TODO decide if the function should return immediately if all the buffer lengths // were to sum to zero. That would save an allocation and one call into writev. @@ -59,7 +59,7 @@ pub(crate) async fn writev_at_all( // On error, there is no indication how many bytes were written. This is standard. // The device doesn't tell us that either. - Err(e) => return (Err(e), bufs), + Err(e) => return Err(crate::Error(e, bufs)), }; // TODO if n is zero, while there was more data to be written, should this be interpreted @@ -101,7 +101,7 @@ pub(crate) async fn writev_at_all( break; } } - (Ok(total), bufs) + Ok((total, bufs)) } struct WritevAll { diff --git a/src/lib.rs b/src/lib.rs index 47aaecba..c328c3ac 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -84,7 +84,6 @@ pub use runtime::spawn; pub use runtime::Runtime; use crate::runtime::driver::op::Op; -use std::error::Error; use std::fmt::{Debug, Display}; use std::future::Future; @@ -264,32 +263,55 @@ impl Builder { /// }) /// } /// ``` -/// A specialized `Error` type for `io-uring` operations with buffers. -#[derive(Debug)] -pub struct BufError(pub std::io::Error, pub B); +pub type Result = std::result::Result<(T, B), Error>; -impl Display for BufError { +/// A specialized `Error` type for `io-uring` operations with buffers. +pub struct Error(pub std::io::Error, pub B); +impl Debug for Error { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { Debug::fmt(&self.0, f) } } -impl Error for BufError { - fn source(&self) -> Option<&(dyn Error + 'static)> { +impl Display for Error { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + Display::fmt(&self.0, f) + } +} + +impl std::error::Error for Error { + fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { Some(&self.0) } } -impl BufError { +impl Error { /// Applies a function to the contained buffer, returning a new `BufError`. - pub fn map(self, f: F) -> BufError + pub fn map(self, f: F) -> Error where F: FnOnce(B) -> U, { - BufError(self.0, f(self.1)) + Error(self.0, f(self.1)) } } +mod sealed { + /// A Specialized trait for mapping over the buffer in both sides of a Result + pub trait MapResultBuf { + type Output; + fn map_buf(self, f: impl FnOnce(B) -> U) -> Self::Output; + } +} + +impl sealed::MapResultBuf for Result { + type Output = Result; + fn map_buf(self, f: impl FnOnce(B) -> U) -> Self::Output { + match self { + Ok((r, b)) => Ok((r, f(b))), + Err(e) => Err(e.map(|e| f(e))), + } + } +} /// The simplest possible operation. Just posts a completion event, nothing else. /// diff --git a/src/net/tcp/stream.rs b/src/net/tcp/stream.rs index 2450dcb9..c9651a81 100644 --- a/src/net/tcp/stream.rs +++ b/src/net/tcp/stream.rs @@ -74,7 +74,7 @@ impl TcpStream { /// Read some data from the stream into the buffer. /// /// Returns the original buffer and quantity of data read. - pub async fn read(&self, buf: T) -> crate::BufResult { + pub async fn read(&self, buf: T) -> crate::Result { self.inner.read(buf).await } @@ -91,7 +91,7 @@ impl TcpStream { /// In addition to errors that can be reported by `read`, /// this operation fails if the buffer is not registered in the /// current `tokio-uring` runtime. - pub async fn read_fixed(&self, buf: T) -> crate::BufResult + pub async fn read_fixed(&self, buf: T) -> crate::Result where T: BoundedBufMut, { @@ -155,7 +155,7 @@ impl TcpStream { /// ``` /// /// [`write`]: Self::write - pub async fn write_all(&self, buf: T) -> crate::BufResult<(), T> { + pub async fn write_all(&self, buf: T) -> crate::Result<(), T> { self.inner.write_all(buf).await } @@ -172,7 +172,7 @@ impl TcpStream { /// In addition to errors that can be reported by `write`, /// this operation fails if the buffer is not registered in the /// current `tokio-uring` runtime. - pub async fn write_fixed(&self, buf: T) -> crate::BufResult + pub async fn write_fixed(&self, buf: T) -> crate::Result where T: BoundedBuf, { @@ -192,7 +192,7 @@ impl TcpStream { /// This function will return the first error that [`write_fixed`] returns. /// /// [`write_fixed`]: Self::write_fixed - pub async fn write_fixed_all(&self, buf: T) -> crate::BufResult<(), T> + pub async fn write_fixed_all(&self, buf: T) -> crate::Result<(), T> where T: BoundedBuf, { @@ -222,7 +222,7 @@ impl TcpStream { /// written to this writer. /// /// [`Ok(n)`]: Ok - pub async fn writev(&self, buf: Vec) -> crate::BufResult> { + pub async fn writev(&self, buf: Vec) -> crate::Result> { self.inner.writev(buf).await } diff --git a/src/net/udp.rs b/src/net/udp.rs index cb0cef66..92529952 100644 --- a/src/net/udp.rs +++ b/src/net/udp.rs @@ -212,7 +212,7 @@ impl UdpSocket { /// Sends data on the connected socket /// /// On success, returns the number of bytes written. - pub async fn send(&self, buf: T) -> crate::BufResult { + pub async fn send(&self, buf: T) -> crate::Result { self.inner.send_to(buf, None).await } @@ -223,7 +223,7 @@ impl UdpSocket { &self, buf: T, socket_addr: SocketAddr, - ) -> crate::BufResult { + ) -> crate::Result { self.inner.send_to(buf, Some(socket_addr)).await } @@ -240,7 +240,7 @@ impl UdpSocket { /// > at writes over around 10 KB. /// /// Note: Using fixed buffers [#54](https://github.com/tokio-rs/tokio-uring/pull/54), avoids the page-pinning overhead - pub async fn send_zc(&self, buf: T) -> crate::BufResult { + pub async fn send_zc(&self, buf: T) -> crate::Result { self.inner.send_zc(buf).await } @@ -299,7 +299,7 @@ impl UdpSocket { pub async fn recv_from( &self, buf: T, - ) -> crate::BufResult<(usize, SocketAddr), T> { + ) -> crate::Result<(usize, SocketAddr), T> { self.inner.recv_from(buf).await } @@ -309,14 +309,14 @@ impl UdpSocket { pub async fn recvmsg( &self, buf: Vec, - ) -> crate::BufResult<(usize, SocketAddr), Vec> { + ) -> crate::Result<(usize, SocketAddr), Vec> { self.inner.recvmsg(buf).await } /// Reads a packet of data from the socket into the buffer. /// /// Returns the original buffer and quantity of data read. - pub async fn read(&self, buf: T) -> crate::BufResult { + pub async fn read(&self, buf: T) -> crate::Result { self.inner.read(buf).await } @@ -333,7 +333,7 @@ impl UdpSocket { /// In addition to errors that can be reported by `read`, /// this operation fails if the buffer is not registered in the /// current `tokio-uring` runtime. - pub async fn read_fixed(&self, buf: T) -> crate::BufResult + pub async fn read_fixed(&self, buf: T) -> crate::Result where T: BoundedBufMut, { @@ -360,7 +360,7 @@ impl UdpSocket { /// In addition to errors that can be reported by `write`, /// this operation fails if the buffer is not registered in the /// current `tokio-uring` runtime. - pub async fn write_fixed(&self, buf: T) -> crate::BufResult + pub async fn write_fixed(&self, buf: T) -> crate::Result where T: BoundedBuf, { diff --git a/src/net/unix/stream.rs b/src/net/unix/stream.rs index 40e7ddc5..d1ab6c41 100644 --- a/src/net/unix/stream.rs +++ b/src/net/unix/stream.rs @@ -75,7 +75,7 @@ impl UnixStream { /// Read some data from the stream into the buffer, returning the original buffer and /// quantity of data read. - pub async fn read(&self, buf: T) -> crate::BufResult { + pub async fn read(&self, buf: T) -> crate::Result { self.inner.read(buf).await } @@ -90,7 +90,7 @@ impl UnixStream { /// In addition to errors that can be reported by `read`, /// this operation fails if the buffer is not registered in the /// current `tokio-uring` runtime. - pub async fn read_fixed(&self, buf: T) -> crate::BufResult + pub async fn read_fixed(&self, buf: T) -> crate::Result where T: BoundedBufMut, { @@ -116,7 +116,7 @@ impl UnixStream { /// This function will return the first error that [`write`] returns. /// /// [`write`]: Self::write - pub async fn write_all(&self, buf: T) -> crate::BufResult<(), T> { + pub async fn write_all(&self, buf: T) -> crate::Result<(), T> { self.inner.write_all(buf).await } @@ -131,7 +131,7 @@ impl UnixStream { /// In addition to errors that can be reported by `write`, /// this operation fails if the buffer is not registered in the /// current `tokio-uring` runtime. - pub async fn write_fixed(&self, buf: T) -> crate::BufResult + pub async fn write_fixed(&self, buf: T) -> crate::Result where T: BoundedBuf, { @@ -151,7 +151,7 @@ impl UnixStream { /// This function will return the first error that [`write_fixed`] returns. /// /// [`write_fixed`]: Self::write - pub async fn write_fixed_all(&self, buf: T) -> crate::BufResult<(), T> + pub async fn write_fixed_all(&self, buf: T) -> crate::Result<(), T> where T: BoundedBuf, { @@ -182,7 +182,7 @@ impl UnixStream { /// written to this writer. /// /// [`Ok(n)`]: Ok - pub async fn writev(&self, buf: Vec) -> crate::BufResult> { + pub async fn writev(&self, buf: Vec) -> crate::Result> { self.inner.writev(buf).await } diff --git a/tests/driver.rs b/tests/driver.rs index f4381dd5..2daa288d 100644 --- a/tests/driver.rs +++ b/tests/driver.rs @@ -59,7 +59,6 @@ fn complete_ops_on_drop() { 25 * 1024 * 1024, ) .await - .0 .unwrap(); }) .await; @@ -86,7 +85,6 @@ fn too_many_submissions() { file.write_at(b"hello world".to_vec(), 0) .submit() .await - .0 .unwrap(); }) .await; diff --git a/tests/fixed_buf.rs b/tests/fixed_buf.rs index a5442217..3e0562e2 100644 --- a/tests/fixed_buf.rs +++ b/tests/fixed_buf.rs @@ -42,8 +42,7 @@ fn fixed_buf_turnaround() { // for another instance. assert!(buffers.check_out(0).is_none()); - let (res, buf) = op.await; - let n = res.unwrap(); + let (n, buf) = op.await.unwrap(); assert_eq!(n, HELLO.len()); // The buffer is owned by `buf`, can't check it out @@ -81,12 +80,11 @@ fn unregister_invalidates_checked_out_buffers() { // The old buffer's index no longer matches the memory area of the // currently registered buffer, so the read operation using the old // buffer's memory should fail. - let (res, _) = file.read_fixed_at(fixed_buf, 0).await; + let res = file.read_fixed_at(fixed_buf, 0).await; assert_err!(res); let fixed_buf = buffers.check_out(0).unwrap(); - let (res, buf) = file.read_fixed_at(fixed_buf, 0).await; - let n = res.unwrap(); + let (n, buf) = file.read_fixed_at(fixed_buf, 0).await.unwrap(); assert_eq!(n, HELLO.len()); assert_eq!(&buf[..], HELLO); }); @@ -112,18 +110,17 @@ fn slicing() { let fixed_buf = buffers.check_out(0).unwrap(); // Read no more than 8 bytes into the fixed buffer. - let (res, slice) = file.read_fixed_at(fixed_buf.slice(..8), 3).await; - let n = res.unwrap(); + let (n, slice) = file.read_fixed_at(fixed_buf.slice(..8), 3).await.unwrap(); assert_eq!(n, 8); assert_eq!(slice[..], HELLO[3..11]); let fixed_buf = slice.into_inner(); // Write from the fixed buffer, starting at offset 1, // up to the end of the initialized bytes in the buffer. - let (res, slice) = file + let (n, slice) = file .write_fixed_at(fixed_buf.slice(1..), HELLO.len() as u64) - .await; - let n = res.unwrap(); + .await + .unwrap(); assert_eq!(n, 7); assert_eq!(slice[..], HELLO[4..11]); let fixed_buf = slice.into_inner(); @@ -131,8 +128,7 @@ fn slicing() { // Read into the fixed buffer, overwriting bytes starting from offset 3 // and then extending the initialized part with as many bytes as // the operation can read. - let (res, slice) = file.read_fixed_at(fixed_buf.slice(3..), 0).await; - let n = res.unwrap(); + let (n, slice) = file.read_fixed_at(fixed_buf.slice(3..), 0).await.unwrap(); assert_eq!(n, HELLO.len() + 7); assert_eq!(slice[..HELLO.len()], HELLO[..]); assert_eq!(slice[HELLO.len()..], HELLO[4..11]); @@ -167,8 +163,10 @@ fn pool_next_as_concurrency_limit() { let file = File::from_std(cloned_file); let data = [b'0' + i as u8; BUF_SIZE]; buf.put_slice(&data); - let (res, buf) = file.write_fixed_all_at(buf, BUF_SIZE as u64 * i).await; - res.unwrap(); + let (_, buf) = file + .write_fixed_all_at(buf, BUF_SIZE as u64 * i) + .await + .unwrap(); println!("[worker {}]: dropping buffer {}", i, buf.buf_index()); }); diff --git a/tests/fs_file.rs b/tests/fs_file.rs index 6ec14d43..45edf0c7 100644 --- a/tests/fs_file.rs +++ b/tests/fs_file.rs @@ -19,9 +19,7 @@ const HELLO: &[u8] = b"hello world..."; async fn read_hello(file: &File) { let buf = Vec::with_capacity(1024); - let (res, buf) = file.read_at(buf, 0).await; - let n = res.unwrap(); - + let (n, buf) = file.read_at(buf, 0).await.unwrap(); assert_eq!(n, HELLO.len()); assert_eq!(&buf[..n], HELLO); } @@ -47,8 +45,7 @@ fn basic_read_exact() { tempfile.write_all(&data).unwrap(); let file = File::open(tempfile.path()).await.unwrap(); - let (res, buf) = file.read_exact_at(buf, 0).await; - res.unwrap(); + let (_, buf) = file.read_exact_at(buf, 0).await.unwrap(); assert_eq!(buf, data); }); } @@ -60,7 +57,7 @@ fn basic_write() { let file = File::create(tempfile.path()).await.unwrap(); - file.write_at(HELLO, 0).submit().await.0.unwrap(); + file.write_at(HELLO, 0).submit().await.unwrap(); let file = std::fs::read(tempfile.path()).unwrap(); assert_eq!(file, HELLO); @@ -75,8 +72,7 @@ fn vectored_read() { let file = File::open(tempfile.path()).await.unwrap(); let bufs = vec![Vec::::with_capacity(5), Vec::::with_capacity(9)]; - let (res, bufs) = file.readv_at(bufs, 0).await; - let n = res.unwrap(); + let (n, bufs) = file.readv_at(bufs, 0).await.unwrap(); assert_eq!(n, HELLO.len()); assert_eq!(bufs[1][0], b' '); @@ -93,7 +89,7 @@ fn vectored_write() { let buf2 = " world...".to_owned().into_bytes(); let bufs = vec![buf1, buf2]; - file.writev_at(bufs, 0).await.0.unwrap(); + file.writev_at(bufs, 0).await.unwrap(); let file = std::fs::read(tempfile.path()).unwrap(); assert_eq!(file, HELLO); @@ -108,8 +104,7 @@ fn basic_write_all() { let tempfile = tempfile(); let file = File::create(tempfile.path()).await.unwrap(); - let (ret, data) = file.write_all_at(data, 0).await; - ret.unwrap(); + let (_, data) = file.write_all_at(data, 0).await.unwrap(); let file = std::fs::read(tempfile.path()).unwrap(); assert_eq!(file, data); @@ -155,7 +150,7 @@ fn drop_open() { // Do something else let file = File::create(tempfile.path()).await.unwrap(); - file.write_at(HELLO, 0).submit().await.0.unwrap(); + file.write_at(HELLO, 0).submit().await.unwrap(); let file = std::fs::read(tempfile.path()).unwrap(); assert_eq!(file, HELLO); @@ -183,7 +178,7 @@ fn sync_doesnt_kill_anything() { let file = File::create(tempfile.path()).await.unwrap(); file.sync_all().await.unwrap(); file.sync_data().await.unwrap(); - file.write_at(&b"foo"[..], 0).submit().await.0.unwrap(); + file.write_at(&b"foo"[..], 0).submit().await.unwrap(); file.sync_all().await.unwrap(); file.sync_data().await.unwrap(); }); @@ -236,16 +231,14 @@ fn read_fixed() { let fixed_buf = buffers.check_out(0).unwrap(); assert_eq!(fixed_buf.bytes_total(), 6); - let (res, buf) = file.read_fixed_at(fixed_buf.slice(..), 0).await; - let n = res.unwrap(); + let (n, buf) = file.read_fixed_at(fixed_buf.slice(..), 0).await.unwrap(); assert_eq!(n, 6); assert_eq!(&buf[..], &HELLO[..6]); let fixed_buf = buffers.check_out(1).unwrap(); assert_eq!(fixed_buf.bytes_total(), 1024); - let (res, buf) = file.read_fixed_at(fixed_buf.slice(..), 6).await; - let n = res.unwrap(); + let (n, buf) = file.read_fixed_at(fixed_buf.slice(..), 6).await.unwrap(); assert_eq!(n, HELLO.len() - 6); assert_eq!(&buf[..], &HELLO[6..]); @@ -266,16 +259,14 @@ fn write_fixed() { let mut buf = fixed_buf; buf.put_slice(&HELLO[..6]); - let (res, _) = file.write_fixed_at(buf, 0).await; - let n = res.unwrap(); + let (n, _) = file.write_fixed_at(buf, 0).await.unwrap(); assert_eq!(n, 6); let fixed_buf = buffers.check_out(1).unwrap(); let mut buf = fixed_buf; buf.put_slice(&HELLO[6..]); - let (res, _) = file.write_fixed_at(buf, 6).await; - let n = res.unwrap(); + let (n, _) = file.write_fixed_at(buf, 6).await.unwrap(); assert_eq!(n, HELLO.len() - 6); let file = std::fs::read(tempfile.path()).unwrap(); From ed5faa9ab86090b35d123e00c5629004d0896c12 Mon Sep 17 00:00:00 2001 From: Oliver Bunting <72926894+ollie-etl@users.noreply.github.com> Date: Tue, 13 Feb 2024 12:27:23 +0000 Subject: [PATCH 3/8] Fix doctests --- src/fs/file.rs | 33 +++++++++++---------------------- src/lib.rs | 8 +++----- src/net/tcp/listener.rs | 4 ++-- src/net/tcp/stream.rs | 9 +++------ src/net/udp.rs | 27 ++++++++++----------------- src/net/unix/listener.rs | 4 ++-- src/net/unix/stream.rs | 3 +-- src/runtime/driver/op/mod.rs | 1 + src/runtime/mod.rs | 2 +- 9 files changed, 34 insertions(+), 57 deletions(-) diff --git a/src/fs/file.rs b/src/fs/file.rs index 68dab6a1..9c3944cf 100644 --- a/src/fs/file.rs +++ b/src/fs/file.rs @@ -40,8 +40,7 @@ use std::path::Path; /// let file = File::create("hello.txt").await?; /// /// // Write some data -/// let (res, buf) = file.write_at(&b"hello world"[..], 0).submit().await; -/// let n = res?; +/// let (n, buf) = file.write_at(&b"hello world"[..], 0).submit().await?; /// /// println!("wrote {} bytes", n); /// @@ -166,8 +165,7 @@ impl File { /// let buffer = vec![0; 10]; /// /// // Read up to 10 bytes - /// let (res, buffer) = f.read_at(buffer, 0).await; - /// let n = res?; + /// let (n, buffer) = f.read_at(buffer, 0).await?; /// /// println!("The bytes: {:?}", &buffer[..n]); /// @@ -217,8 +215,7 @@ impl File { /// let buffers = vec![Vec::::with_capacity(10), Vec::::with_capacity(10)]; /// /// // Read up to 20 bytes - /// let (res, buffer) = f.readv_at(buffers, 0).await; - /// let n = res?; + /// let (n, _) = f.readv_at(buffers, 0).await?; /// /// println!("Read {} bytes", n); /// @@ -272,8 +269,7 @@ impl File { /// /// // Writes some prefix of the byte string, not necessarily all of it. /// let bufs = vec!["some".to_owned().into_bytes(), " bytes".to_owned().into_bytes()]; - /// let (res, _) = file.writev_at(bufs, 0).await; - /// let n = res?; + /// let (n, _) = file.writev_at(bufs, 0).await?; /// /// println!("wrote {} bytes", n); /// @@ -380,8 +376,7 @@ impl File { /// let buffer = Vec::with_capacity(10); /// /// // Read up to 10 bytes - /// let (res, buffer) = f.read_exact_at(buffer, 0).await; - /// res?; + /// let (_, buffer) = f.read_exact_at(buffer, 0).await?; /// /// println!("The bytes: {:?}", buffer); /// @@ -468,8 +463,7 @@ impl File { /// let buffer = registry.check_out(2).unwrap(); /// /// // Read up to 10 bytes - /// let (res, buffer) = f.read_fixed_at(buffer, 0).await; - /// let n = res?; + /// let (n, buffer) = f.read_fixed_at(buffer, 0).await?; /// /// println!("The bytes: {:?}", &buffer[..n]); /// @@ -521,8 +515,7 @@ impl File { /// let file = File::create("foo.txt").await?; /// /// // Writes some prefix of the byte string, not necessarily all of it. - /// let (res, _) = file.write_at(&b"some bytes"[..], 0).submit().await; - /// let n = res?; + /// let (n, _) = file.write_at(&b"some bytes"[..], 0).submit().await?; /// /// println!("wrote {} bytes", n); /// @@ -566,8 +559,7 @@ impl File { /// let file = File::create("foo.txt").await?; /// /// // Writes some prefix of the byte string, not necessarily all of it. - /// let (res, _) = file.write_all_at(&b"some bytes"[..], 0).await; - /// res?; + /// file.write_all_at(&b"some bytes"[..], 0).await?; /// /// println!("wrote all bytes"); /// @@ -655,8 +647,7 @@ impl File { /// /// // Writes some prefix of the buffer content, /// // not necessarily all of it. - /// let (res, _) = file.write_fixed_at(buffer, 0).await; - /// let n = res?; + /// let (n, _) = file.write_fixed_at(buffer, 0).await?; /// /// println!("wrote {} bytes", n); /// @@ -756,8 +747,7 @@ impl File { /// fn main() -> Result<(), Box> { /// tokio_uring::start(async { /// let f = File::create("foo.txt").await?; - /// let (res, buf) = f.write_at(&b"Hello, world!"[..], 0).submit().await; - /// let n = res?; + /// f.write_at(&b"Hello, world!"[..], 0).submit().await?; /// /// f.sync_all().await?; /// @@ -793,8 +783,7 @@ impl File { /// fn main() -> Result<(), Box> { /// tokio_uring::start(async { /// let f = File::create("foo.txt").await?; - /// let (res, buf) = f.write_at(&b"Hello, world!"[..], 0).submit().await; - /// let n = res?; + /// f.write_at(&b"Hello, world!"[..], 0).submit().await?; /// /// f.sync_data().await?; /// diff --git a/src/lib.rs b/src/lib.rs index c328c3ac..a9f5f9c9 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -20,8 +20,7 @@ //! // Read some data, the buffer is passed by ownership and //! // submitted to the kernel. When the operation completes, //! // we get the buffer back. -//! let (res, buf) = file.read_at(buf, 0).await; -//! let n = res?; +//! let (n, buf) = file.read_at(buf, 0).await?; //! //! // Display the contents //! println!("{:?}", &buf[..n]); @@ -116,8 +115,7 @@ use std::future::Future; /// // Read some data, the buffer is passed by ownership and /// // submitted to the kernel. When the operation completes, /// // we get the buffer back. -/// let (res, buf) = file.read_at(buf, 0).await; -/// let n = res?; +/// let (n, buf) = file.read_at(buf, 0).await?; /// /// // Display the contents /// println!("{:?}", &buf[..n]); @@ -308,7 +306,7 @@ impl sealed::MapResultBuf for Result { fn map_buf(self, f: impl FnOnce(B) -> U) -> Self::Output { match self { Ok((r, b)) => Ok((r, f(b))), - Err(e) => Err(e.map(|e| f(e))), + Err(e) => Err(e.map(f)), } } } diff --git a/src/net/tcp/listener.rs b/src/net/tcp/listener.rs index 2435c61b..6df26589 100644 --- a/src/net/tcp/listener.rs +++ b/src/net/tcp/listener.rs @@ -33,9 +33,9 @@ use std::{ /// let tx = TcpStream::connect("127.0.0.1:2345".parse().unwrap()).await.unwrap(); /// let rx = rx_ch.await.expect("The spawned task expected to send a TcpStream"); /// -/// tx.write(b"test" as &'static [u8]).submit().await.0.unwrap(); +/// tx.write(b"test" as &'static [u8]).submit().await.unwrap(); /// -/// let (_, buf) = rx.read(vec![0; 4]).await; +/// let (_, buf) = rx.read(vec![0; 4]).await.unwrap(); /// /// assert_eq!(buf, b"test"); /// }); diff --git a/src/net/tcp/stream.rs b/src/net/tcp/stream.rs index c9651a81..f135618d 100644 --- a/src/net/tcp/stream.rs +++ b/src/net/tcp/stream.rs @@ -28,8 +28,7 @@ use crate::{ /// let mut stream = TcpStream::connect("127.0.0.1:8080".parse().unwrap()).await?; /// /// // Write some data. -/// let (result, _) = stream.write(b"hello world!".as_slice()).submit().await; -/// result.unwrap(); +/// stream.write(b"hello world!".as_slice()).submit().await.unwrap(); /// /// Ok(()) /// }) @@ -137,15 +136,13 @@ impl TcpStream { /// let mut n = 0; /// let mut buf = vec![0u8; 4096]; /// loop { - /// let (result, nbuf) = stream.read(buf).await; + /// let (read, nbuf) = stream.read(buf).await.unwrap(); /// buf = nbuf; - /// let read = result.unwrap(); /// if read == 0 { /// break; /// } /// - /// let (res, slice) = stream.write_all(buf.slice(..read)).await; - /// let _ = res.unwrap(); + /// let (_, slice) = stream.write_all(buf.slice(..read)).await.unwrap(); /// buf = slice.into_inner(); /// n += read; /// } diff --git a/src/net/udp.rs b/src/net/udp.rs index 92529952..0714d88e 100644 --- a/src/net/udp.rs +++ b/src/net/udp.rs @@ -43,22 +43,18 @@ use std::{ /// let buf = vec![0; 32]; /// /// // write data -/// let (result, _) = socket.write(b"hello world".as_slice()).submit().await; -/// result.unwrap(); +/// socket.write(b"hello world".as_slice()).submit().await.unwrap(); /// /// // read data -/// let (result, buf) = other_socket.read(buf).await; -/// let n_bytes = result.unwrap(); +/// let (n_bytes, buf) = other_socket.read(buf).await.unwrap(); /// /// assert_eq!(b"hello world", &buf[..n_bytes]); /// /// // write data using send on connected socket -/// let (result, _) = socket.send(b"hello world via send".as_slice()).await; -/// result.unwrap(); +/// socket.send(b"hello world via send".as_slice()).await.unwrap(); /// /// // read data -/// let (result, buf) = other_socket.read(buf).await; -/// let n_bytes = result.unwrap(); +/// let (n_bytes, buf) = other_socket.read(buf).await.unwrap(); /// /// assert_eq!(b"hello world via send", &buf[..n_bytes]); /// @@ -83,12 +79,10 @@ use std::{ /// let buf = vec![0; 32]; /// /// // write data -/// let (result, _) = socket.send_to(b"hello world".as_slice(), second_addr).await; -/// result.unwrap(); +/// socket.send_to(b"hello world".as_slice(), second_addr).await.unwrap(); /// /// // read data -/// let (result, buf) = other_socket.recv_from(buf).await; -/// let (n_bytes, addr) = result.unwrap(); +/// let ((n_bytes, addr), buf) = other_socket.recv_from(buf).await.unwrap(); /// /// assert_eq!(addr, first_addr); /// assert_eq!(b"hello world", &buf[..n_bytes]); @@ -172,14 +166,13 @@ impl UdpSocket { /// let buf = vec![0; 32]; /// /// // write data - /// let (result, _) = std_socket + /// std_socket /// .send_to(b"hello world".as_slice(), second_addr) - /// .await; - /// result.unwrap(); + /// .await + /// .unwrap(); /// /// // read data - /// let (result, buf) = other_socket.recv_from(buf).await; - /// let (n_bytes, addr) = result.unwrap(); + /// let ((n_bytes, addr), buf) = other_socket.recv_from(buf).await.unwrap(); /// /// assert_eq!(addr, std_addr); /// assert_eq!(b"hello world", &buf[..n_bytes]); diff --git a/src/net/unix/listener.rs b/src/net/unix/listener.rs index ffabb5d2..b56f3f77 100644 --- a/src/net/unix/listener.rs +++ b/src/net/unix/listener.rs @@ -30,9 +30,9 @@ use std::{io, path::Path}; /// let tx = UnixStream::connect(&sock_file).await.unwrap(); /// let rx = rx_ch.await.expect("The spawned task expected to send a UnixStream"); /// -/// tx.write(b"test" as &'static [u8]).submit().await.0.unwrap(); +/// tx.write(b"test" as &'static [u8]).submit().await.unwrap(); /// -/// let (_, buf) = rx.read(vec![0; 4]).await; +/// let (_, buf) = rx.read(vec![0; 4]).await.unwrap(); /// /// assert_eq!(buf, b"test"); /// }); diff --git a/src/net/unix/stream.rs b/src/net/unix/stream.rs index d1ab6c41..8234b479 100644 --- a/src/net/unix/stream.rs +++ b/src/net/unix/stream.rs @@ -28,8 +28,7 @@ use std::{ /// let mut stream = UnixStream::connect("/tmp/tokio-uring-unix-test.sock").await?; /// /// // Write some data. -/// let (result, _) = stream.write(b"hello world!".as_slice()).submit().await; -/// result.unwrap(); +/// stream.write(b"hello world!".as_slice()).submit().await.unwrap(); /// /// Ok(()) /// }) diff --git a/src/runtime/driver/op/mod.rs b/src/runtime/driver/op/mod.rs index 32ba3e7a..7abc3030 100644 --- a/src/runtime/driver/op/mod.rs +++ b/src/runtime/driver/op/mod.rs @@ -165,6 +165,7 @@ pub(crate) enum Lifecycle { /// The submitter no longer has interest in the operation result. The state /// must be passed to the driver and held until the operation completes. + #[allow(dead_code)] Ignored(Box), /// The operation has completed with a single cqe result diff --git a/src/runtime/mod.rs b/src/runtime/mod.rs index 369c060b..394c06b5 100644 --- a/src/runtime/mod.rs +++ b/src/runtime/mod.rs @@ -10,7 +10,7 @@ pub(crate) mod driver; pub(crate) use context::RuntimeContext; thread_local! { - pub(crate) static CONTEXT: RuntimeContext = RuntimeContext::new(); + pub(crate) static CONTEXT: RuntimeContext = const {RuntimeContext::new() }; } /// The Runtime Executor From fe33a845bbd2824f8c013139f300eb57ae9dfc19 Mon Sep 17 00:00:00 2001 From: ollie <72926894+ollie-etl@users.noreply.github.com> Date: Tue, 13 Feb 2024 12:30:48 +0000 Subject: [PATCH 4/8] Update comment in src/lib.rs --- src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/lib.rs b/src/lib.rs index a9f5f9c9..aabea2f3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -284,7 +284,7 @@ impl std::error::Error for Error { } impl Error { - /// Applies a function to the contained buffer, returning a new `BufError`. + /// Applies a function to the contained buffer, returning a new `Error`. pub fn map(self, f: F) -> Error where F: FnOnce(B) -> U, From 3bcabef55e8c6891a6e01bd8c33455d258b7f0cf Mon Sep 17 00:00:00 2001 From: Oliver Bunting <72926894+ollie-etl@users.noreply.github.com> Date: Wed, 14 Feb 2024 12:40:18 +0000 Subject: [PATCH 5/8] Address feedback: Add and use WithBufer trait --- src/fs/file.rs | 2 +- src/io/read.rs | 6 +-- src/io/readv.rs | 6 +-- src/io/recv_from.rs | 6 +-- src/io/recvmsg.rs | 6 +-- src/io/send_to.rs | 11 +---- src/io/send_zc.rs | 12 ++---- src/io/write.rs | 6 +-- src/io/write_fixed.rs | 11 +---- src/io/writev.rs | 11 +---- src/lib.rs | 82 +---------------------------------- src/types.rs | 99 +++++++++++++++++++++++++++++++++++++++++++ 12 files changed, 122 insertions(+), 136 deletions(-) create mode 100644 src/types.rs diff --git a/src/fs/file.rs b/src/fs/file.rs index 9c3944cf..ebf061ad 100644 --- a/src/fs/file.rs +++ b/src/fs/file.rs @@ -4,7 +4,7 @@ use crate::fs::OpenOptions; use crate::io::SharedFd; use crate::runtime::driver::op::Op; -use crate::sealed::MapResultBuf; +use crate::sealed::MapResult; use crate::{UnsubmittedOneshot, UnsubmittedWrite}; use std::fmt; use std::io; diff --git a/src/io/read.rs b/src/io/read.rs index 37b11a3a..d87af8c1 100644 --- a/src/io/read.rs +++ b/src/io/read.rs @@ -1,5 +1,6 @@ use crate::buf::BoundedBufMut; use crate::io::SharedFd; +use crate::sealed::WithBuffer; use crate::Result; use crate::runtime::driver::op::{Completable, CqeResult, Op}; @@ -59,9 +60,6 @@ where } } - match res { - Ok(n) => Ok((n, buf)), - Err(e) => Err(crate::Error(e, buf)), - } + res.with_buffer(buf) } } diff --git a/src/io/readv.rs b/src/io/readv.rs index 859f9c42..44843a21 100644 --- a/src/io/readv.rs +++ b/src/io/readv.rs @@ -1,4 +1,5 @@ use crate::buf::BoundedBufMut; +use crate::sealed::WithBuffer; use crate::Result; use crate::io::SharedFd; @@ -87,9 +88,6 @@ where assert_eq!(count, 0); } - match res { - Ok(n) => Ok((n, bufs)), - Err(e) => Err(crate::Error(e, bufs)), - } + res.with_buffer(bufs) } } diff --git a/src/io/recv_from.rs b/src/io/recv_from.rs index c9336271..f7537801 100644 --- a/src/io/recv_from.rs +++ b/src/io/recv_from.rs @@ -1,5 +1,6 @@ use crate::runtime::driver::op::{Completable, CqeResult, Op}; use crate::runtime::CONTEXT; +use crate::sealed::WithBuffer; use crate::{buf::BoundedBufMut, io::SharedFd, Result}; use socket2::SockAddr; use std::{ @@ -78,9 +79,6 @@ where (n, socket_addr) }); - match res { - Ok(n) => Ok((n, buf)), - Err(e) => Err(crate::Error(e, buf)), - } + res.with_buffer(buf) } } diff --git a/src/io/recvmsg.rs b/src/io/recvmsg.rs index 67175328..71dc8215 100644 --- a/src/io/recvmsg.rs +++ b/src/io/recvmsg.rs @@ -1,5 +1,6 @@ use crate::runtime::driver::op::{Completable, CqeResult, Op}; use crate::runtime::CONTEXT; +use crate::sealed::WithBuffer; use crate::{buf::BoundedBufMut, io::SharedFd, Result}; use socket2::SockAddr; use std::{ @@ -92,9 +93,6 @@ where (n, socket_addr) }); - match res { - Ok(n) => Ok((n, bufs)), - Err(e) => Err(crate::Error(e, bufs)), - } + res.with_buffer(bufs) } } diff --git a/src/io/send_to.rs b/src/io/send_to.rs index 805980ae..e41cd399 100644 --- a/src/io/send_to.rs +++ b/src/io/send_to.rs @@ -2,6 +2,7 @@ use crate::buf::BoundedBuf; use crate::io::SharedFd; use crate::runtime::driver::op::{Completable, CqeResult, Op}; use crate::runtime::CONTEXT; +use crate::sealed::WithBuffer; use crate::Result; use socket2::SockAddr; use std::io::IoSlice; @@ -73,14 +74,6 @@ impl Completable for SendTo { type Output = Result; fn complete(self, cqe: CqeResult) -> Self::Output { - // Convert the operation result to `usize` - let res = cqe.result.map(|v| v as usize); - // Recover the buffer - let buf = self.buf; - - match res { - Ok(n) => Ok((n, buf)), - Err(e) => Err(crate::Error(e, buf)), - } + cqe.result.map(|v| v as usize).with_buffer(self.buf) } } diff --git a/src/io/send_zc.rs b/src/io/send_zc.rs index 2602567b..838e1801 100644 --- a/src/io/send_zc.rs +++ b/src/io/send_zc.rs @@ -1,5 +1,6 @@ use crate::runtime::driver::op::{Completable, CqeResult, MultiCQEFuture, Op, Updateable}; use crate::runtime::CONTEXT; +use crate::sealed::WithBuffer; use crate::{buf::BoundedBuf, io::SharedFd, Result}; use std::io; @@ -42,14 +43,9 @@ impl Completable for SendZc { type Output = Result; fn complete(self, cqe: CqeResult) -> Self::Output { - // Convert the operation result to `usize` - let res = cqe.result.map(|v| self.bytes + v as usize); - // Recover the buffer - let buf = self.buf; - match res { - Ok(n) => Ok((n, buf)), - Err(e) => Err(crate::Error(e, buf)), - } + cqe.result + .map(|v| self.bytes + v as usize) + .with_buffer(self.buf) } } diff --git a/src/io/write.rs b/src/io/write.rs index f903f4af..72bee1d8 100644 --- a/src/io/write.rs +++ b/src/io/write.rs @@ -1,3 +1,4 @@ +use crate::sealed::WithBuffer; use crate::{buf::BoundedBuf, io::SharedFd, OneshotOutputTransform, Result, UnsubmittedOneshot}; use io_uring::cqueue::Entry; use std::io; @@ -31,10 +32,7 @@ impl OneshotOutputTransform for WriteTransform { Err(io::Error::from_raw_os_error(-cqe.result())) }; - match res { - Ok(n) => Ok((n, data.buf)), - Err(e) => Err(crate::Error(e, data.buf)), - } + res.with_buffer(data.buf) } } diff --git a/src/io/write_fixed.rs b/src/io/write_fixed.rs index acf40f30..d99e74d0 100644 --- a/src/io/write_fixed.rs +++ b/src/io/write_fixed.rs @@ -2,6 +2,7 @@ use crate::buf::fixed::FixedBuf; use crate::buf::BoundedBuf; use crate::io::SharedFd; use crate::runtime::driver::op::{self, Completable, Op}; +use crate::sealed::WithBuffer; use crate::Result; use crate::runtime::CONTEXT; @@ -51,14 +52,6 @@ impl Completable for WriteFixed { type Output = Result; fn complete(self, cqe: op::CqeResult) -> Self::Output { - // Convert the operation result to `usize` - let res = cqe.result.map(|v| v as usize); - // Recover the buffer - let buf = self.buf; - - match res { - Ok(n) => Ok((n, buf)), - Err(e) => Err(crate::Error(e, buf)), - } + cqe.result.map(|v| v as usize).with_buffer(self.buf) } } diff --git a/src/io/writev.rs b/src/io/writev.rs index 33b34abe..e73de206 100644 --- a/src/io/writev.rs +++ b/src/io/writev.rs @@ -1,5 +1,6 @@ use crate::runtime::driver::op::{Completable, CqeResult, Op}; use crate::runtime::CONTEXT; +use crate::sealed::WithBuffer; use crate::{buf::BoundedBuf, io::SharedFd, Result}; use libc::iovec; use std::io; @@ -61,14 +62,6 @@ where type Output = Result>; fn complete(self, cqe: CqeResult) -> Self::Output { - // Convert the operation result to `usize` - let res = cqe.result.map(|v| v as usize); - // Recover the buffer - let buf = self.bufs; - - match res { - Ok(n) => Ok((n, buf)), - Err(e) => Err(crate::Error(e, buf)), - } + cqe.result.map(|v| v as usize).with_buffer(self.bufs) } } diff --git a/src/lib.rs b/src/lib.rs index a9f5f9c9..5405c9c5 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -72,6 +72,7 @@ macro_rules! syscall { mod future; mod io; mod runtime; +mod types; pub mod buf; pub mod fs; @@ -81,9 +82,9 @@ pub use io::write::*; pub use runtime::driver::op::{InFlightOneshot, OneshotOutputTransform, UnsubmittedOneshot}; pub use runtime::spawn; pub use runtime::Runtime; +pub use types::*; use crate::runtime::driver::op::Op; -use std::fmt::{Debug, Display}; use std::future::Future; /// Starts an `io_uring` enabled Tokio runtime. @@ -232,85 +233,6 @@ impl Builder { } } -/// A specialized `Result` type for `io-uring` operations with buffers. -/// -/// This type is used as a return value for asynchronous `io-uring` methods that -/// require passing ownership of a buffer to the runtime. When the operation -/// completes, the buffer is returned both in the success tuple and as part of the error. -/// -/// # Examples -/// -/// ```no_run -/// use tokio_uring::fs::File; -/// -/// fn main() -> Result<(), Box> { -/// tokio_uring::start(async { -/// // Open a file -/// let file = File::open("hello.txt").await?; -/// -/// let buf = vec![0; 4096]; -/// // Read some data, the buffer is passed by ownership and -/// // submitted to the kernel. When the operation completes, -/// // we get the buffer back. -/// let (n, buf) = file.read_at(buf, 0).await?; -/// -/// // Display the contents -/// println!("{:?}", &buf[..n]); -/// -/// Ok(()) -/// }) -/// } -/// ``` -pub type Result = std::result::Result<(T, B), Error>; - -/// A specialized `Error` type for `io-uring` operations with buffers. -pub struct Error(pub std::io::Error, pub B); -impl Debug for Error { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - Debug::fmt(&self.0, f) - } -} - -impl Display for Error { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - Display::fmt(&self.0, f) - } -} - -impl std::error::Error for Error { - fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { - Some(&self.0) - } -} - -impl Error { - /// Applies a function to the contained buffer, returning a new `BufError`. - pub fn map(self, f: F) -> Error - where - F: FnOnce(B) -> U, - { - Error(self.0, f(self.1)) - } -} - -mod sealed { - /// A Specialized trait for mapping over the buffer in both sides of a Result - pub trait MapResultBuf { - type Output; - fn map_buf(self, f: impl FnOnce(B) -> U) -> Self::Output; - } -} - -impl sealed::MapResultBuf for Result { - type Output = Result; - fn map_buf(self, f: impl FnOnce(B) -> U) -> Self::Output { - match self { - Ok((r, b)) => Ok((r, f(b))), - Err(e) => Err(e.map(f)), - } - } -} - /// The simplest possible operation. Just posts a completion event, nothing else. /// /// This has a place in benchmarking and sanity checking uring. diff --git a/src/types.rs b/src/types.rs new file mode 100644 index 00000000..c063eddd --- /dev/null +++ b/src/types.rs @@ -0,0 +1,99 @@ +use std::fmt::{Debug, Display}; + +/// A specialized `Result` type for `io-uring` operations with buffers. +/// +/// This type is used as a return value for asynchronous `io-uring` methods that +/// require passing ownership of a buffer to the runtime. When the operation +/// completes, the buffer is returned both in the success tuple and as part of the error. +/// +/// # Examples +/// +/// ```no_run +/// use tokio_uring::fs::File; +/// +/// fn main() -> Result<(), Box> { +/// tokio_uring::start(async { +/// // Open a file +/// let file = File::open("hello.txt").await?; +/// +/// let buf = vec![0; 4096]; +/// // Read some data, the buffer is passed by ownership and +/// // submitted to the kernel. When the operation completes, +/// // we get the buffer back. +/// let (n, buf) = file.read_at(buf, 0).await?; +/// +/// // Display the contents +/// println!("{:?}", &buf[..n]); +/// +/// Ok(()) +/// }) +/// } +/// ``` +pub type Result = std::result::Result<(T, B), Error>; + +/// A specialized `Error` type for `io-uring` operations with buffers. +pub struct Error(pub std::io::Error, pub B); +impl Debug for Error { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + Debug::fmt(&self.0, f) + } +} + +impl Display for Error { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + Display::fmt(&self.0, f) + } +} + +impl std::error::Error for Error { + fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { + Some(&self.0) + } +} + +impl Error { + /// Applies a function to the contained buffer, returning a new `BufError`. + pub fn map(self, f: F) -> Error + where + F: FnOnce(B) -> U, + { + Error(self.0, f(self.1)) + } +} + +pub(super) mod sealed { + /// A Specialized trait for mapping over the buffer in both sides of a Result + pub trait MapResult { + type Output; + fn map_buf(self, f: impl FnOnce(B) -> U) -> Self::Output; + } + + /// Adapter trait to convert result::Result to crate::Result where E can be + /// converted to std::io::Error. + pub trait WithBuffer: Sized { + fn with_buffer(self, buf: B) -> T; + } +} + +impl sealed::MapResult for Result { + type Output = Result; + fn map_buf(self, f: impl FnOnce(B) -> U) -> Self::Output { + match self { + Ok((r, b)) => Ok((r, f(b))), + Err(e) => Err(e.map(f)), + } + } +} + +/// Adaptor implementation for Result to Result. +impl sealed::WithBuffer, B> for std::result::Result +where + E: Into, +{ + fn with_buffer(self, buf: B) -> Result { + match self { + Ok(res) => Ok((res, buf)), + Err(e) => Err(crate::Error(e.into(), buf)), + } + } +} From bba4b45664a132aa57d91a1b73fc64159d812622 Mon Sep 17 00:00:00 2001 From: Oliver Bunting <72926894+ollie-etl@users.noreply.github.com> Date: Thu, 15 Feb 2024 08:36:15 +0000 Subject: [PATCH 6/8] Expose trait externally --- src/fs/file.rs | 2 +- src/io/read.rs | 2 +- src/io/readv.rs | 2 +- src/io/recv_from.rs | 2 +- src/io/recvmsg.rs | 2 +- src/io/send_to.rs | 2 +- src/io/send_zc.rs | 2 +- src/io/write.rs | 2 +- src/io/write_fixed.rs | 2 +- src/io/writev.rs | 2 +- src/types.rs | 31 ++++++++++++++++++------------- 11 files changed, 28 insertions(+), 23 deletions(-) diff --git a/src/fs/file.rs b/src/fs/file.rs index ebf061ad..451223ac 100644 --- a/src/fs/file.rs +++ b/src/fs/file.rs @@ -4,7 +4,7 @@ use crate::fs::OpenOptions; use crate::io::SharedFd; use crate::runtime::driver::op::Op; -use crate::sealed::MapResult; +use crate::MapResult; use crate::{UnsubmittedOneshot, UnsubmittedWrite}; use std::fmt; use std::io; diff --git a/src/io/read.rs b/src/io/read.rs index d87af8c1..c98ed532 100644 --- a/src/io/read.rs +++ b/src/io/read.rs @@ -1,7 +1,7 @@ use crate::buf::BoundedBufMut; use crate::io::SharedFd; -use crate::sealed::WithBuffer; use crate::Result; +use crate::WithBuffer; use crate::runtime::driver::op::{Completable, CqeResult, Op}; use crate::runtime::CONTEXT; diff --git a/src/io/readv.rs b/src/io/readv.rs index 44843a21..9fc0a902 100644 --- a/src/io/readv.rs +++ b/src/io/readv.rs @@ -1,6 +1,6 @@ use crate::buf::BoundedBufMut; -use crate::sealed::WithBuffer; use crate::Result; +use crate::WithBuffer; use crate::io::SharedFd; use crate::runtime::driver::op::{Completable, CqeResult, Op}; diff --git a/src/io/recv_from.rs b/src/io/recv_from.rs index f7537801..2dc979a9 100644 --- a/src/io/recv_from.rs +++ b/src/io/recv_from.rs @@ -1,6 +1,6 @@ use crate::runtime::driver::op::{Completable, CqeResult, Op}; use crate::runtime::CONTEXT; -use crate::sealed::WithBuffer; +use crate::WithBuffer; use crate::{buf::BoundedBufMut, io::SharedFd, Result}; use socket2::SockAddr; use std::{ diff --git a/src/io/recvmsg.rs b/src/io/recvmsg.rs index 71dc8215..ad634422 100644 --- a/src/io/recvmsg.rs +++ b/src/io/recvmsg.rs @@ -1,6 +1,6 @@ use crate::runtime::driver::op::{Completable, CqeResult, Op}; use crate::runtime::CONTEXT; -use crate::sealed::WithBuffer; +use crate::WithBuffer; use crate::{buf::BoundedBufMut, io::SharedFd, Result}; use socket2::SockAddr; use std::{ diff --git a/src/io/send_to.rs b/src/io/send_to.rs index e41cd399..9a15210c 100644 --- a/src/io/send_to.rs +++ b/src/io/send_to.rs @@ -2,8 +2,8 @@ use crate::buf::BoundedBuf; use crate::io::SharedFd; use crate::runtime::driver::op::{Completable, CqeResult, Op}; use crate::runtime::CONTEXT; -use crate::sealed::WithBuffer; use crate::Result; +use crate::WithBuffer; use socket2::SockAddr; use std::io::IoSlice; use std::{boxed::Box, io, net::SocketAddr}; diff --git a/src/io/send_zc.rs b/src/io/send_zc.rs index 838e1801..f15ccbe7 100644 --- a/src/io/send_zc.rs +++ b/src/io/send_zc.rs @@ -1,6 +1,6 @@ use crate::runtime::driver::op::{Completable, CqeResult, MultiCQEFuture, Op, Updateable}; use crate::runtime::CONTEXT; -use crate::sealed::WithBuffer; +use crate::WithBuffer; use crate::{buf::BoundedBuf, io::SharedFd, Result}; use std::io; diff --git a/src/io/write.rs b/src/io/write.rs index 72bee1d8..e2a95f32 100644 --- a/src/io/write.rs +++ b/src/io/write.rs @@ -1,4 +1,4 @@ -use crate::sealed::WithBuffer; +use crate::WithBuffer; use crate::{buf::BoundedBuf, io::SharedFd, OneshotOutputTransform, Result, UnsubmittedOneshot}; use io_uring::cqueue::Entry; use std::io; diff --git a/src/io/write_fixed.rs b/src/io/write_fixed.rs index d99e74d0..9119e8c9 100644 --- a/src/io/write_fixed.rs +++ b/src/io/write_fixed.rs @@ -2,8 +2,8 @@ use crate::buf::fixed::FixedBuf; use crate::buf::BoundedBuf; use crate::io::SharedFd; use crate::runtime::driver::op::{self, Completable, Op}; -use crate::sealed::WithBuffer; use crate::Result; +use crate::WithBuffer; use crate::runtime::CONTEXT; use std::io; diff --git a/src/io/writev.rs b/src/io/writev.rs index e73de206..a2833100 100644 --- a/src/io/writev.rs +++ b/src/io/writev.rs @@ -1,6 +1,6 @@ use crate::runtime::driver::op::{Completable, CqeResult, Op}; use crate::runtime::CONTEXT; -use crate::sealed::WithBuffer; +use crate::WithBuffer; use crate::{buf::BoundedBuf, io::SharedFd, Result}; use libc::iovec; use std::io; diff --git a/src/types.rs b/src/types.rs index c063eddd..40742bd8 100644 --- a/src/types.rs +++ b/src/types.rs @@ -61,21 +61,26 @@ impl Error { } } -pub(super) mod sealed { - /// A Specialized trait for mapping over the buffer in both sides of a Result - pub trait MapResult { - type Output; - fn map_buf(self, f: impl FnOnce(B) -> U) -> Self::Output; - } +mod private { + pub trait Sealed {} +} +impl private::Sealed for std::result::Result {} - /// Adapter trait to convert result::Result to crate::Result where E can be - /// converted to std::io::Error. - pub trait WithBuffer: Sized { - fn with_buffer(self, buf: B) -> T; - } +/// A Specialized trait for mapping over the buffer in both sides of a Result +pub trait MapResult: private::Sealed { + /// The result type after applying the map operation + type Output; + /// Apply a function over the buffer on both sides of the result + fn map_buf(self, f: impl FnOnce(B) -> U) -> Self::Output; } -impl sealed::MapResult for Result { +/// Adapter trait to convert result::Result to crate::Result where E can be +/// converted to std::io::Error. +pub trait WithBuffer: private::Sealed { + /// Insert a buffer into each side of the result + fn with_buffer(self, buf: B) -> T; +} +impl MapResult for Result { type Output = Result; fn map_buf(self, f: impl FnOnce(B) -> U) -> Self::Output { match self { @@ -86,7 +91,7 @@ impl sealed::MapResult for Result { } /// Adaptor implementation for Result to Result. -impl sealed::WithBuffer, B> for std::result::Result +impl WithBuffer, B> for std::result::Result where E: Into, { From 9533572655eac7efe8305d08df9abe43cc7e3803 Mon Sep 17 00:00:00 2001 From: ollie <72926894+ollie-etl@users.noreply.github.com> Date: Tue, 20 Feb 2024 10:46:23 +0000 Subject: [PATCH 7/8] Use with_buffer --- src/io/read_fixed.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/src/io/read_fixed.rs b/src/io/read_fixed.rs index fa5b159d..9c637110 100644 --- a/src/io/read_fixed.rs +++ b/src/io/read_fixed.rs @@ -68,9 +68,6 @@ where } } - match res { - Ok(n) => Ok((n, buf)), - Err(e) => Err(crate::Error(e, buf)), - } + res.with_buffer(buf) } } From 85b2e778e3c194262f4fdea95c17ba593fdeec2e Mon Sep 17 00:00:00 2001 From: ollie <72926894+ollie-etl@users.noreply.github.com> Date: Tue, 20 Feb 2024 10:47:09 +0000 Subject: [PATCH 8/8] Update src/io/read_fixed.rs --- src/io/read_fixed.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/io/read_fixed.rs b/src/io/read_fixed.rs index 9c637110..dba9d094 100644 --- a/src/io/read_fixed.rs +++ b/src/io/read_fixed.rs @@ -3,6 +3,7 @@ use crate::buf::BoundedBufMut; use crate::io::SharedFd; use crate::runtime::driver::op::{self, Completable, Op}; use crate::Result; +use crate::WithBuffer; use crate::runtime::CONTEXT; use std::io;