diff --git a/glommio/src/iou/sqe.rs b/glommio/src/iou/sqe.rs index 60a2ab53a..59eb6b0d8 100644 --- a/glommio/src/iou/sqe.rs +++ b/glommio/src/iou/sqe.rs @@ -5,7 +5,6 @@ use std::{ ops::{Deref, DerefMut}, os::unix::io::RawFd, ptr, - slice, }; use super::registrar::{UringFd, UringReadBuf, UringWriteBuf}; @@ -606,13 +605,23 @@ bitflags::bitflags! { /// A sequence of [`SQE`]s from the [`SubmissionQueue`][crate::SubmissionQueue]. pub struct SQEs<'ring> { - sqes: slice::IterMut<'ring, uring_sys::io_uring_sqe>, + sq: &'ring mut uring_sys::io_uring_sq, + first: u32, + count: u32, + consumed: u32, } impl<'ring> SQEs<'ring> { - pub(crate) fn new(slice: &'ring mut [uring_sys::io_uring_sqe]) -> SQEs<'ring> { + pub(crate) fn new( + sq: &'ring mut uring_sys::io_uring_sq, + first: u32, + count: u32, + ) -> SQEs<'ring> { SQEs { - sqes: slice.iter_mut(), + sq, + first, + count, + consumed: 0, } } @@ -646,14 +655,23 @@ impl<'ring> SQEs<'ring> { /// Remaining [`SQE`]s that can be modified. pub fn remaining(&self) -> u32 { - self.sqes.len() as u32 + (self.count - self.consumed) as u32 } fn consume(&mut self) -> Option> { - self.sqes.next().map(|sqe| { - unsafe { uring_sys::io_uring_prep_nop(sqe) } - SQE { sqe } - }) + if self.consumed < self.count { + unsafe { + let sqe = self + .sq + .sqes + .offset(((self.first + self.consumed) & *self.sq.kring_mask) as isize); + uring_sys::io_uring_prep_nop(sqe); + self.consumed += 1; + Some(SQE { sqe: &mut *sqe }) + } + } else { + None + } } } diff --git a/glommio/src/iou/submission_queue.rs b/glommio/src/iou/submission_queue.rs index cfd1a87cc..9972929d6 100644 --- a/glommio/src/iou/submission_queue.rs +++ b/glommio/src/iou/submission_queue.rs @@ -3,7 +3,6 @@ use std::{ io, marker::PhantomData, ptr::NonNull, - slice, sync::atomic::{self, Ordering}, time::Duration, }; @@ -119,19 +118,16 @@ pub(crate) unsafe fn prepare_sqe<'a>(ring: &mut uring_sys::io_uring) -> Option( - sq: &mut uring_sys::io_uring_sq, - count: u32, -) -> Option> { +pub(crate) unsafe fn prepare_sqes(sq: &mut uring_sys::io_uring_sq, count: u32) -> Option> { atomic::fence(Ordering::Acquire); let head: u32 = *sq.khead; let next: u32 = sq.sqe_tail + count; if next - head <= *sq.kring_entries { - let sqe = sq.sqes.offset((sq.sqe_tail & *sq.kring_mask) as isize); + let first = sq.sqe_tail; sq.sqe_tail = next; - Some(SQEs::new(slice::from_raw_parts_mut(sqe, count as usize))) + Some(SQEs::new(sq, first, count)) } else { None } diff --git a/glommio/src/sys/mod.rs b/glommio/src/sys/mod.rs index b9487d815..289da5566 100644 --- a/glommio/src/sys/mod.rs +++ b/glommio/src/sys/mod.rs @@ -345,12 +345,6 @@ pub(crate) enum PollableStatus { NonPollable(DirectIO), } -#[derive(Debug, Copy, Clone)] -pub(crate) enum LinkStatus { - Freestanding, - Linked, -} - #[derive(Debug)] pub(crate) enum SourceType { Write(PollableStatus, IOBuffer), @@ -373,7 +367,7 @@ pub(crate) enum SourceType { FdataSync, Fallocate, Close, - LinkRings(LinkStatus), + LinkRings, Statx(CString, Box>), Timeout(TimeSpec64), Connect(SockAddr), diff --git a/glommio/src/sys/uring.rs b/glommio/src/sys/uring.rs index f1ffa4dd2..fd19aa5a8 100644 --- a/glommio/src/sys/uring.rs +++ b/glommio/src/sys/uring.rs @@ -34,7 +34,6 @@ use crate::{ DirectIO, IOBuffer, InnerSource, - LinkStatus, PollableStatus, Source, SourceType, @@ -790,10 +789,6 @@ impl Source { self.inner.source_type.borrow_mut() } - pub(crate) fn update_source_type(&self, source_type: SourceType) -> SourceType { - self.inner.update_source_type(source_type) - } - pub(crate) fn extract_source_type(&self) -> SourceType { self.inner.update_source_type(SourceType::Invalid) } @@ -926,30 +921,21 @@ impl SleepableRing { } fn sleep(&mut self, link: &mut Source, eventfd_src: &Source) -> io::Result { - let is_freestanding = match &*link.source_type() { - SourceType::LinkRings(LinkStatus::Linked) => false, // nothing to do - SourceType::LinkRings(LinkStatus::Freestanding) => true, - _ => panic!("Unexpected source type when linking rings"), - }; - - if is_freestanding { - if let Some(mut sqe) = self.ring.prepare_sqe() { - self.waiting_submission += 1; - link.update_source_type(SourceType::LinkRings(LinkStatus::Linked)); + if let Some(mut sqe) = self.ring.prepare_sqe() { + self.waiting_submission += 1; - let op = UringDescriptor { - fd: link.raw(), - flags: SubmissionFlags::empty(), - user_data: to_user_data(add_source(link, self.submission_queue.clone())), - args: UringOpDescriptor::PollAdd(common_flags() | read_flags()), - }; - fill_sqe(&mut sqe, &op, DmaBuffer::new); - } else { - // Can't link rings because we ran out of CQEs. Just can't sleep. - // Submit what we have, once we're out of here we'll consume them - // and at some point will be able to sleep again. - return self.ring.submit_sqes().map(|x| x as usize); - } + let op = UringDescriptor { + fd: link.raw(), + flags: SubmissionFlags::empty(), + user_data: to_user_data(add_source(link, self.submission_queue.clone())), + args: UringOpDescriptor::PollAdd(common_flags() | read_flags()), + }; + fill_sqe(&mut sqe, &op, DmaBuffer::new); + } else { + // Can't link rings because we ran out of CQEs. Just can't sleep. + // Submit what we have, once we're out of here we'll consume them + // and at some point will be able to sleep again. + return self.ring.submit_sqes().map(|x| x as usize); } let res = eventfd_src.take_result(); @@ -1002,13 +988,7 @@ impl UringCommon for SleepableRing { process_one_event( self.ring.peek_for_cqe(), |source| match &mut *source.source_type.borrow_mut() { - SourceType::LinkRings(status @ LinkStatus::Linked) => { - *status = LinkStatus::Freestanding; - Some(()) - } - SourceType::LinkRings(LinkStatus::Freestanding) => { - panic!("Impossible to have an event firing like this"); - } + SourceType::LinkRings => Some(()), SourceType::Timeout(_) => Some(()), _ => None, }, @@ -1062,9 +1042,10 @@ pub(crate) struct Reactor { latency_ring: RefCell, poll_ring: RefCell, - link_rings_src: RefCell, timeout_src: Cell>, + link_fd: RawFd, + // This keeps the eventfd alive. Drop will close it when we're done notifier: Arc, // This is the source used to handle the notifications into the ring @@ -1158,11 +1139,6 @@ impl Reactor { let latency_ring = SleepableRing::new(128, "latency", allocator.clone())?; let link_fd = latency_ring.ring_fd(); - let link_rings_src = Source::new( - IoRequirements::default(), - link_fd, - SourceType::LinkRings(LinkStatus::Freestanding), - ); let eventfd_src = Source::new( IoRequirements::default(), @@ -1175,8 +1151,8 @@ impl Reactor { main_ring: RefCell::new(main_ring), latency_ring: RefCell::new(latency_ring), poll_ring: RefCell::new(poll_ring), - link_rings_src: RefCell::new(link_rings_src), timeout_src: Cell::new(None), + link_fd, notifier, eventfd_src, }) @@ -1344,7 +1320,11 @@ impl Reactor { ring: &mut SleepableRing, eventfd_src: &Source, ) -> io::Result<()> { - let mut link_rings = self.link_rings_src.borrow_mut(); + let mut link_rings = Source::new( + IoRequirements::default(), + self.link_fd, + SourceType::LinkRings, + ); ring.sleep(&mut link_rings, eventfd_src) .or_else(Self::busy_ok)?; Ok(()) @@ -1467,6 +1447,9 @@ impl Reactor { .expect("some error"); // woke up, so no need to notify us anymore. self.notifier.wake_up(); + // may have new cancellations related to the link ring fd. + flush_cancellations!(into wakers; main_ring); + flush_rings!(main_ring)?; consume_rings!(into wakers; lat_ring, poll_ring, main_ring); } }