@@ -70,6 +70,7 @@ use std::ffi::CStr;
70
70
use std:: io;
71
71
use std:: marker:: PhantomData ;
72
72
use std:: mem;
73
+ use std:: os:: fd:: BorrowedFd ;
73
74
use std:: os:: raw:: c_void;
74
75
use std:: ptr;
75
76
use std:: time:: Duration ;
@@ -431,15 +432,11 @@ impl Context {
431
432
}
432
433
433
434
let cc_fd = unsafe { * cc } . fd ;
434
- let flags = unsafe { libc:: fcntl ( cc_fd, libc:: F_GETFL ) } ;
435
- if flags < 0 {
436
- return Err ( io:: Error :: last_os_error ( ) ) ;
437
- }
438
-
439
- let rc = unsafe { libc:: fcntl ( cc_fd, libc:: F_SETFL , flags | libc:: O_NONBLOCK ) } ;
440
- if rc < 0 {
441
- return Err ( io:: Error :: last_os_error ( ) ) ;
442
- }
435
+ let flags = nix:: fcntl:: fcntl ( cc_fd, nix:: fcntl:: F_GETFL ) ?;
436
+ let arg = nix:: fcntl:: FcntlArg :: F_SETFL (
437
+ nix:: fcntl:: OFlag :: from_bits_retain ( flags) | nix:: fcntl:: OFlag :: O_NONBLOCK ,
438
+ ) ;
439
+ nix:: fcntl:: fcntl ( cc_fd, arg) ?;
443
440
444
441
let cq = unsafe {
445
442
ffi:: ibv_create_cq (
@@ -523,6 +520,10 @@ impl<'ctx> CompletionQueue<'ctx> {
523
520
/// is not `IBV_WC_SUCCESS`, only the following attributes are valid: `wr_id`, `status`,
524
521
/// `qp_num`, and `vendor_err`.
525
522
///
523
+ /// Callers must ensure the CQ does not overrun (exceed its capacity), as this triggers an
524
+ /// `IBV_EVENT_CQ_ERR` async event, rendering the CQ unusable. You can do this by limiting
525
+ /// the number of inflight Work Requests.
526
+ ///
526
527
/// Note that `poll` does not block or cause a context switch. This is why RDMA technologies
527
528
/// can achieve very low latency (below 1 µs).
528
529
#[ inline]
@@ -566,26 +567,22 @@ impl<'ctx> CompletionQueue<'ctx> {
566
567
/// # Errors
567
568
/// - `TimedOut`: If the timeout expires before any completions are available.
568
569
/// - System errors: From underlying calls like `req_notify_cq`, `poll`, or `ibv_get_cq_event`.
569
- ///
570
- /// # Notes
571
- /// - This method blocks, unlike `poll`, but avoids busy-waiting by leveraging CQ events and
572
- /// system polling.
573
- /// - Callers must ensure the CQ does not overrun (exceed its capacity), as this triggers an
574
- /// `IBV_EVENT_CQ_ERR` async event, rendering the CQ unusable.
575
570
pub fn wait < ' c > (
576
571
& self ,
577
572
completions : & ' c mut [ ffi:: ibv_wc ] ,
578
573
timeout : Option < Duration > ,
579
574
) -> io:: Result < & ' c mut [ ffi:: ibv_wc ] > {
580
575
let c = completions as * mut [ ffi:: ibv_wc ] ;
581
576
577
+ //
582
578
loop {
583
- let completions = self . poll ( unsafe { & mut * c } ) ?;
584
- if !completions . is_empty ( ) {
585
- return Ok ( completions ) ;
579
+ let polled_completions = self . poll ( unsafe { & mut * c } ) ?;
580
+ if !polled_completions . is_empty ( ) {
581
+ return Ok ( polled_completions ) ;
586
582
}
587
583
588
- let ctx: * mut ffi:: ibv_context = unsafe { * self . cq } . context ;
584
+ // SAFETY: dereferencing completion queue, which is guaranteed to not have been destroyed yet.
585
+ let ctx = unsafe { * self . cq } . context ;
589
586
let errno = unsafe {
590
587
let ops = & mut { & mut * ctx } . ops ;
591
588
ops. req_notify_cq . as_mut ( ) . unwrap ( ) ( self . cq , 0 )
@@ -594,25 +591,26 @@ impl<'ctx> CompletionQueue<'ctx> {
594
591
return Err ( io:: Error :: from_raw_os_error ( errno) ) ;
595
592
}
596
593
597
- let completions = self . poll ( unsafe { & mut * c } ) ?;
598
- if !completions. is_empty ( ) {
599
- return Ok ( completions) ;
594
+ // We poll again to avoid a race when Work Completions arrive between the first `poll()` and `req_notify_cq()`.
595
+ let polled_completions = self . poll ( unsafe { & mut * c } ) ?;
596
+ if !polled_completions. is_empty ( ) {
597
+ return Ok ( polled_completions) ;
600
598
}
601
599
602
- let mut pollfd = libc :: pollfd {
603
- fd : unsafe { * self . cc } . fd ,
604
- events : libc :: POLLIN ,
605
- revents : 0 ,
606
- } ;
607
- let rc = unsafe {
608
- libc :: poll (
609
- & mut pollfd,
610
- 1 ,
611
- timeout . map_or ( - 1 , |x| x . as_millis ( ) as libc :: c_int ) ,
612
- )
613
- } ;
614
- match rc {
615
- - 1 => return Err ( io :: Error :: last_os_error ( ) ) ,
600
+ // ibv_get_cq_event supports blocking operations, but the fd of cq_context was put into non blocking mode to support timeouts.
601
+ let pollfd = nix :: poll :: PollFd :: new (
602
+ // SAFETY: dereferencing completion queue context, which is guaranteed to not have been destroyed yet.
603
+ unsafe { BorrowedFd :: borrow_raw ( { * self . cc } . fd ) } ,
604
+ nix :: poll :: PollFlags :: POLLIN ,
605
+ ) ;
606
+ let ret = nix :: poll :: poll (
607
+ & mut [ pollfd] ,
608
+ timeout
609
+ . map ( nix :: poll :: PollTimeout :: try_from )
610
+ . transpose ( )
611
+ . map_err ( |_| io :: Error :: other ( "failedd to convert timeout to PollTimeout" ) ) ? ,
612
+ ) ? ;
613
+ match ret {
616
614
0 => {
617
615
return Err ( io:: Error :: new (
618
616
io:: ErrorKind :: TimedOut ,
@@ -624,7 +622,10 @@ impl<'ctx> CompletionQueue<'ctx> {
624
622
}
625
623
626
624
let mut out_cq = std:: ptr:: null_mut ( ) ;
625
+ // The cq_context is an opaque identifier that
627
626
let mut out_cq_context = std:: ptr:: null_mut ( ) ;
627
+ // The Completion Notification must be read using ibv_get_cq_event().
628
+ // SAFETY: c ffi call
628
629
let rc = unsafe { ffi:: ibv_get_cq_event ( self . cc , & mut out_cq, & mut out_cq_context) } ;
629
630
if rc < 0 {
630
631
let e = io:: Error :: last_os_error ( ) ;
@@ -635,9 +636,12 @@ impl<'ctx> CompletionQueue<'ctx> {
635
636
}
636
637
637
638
assert_eq ! ( self . cq, out_cq) ;
638
- unsafe {
639
- ffi:: ibv_ack_cq_events ( self . cq , 1 ) ;
640
- } ;
639
+ // cq_context is the user defined value passed during ibv_create_cq().
640
+ assert ! ( out_cq_context. is_null( ) ) ;
641
+
642
+ // All completion events returned by ibv_get_cq_event() must eventually be acknowledged with ibv_ack_cq_events().
643
+ // SAFETY: c ffi call
644
+ unsafe { ffi:: ibv_ack_cq_events ( self . cq , 1 ) } ;
641
645
}
642
646
}
643
647
}
0 commit comments