Skip to content

cq wait using fd #47

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Apr 5, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions ibverbs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ license = "MIT OR Apache-2.0"

[dependencies]
ffi = { path = "../ibverbs-sys", package = "ibverbs-sys", version = "0.3.0" }
nix = { version = "0.29.0", default-features = false, features = ["fs", "poll"] }

[dependencies.serde]
version = "1.0.100"
Expand Down
127 changes: 126 additions & 1 deletion ibverbs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,10 @@ use std::ffi::CStr;
use std::io;
use std::marker::PhantomData;
use std::mem;
use std::os::fd::BorrowedFd;
use std::os::raw::c_void;
use std::ptr;
use std::time::Duration;

const PORT_NUM: u8 = 1;

Expand Down Expand Up @@ -424,12 +426,26 @@ impl Context {
/// - `EINVAL`: Invalid `min_cq_entries` (must be `1 <= cqe <= dev_cap.max_cqe`).
/// - `ENOMEM`: Not enough resources to complete this operation.
pub fn create_cq(&self, min_cq_entries: i32, id: isize) -> io::Result<CompletionQueue<'_>> {
let cc = unsafe { ffi::ibv_create_comp_channel(self.ctx) };
if cc.is_null() {
return Err(io::Error::last_os_error());
}

let cc_fd = unsafe { *cc }.fd;
let flags = nix::fcntl::fcntl(cc_fd, nix::fcntl::F_GETFL)?;
// the file descriptor needs to be set to non-blocking because `ibv_get_cq_event()`
// would block otherwise.
let arg = nix::fcntl::FcntlArg::F_SETFL(
nix::fcntl::OFlag::from_bits_retain(flags) | nix::fcntl::OFlag::O_NONBLOCK,
);
nix::fcntl::fcntl(cc_fd, arg)?;

let cq = unsafe {
ffi::ibv_create_cq(
self.ctx,
min_cq_entries,
ptr::null::<c_void>().offset(id) as *mut _,
ptr::null::<c_void>() as *mut _,
cc,
0,
)
};
Expand All @@ -439,6 +455,7 @@ impl Context {
} else {
Ok(CompletionQueue {
_phantom: PhantomData,
cc,
cq,
})
}
Expand Down Expand Up @@ -479,6 +496,7 @@ impl Drop for Context {
/// A completion queue that allows subscribing to the completion of queued sends and receives.
pub struct CompletionQueue<'ctx> {
_phantom: PhantomData<&'ctx ()>,
cc: *mut ffi::ibv_comp_channel,
cq: *mut ffi::ibv_cq,
}

Expand All @@ -504,6 +522,10 @@ impl<'ctx> CompletionQueue<'ctx> {
/// is not `IBV_WC_SUCCESS`, only the following attributes are valid: `wr_id`, `status`,
/// `qp_num`, and `vendor_err`.
///
/// Callers must ensure the CQ does not overrun (exceed its capacity), as this triggers an
/// `IBV_EVENT_CQ_ERR` async event, rendering the CQ unusable. You can do this by limiting
/// the number of inflight Work Requests.
///
/// Note that `poll` does not block or cause a context switch. This is why RDMA technologies
/// can achieve very low latency (below 1 µs).
#[inline]
Expand Down Expand Up @@ -533,6 +555,103 @@ impl<'ctx> CompletionQueue<'ctx> {
Ok(&mut completions[0..n as usize])
}
}

/// Waits for one or more work completions in a Completion Queue (CQ).
///
/// Unlike `poll`, this method blocks until at least one work completion is available or the
/// optional timeout expires. It is designed to wait efficiently for completions when polling
/// alone is insufficient, such as in low-traffic scenarios.
///
/// The returned slice reflects completed work requests (e.g., sends, receives) from the
/// associated Work Queue. Not all fields in `ibv_wc` are valid unless the status is
/// `IBV_WC_SUCCESS`.
///
/// # Errors
/// - `TimedOut`: If the timeout expires before any completions are available.
/// - System errors: From underlying calls like `req_notify_cq`, `poll`, or `ibv_get_cq_event`.
pub fn wait<'c>(
&self,
completions: &'c mut [ffi::ibv_wc],
timeout: Option<Duration>,
) -> io::Result<&'c mut [ffi::ibv_wc]> {
let c = completions as *mut [ffi::ibv_wc];

loop {
let polled_completions = self.poll(unsafe { &mut *c })?;
if !polled_completions.is_empty() {
return Ok(polled_completions);
}

// SAFETY: dereferencing completion queue context, which is guaranteed to not have
// been destroyed yet because we don't destroy it until in Drop, and given we have
// self, Drop has not been called. The context is guaranteed to not have been destroyed
// because the `CompletionQueue` holds a reference to the `Context` and we only destroy
// the context in Drop implementation of the `Context`.
let ctx = unsafe { *self.cq }.context;
let errno = unsafe {
let ops = &mut { &mut *ctx }.ops;
ops.req_notify_cq.as_mut().unwrap()(self.cq, 0)
};
if errno != 0 {
return Err(io::Error::from_raw_os_error(errno));
}

// We poll again to avoid a race when Work Completions arrive between the first `poll()` and `req_notify_cq()`.
let polled_completions = self.poll(unsafe { &mut *c })?;
if !polled_completions.is_empty() {
return Ok(polled_completions);
}

let pollfd = nix::poll::PollFd::new(
// SAFETY: dereferencing completion queue context, which is guaranteed to not have
// been destroyed yet because we don't destroy it until in Drop, and given we have
// self, Drop has not been called. `fd` is guaranteed to not have been destroyed
// because only destroy it in the Drop implementation of this `CompletionQueue` and
// we still hold `self` here.
unsafe { BorrowedFd::borrow_raw({ *self.cc }.fd) },
nix::poll::PollFlags::POLLIN,
);
let ret = nix::poll::poll(
&mut [pollfd],
timeout
.map(nix::poll::PollTimeout::try_from)
.transpose()
.map_err(|_| io::Error::other("failed to convert timeout to PollTimeout"))?,
)?;
match ret {
0 => {
return Err(io::Error::new(
io::ErrorKind::TimedOut,
"Timed out during completion queue wait",
))
}
1 => {}
_ => unreachable!("we passed 1 fd to poll, but it returned {ret}"),
}

let mut out_cq = std::ptr::null_mut();
let mut out_cq_context = std::ptr::null_mut();
// The Completion Notification must be read using ibv_get_cq_event(). The file descriptor of
// `cq_context` was put into non-blocking mode to make `ibv_get_cq_event()` non-blocking.
// SAFETY: c ffi call
let rc = unsafe { ffi::ibv_get_cq_event(self.cc, &mut out_cq, &mut out_cq_context) };
if rc < 0 {
let e = io::Error::last_os_error();
if e.kind() == io::ErrorKind::WouldBlock {
continue;
}
return Err(e);
}

assert_eq!(self.cq, out_cq);
// cq_context is the opaque user defined identifier passed to `ibv_create_cq()`.
assert!(out_cq_context.is_null());

// All completion events returned by ibv_get_cq_event() must eventually be acknowledged with ibv_ack_cq_events().
// SAFETY: c ffi call
unsafe { ffi::ibv_ack_cq_events(self.cq, 1) };
}
}
}

impl<'a> Drop for CompletionQueue<'a> {
Expand All @@ -542,6 +661,12 @@ impl<'a> Drop for CompletionQueue<'a> {
let e = io::Error::from_raw_os_error(errno);
panic!("{}", e);
}

let errno = unsafe { ffi::ibv_destroy_comp_channel(self.cc) };
if errno != 0 {
let e = io::Error::from_raw_os_error(errno);
panic!("{}", e);
}
}
}

Expand Down