diff --git a/Cargo.lock b/Cargo.lock index f16ffa1..a09bcef 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -70,6 +70,12 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" +[[package]] +name = "cfg_aliases" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "613afe47fcd5fac7ccf1db93babcb082c5994d996f20b8b159f2ad1658eb5724" + [[package]] name = "clang-sys" version = "1.8.1" @@ -108,6 +114,7 @@ version = "0.9.2" dependencies = [ "bincode", "ibverbs-sys", + "nix", "serde", ] @@ -163,6 +170,18 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a" +[[package]] +name = "nix" +version = "0.29.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "71e2746dc3a24dd78b3cfcb7be93368c6de9963d30f43a6a73998a9cf4b17b46" +dependencies = [ + "bitflags", + "cfg-if", + "cfg_aliases", + "libc", +] + [[package]] name = "nom" version = "7.1.3" diff --git a/ibverbs/Cargo.toml b/ibverbs/Cargo.toml index c8a4986..e775276 100644 --- a/ibverbs/Cargo.toml +++ b/ibverbs/Cargo.toml @@ -20,6 +20,7 @@ license = "MIT OR Apache-2.0" [dependencies] ffi = { path = "../ibverbs-sys", package = "ibverbs-sys", version = "0.3.0" } +nix = { version = "0.29.0", default-features = false, features = ["fs", "poll"] } [dependencies.serde] version = "1.0.100" diff --git a/ibverbs/src/lib.rs b/ibverbs/src/lib.rs index af6cd21..8d8f19c 100644 --- a/ibverbs/src/lib.rs +++ b/ibverbs/src/lib.rs @@ -70,8 +70,10 @@ use std::ffi::CStr; use std::io; use std::marker::PhantomData; use std::mem; +use std::os::fd::BorrowedFd; use std::os::raw::c_void; use std::ptr; +use std::time::Duration; const PORT_NUM: u8 = 1; @@ -424,12 +426,26 @@ impl Context { /// - `EINVAL`: Invalid `min_cq_entries` (must be `1 <= cqe <= dev_cap.max_cqe`). 
/// - `ENOMEM`: Not enough resources to complete this operation. pub fn create_cq(&self, min_cq_entries: i32, id: isize) -> io::Result> { + let cc = unsafe { ffi::ibv_create_comp_channel(self.ctx) }; + if cc.is_null() { + return Err(io::Error::last_os_error()); + } + + let cc_fd = unsafe { *cc }.fd; + let flags = nix::fcntl::fcntl(cc_fd, nix::fcntl::F_GETFL)?; + // the file descriptor needs to be set to non-blocking because `ibv_get_cq_event()` + // would block otherwise. + let arg = nix::fcntl::FcntlArg::F_SETFL( + nix::fcntl::OFlag::from_bits_retain(flags) | nix::fcntl::OFlag::O_NONBLOCK, + ); + nix::fcntl::fcntl(cc_fd, arg)?; + let cq = unsafe { ffi::ibv_create_cq( self.ctx, min_cq_entries, ptr::null::().offset(id) as *mut _, - ptr::null::() as *mut _, + cc, 0, ) }; @@ -439,6 +455,7 @@ impl Context { } else { Ok(CompletionQueue { _phantom: PhantomData, + cc, cq, }) } @@ -479,6 +496,7 @@ impl Drop for Context { /// A completion queue that allows subscribing to the completion of queued sends and receives. pub struct CompletionQueue<'ctx> { _phantom: PhantomData<&'ctx ()>, + cc: *mut ffi::ibv_comp_channel, cq: *mut ffi::ibv_cq, } @@ -504,6 +522,10 @@ impl<'ctx> CompletionQueue<'ctx> { /// is not `IBV_WC_SUCCESS`, only the following attributes are valid: `wr_id`, `status`, /// `qp_num`, and `vendor_err`. /// + /// Callers must ensure the CQ does not overrun (exceed its capacity), as this triggers an + /// `IBV_EVENT_CQ_ERR` async event, rendering the CQ unusable. You can do this by limiting + /// the number of inflight Work Requests. + /// /// Note that `poll` does not block or cause a context switch. This is why RDMA technologies /// can achieve very low latency (below 1 µs). #[inline] @@ -533,6 +555,103 @@ impl<'ctx> CompletionQueue<'ctx> { Ok(&mut completions[0..n as usize]) } } + + /// Waits for one or more work completions in a Completion Queue (CQ). 
+ /// + /// Unlike `poll`, this method blocks until at least one work completion is available or the + /// optional timeout expires. It is designed to wait efficiently for completions when polling + /// alone is insufficient, such as in low-traffic scenarios. + /// + /// The returned slice reflects completed work requests (e.g., sends, receives) from the + /// associated Work Queue. Not all fields in `ibv_wc` are valid unless the status is + /// `IBV_WC_SUCCESS`. + /// + /// # Errors + /// - `TimedOut`: If the timeout expires before any completions are available. + /// - System errors: From underlying calls like `req_notify_cq`, `poll`, or `ibv_get_cq_event`. + pub fn wait<'c>( + &self, + completions: &'c mut [ffi::ibv_wc], + timeout: Option, + ) -> io::Result<&'c mut [ffi::ibv_wc]> { + let c = completions as *mut [ffi::ibv_wc]; + + loop { + let polled_completions = self.poll(unsafe { &mut *c })?; + if !polled_completions.is_empty() { + return Ok(polled_completions); + } + + // SAFETY: dereferencing completion queue context, which is guaranteed to not have + // been destroyed yet because we don't destroy it until in Drop, and given we have + // self, Drop has not been called. The context is guaranteed to not have been destroyed + // because the `CompletionQueue` holds a reference to the `Context` and we only destroy + // the context in Drop implementation of the `Context`. + let ctx = unsafe { *self.cq }.context; + let errno = unsafe { + let ops = &mut { &mut *ctx }.ops; + ops.req_notify_cq.as_mut().unwrap()(self.cq, 0) + }; + if errno != 0 { + return Err(io::Error::from_raw_os_error(errno)); + } + + // We poll again to avoid a race when Work Completions arrive between the first `poll()` and `req_notify_cq()`. 
+ let polled_completions = self.poll(unsafe { &mut *c })?; + if !polled_completions.is_empty() { + return Ok(polled_completions); + } + + let pollfd = nix::poll::PollFd::new( + // SAFETY: dereferencing the completion channel, which is guaranteed to not have + // been destroyed yet because we don't destroy it until in Drop, and given we have + // self, Drop has not been called. `fd` is guaranteed to not have been destroyed + // because we only destroy it in the Drop implementation of this `CompletionQueue` and + // we still hold `self` here. + unsafe { BorrowedFd::borrow_raw({ *self.cc }.fd) }, + nix::poll::PollFlags::POLLIN, + ); + let ret = nix::poll::poll( + &mut [pollfd], + timeout + .map(nix::poll::PollTimeout::try_from) + .transpose() + .map_err(|_| io::Error::other("failed to convert timeout to PollTimeout"))?, + )?; + match ret { + 0 => { + return Err(io::Error::new( + io::ErrorKind::TimedOut, + "Timed out during completion queue wait", + )) + } + 1 => {} + _ => unreachable!("we passed 1 fd to poll, but it returned {ret}"), + } + + let mut out_cq = std::ptr::null_mut(); + let mut out_cq_context = std::ptr::null_mut(); + // The Completion Notification must be read using ibv_get_cq_event(). The file descriptor of + // the completion channel (`self.cc`) was put into non-blocking mode to make `ibv_get_cq_event()` non-blocking. + // SAFETY: c ffi call + let rc = unsafe { ffi::ibv_get_cq_event(self.cc, &mut out_cq, &mut out_cq_context) }; + if rc < 0 { + let e = io::Error::last_os_error(); + if e.kind() == io::ErrorKind::WouldBlock { + continue; + } + return Err(e); + } + + assert_eq!(self.cq, out_cq); + // cq_context is the opaque user defined identifier passed to `ibv_create_cq()`. + assert!(out_cq_context.is_null()); + + // All completion events returned by ibv_get_cq_event() must eventually be acknowledged with ibv_ack_cq_events(). 
+ // SAFETY: c ffi call + unsafe { ffi::ibv_ack_cq_events(self.cq, 1) }; + } + } } impl<'a> Drop for CompletionQueue<'a> { @@ -542,6 +661,12 @@ impl<'a> Drop for CompletionQueue<'a> { let e = io::Error::from_raw_os_error(errno); panic!("{}", e); } + + let errno = unsafe { ffi::ibv_destroy_comp_channel(self.cc) }; + if errno != 0 { + let e = io::Error::from_raw_os_error(errno); + panic!("{}", e); + } } }