diff --git a/resources/seccomp/aarch64-unknown-linux-musl.json b/resources/seccomp/aarch64-unknown-linux-musl.json
index 868e7ce0e99..3aaef84daa3 100644
--- a/resources/seccomp/aarch64-unknown-linux-musl.json
+++ b/resources/seccomp/aarch64-unknown-linux-musl.json
@@ -25,6 +25,10 @@
             {
                 "syscall": "read"
             },
+            {
+                "syscall": "readv",
+                "comment": "Used by the VirtIO net device to read from tap"
+            },
             {
                 "syscall": "write"
             },
diff --git a/resources/seccomp/x86_64-unknown-linux-musl.json b/resources/seccomp/x86_64-unknown-linux-musl.json
index e5b4b690196..c06b69572cc 100644
--- a/resources/seccomp/x86_64-unknown-linux-musl.json
+++ b/resources/seccomp/x86_64-unknown-linux-musl.json
@@ -25,6 +25,10 @@
             {
                 "syscall": "read"
             },
+            {
+                "syscall": "readv",
+                "comment": "Used by the VirtIO net device to read from tap"
+            },
             {
                 "syscall": "write"
             },
diff --git a/src/vmm/src/devices/virtio/iovec.rs b/src/vmm/src/devices/virtio/iovec.rs
index fd48a94ca2c..336a1fb8118 100644
--- a/src/vmm/src/devices/virtio/iovec.rs
+++ b/src/vmm/src/devices/virtio/iovec.rs
@@ -45,6 +45,16 @@ pub struct IoVecBuffer {
     len: u32,
 }
 
+impl IoVecBuffer {
+    /// Create an empty `IoVecBuffer` with a given capacity.
+    pub fn with_capacity(cap: usize) -> IoVecBuffer {
+        IoVecBuffer {
+            vecs: IoVecVec::with_capacity(cap),
+            len: 0,
+        }
+    }
+}
+
 // SAFETY: `IoVecBuffer` doesn't allow for interior mutability and no shared ownership is possible
 // as it doesn't implement clone
 unsafe impl Send for IoVecBuffer {}
@@ -57,12 +67,11 @@ impl IoVecBuffer {
     /// The descriptor chain cannot be referencing the same memory location as another chain
     pub unsafe fn load_descriptor_chain(
         &mut self,
-        head: DescriptorChain,
+        mut desc: DescriptorChain,
     ) -> Result<(), IoVecError> {
         self.clear();
 
-        let mut next_descriptor = Some(head);
-        while let Some(desc) = next_descriptor {
+        loop {
             if desc.is_write_only() {
                 return Err(IoVecError::WriteOnlyDescriptor);
             }
@@ -85,7 +94,9 @@ impl IoVecBuffer {
                 .checked_add(desc.len)
                 .ok_or(IoVecError::OverflowedDescriptor)?;
 
-            next_descriptor = desc.next_descriptor();
+            if desc.load_next_descriptor().is_none() {
+                break;
+            }
         }
 
         Ok(())
@@ -217,7 +228,7 @@ impl IoVecBuffer {
 /// It describes a write-only buffer passed to us by the guest that is scattered across multiple
 /// memory regions. Additionally, this wrapper provides methods that allow reading arbitrary ranges
 /// of data from that buffer.
-#[derive(Debug)]
+#[derive(Debug, Default)]
 pub struct IoVecBufferMut {
     // container of the memory regions included in this IO vector
     vecs: IoVecVec,
@@ -225,13 +236,31 @@ pub struct IoVecBufferMut {
     len: u32,
 }
 
+impl IoVecBufferMut {
+    /// Create an empty `IoVecBufferMut` with a given capacity.
+    pub fn with_capacity(cap: usize) -> IoVecBufferMut {
+        IoVecBufferMut {
+            vecs: IoVecVec::with_capacity(cap),
+            len: 0,
+        }
+    }
+}
+
+// SAFETY: iovec pointers are safe to send across threads.
+unsafe impl Send for IoVecBufferMut {}
+
 impl IoVecBufferMut {
     /// Create an `IoVecBufferMut` from a `DescriptorChain`
-    pub fn from_descriptor_chain(head: DescriptorChain) -> Result<Self, IoVecError> {
-        let mut vecs = IoVecVec::new();
-        let mut len = 0u32;
+    /// # Safety
+    /// The descriptor chain cannot be referencing the same memory location as another chain.
+    pub unsafe fn load_descriptor_chain(
+        &mut self,
+        mut desc: DescriptorChain,
+        max_size: Option<u32>,
+    ) -> Result<(), IoVecError> {
+        self.clear();
 
-        for desc in head {
+        loop {
             if !desc.is_write_only() {
                 return Err(IoVecError::ReadOnlyDescriptor);
             }
@@ -247,16 +276,34 @@ impl IoVecBufferMut {
             slice.bitmap().mark_dirty(0, desc.len as usize);
 
             let iov_base = slice.ptr_guard_mut().as_ptr().cast::<c_void>();
-            vecs.push(iovec {
+            self.vecs.push(iovec {
                 iov_base,
                 iov_len: desc.len as size_t,
             });
-            len = len
+            self.len = self
+                .len
                 .checked_add(desc.len)
                 .ok_or(IoVecError::OverflowedDescriptor)?;
 
+            if matches!(max_size, Some(max) if self.len >= max) {
+                break;
+            }
+            if desc.load_next_descriptor().is_none() {
+                break;
+            }
         }
 
-        Ok(Self { vecs, len })
+        Ok(())
+    }
+
+    /// Create an `IoVecBufferMut` from a `DescriptorChain`
+    /// # Safety
+    /// The descriptor chain cannot be referencing the same memory location as another chain.
+    pub unsafe fn from_descriptor_chain(head: DescriptorChain) -> Result<Self, IoVecError> {
+        let mut new_buffer = Self::default();
+
+        Self::load_descriptor_chain(&mut new_buffer, head, None)?;
+
+        Ok(new_buffer)
     }
 
     /// Get the total length of the memory regions covered by this `IoVecBuffer`
@@ -264,6 +311,39 @@ impl IoVecBufferMut {
         self.len
     }
 
+    /// Returns a pointer to the memory keeping the `iovec` structs
+    pub fn as_iovec_ptr(&self) -> *const iovec {
+        self.vecs.as_ptr()
+    }
+
+    /// Returns the length of the `iovec` array.
+    pub fn iovec_count(&self) -> usize {
+        self.vecs.len()
+    }
+
+    /// Clears the `iovec` array
+    pub fn clear(&mut self) {
+        self.vecs.clear();
+        self.len = 0u32;
+    }
+
+    /// Push an iovec into the `IoVecBufferMut`.
+    /// # Safety
+    /// The iovec must refer to a valid memory slice.
+    pub unsafe fn push(&mut self, iovec: iovec) -> Result<(), IoVecError> {
+        let iov_len = iovec
+            .iov_len
+            .try_into()
+            .map_err(|_| IoVecError::OverflowedDescriptor)?;
+
+        self.vecs.push(iovec);
+        self.len = self
+            .len
+            .checked_add(iov_len)
+            .ok_or(IoVecError::OverflowedDescriptor)?;
+        Ok(())
+    }
+
     /// Writes a number of bytes into the `IoVecBufferMut` starting at a given offset.
     ///
     /// This will try to fill `IoVecBufferMut` writing bytes from the `buf` starting from
@@ -407,6 +487,24 @@ mod tests {
         }
     }
 
+    impl<'a> From<Vec<&'a mut [u8]>> for IoVecBufferMut {
+        fn from(buffer: Vec<&'a mut [u8]>) -> Self {
+            let mut len = 0_u32;
+            let vecs = buffer
+                .into_iter()
+                .map(|slice| {
+                    len += TryInto::<u32>::try_into(slice.len()).unwrap();
+                    iovec {
+                        iov_base: slice.as_ptr() as *mut c_void,
+                        iov_len: slice.len(),
+                    }
+                })
+                .collect();
+
+            Self { vecs, len }
+        }
+    }
+
     fn default_mem() -> GuestMemoryMmap {
         multi_region_mem(&[
             (GuestAddress(0), 0x10000),
@@ -468,11 +566,13 @@ mod tests {
 
         let (mut q, _) = read_only_chain(&mem);
         let head = q.pop(&mem).unwrap();
-        IoVecBufferMut::from_descriptor_chain(head).unwrap_err();
+        // SAFETY: This descriptor chain is only loaded into one buffer.
+        unsafe { IoVecBufferMut::from_descriptor_chain(head).unwrap_err() };
 
         let (mut q, _) = write_only_chain(&mem);
        let head = q.pop(&mem).unwrap();
-        IoVecBufferMut::from_descriptor_chain(head).unwrap();
+        // SAFETY: This descriptor chain is only loaded into one buffer.
+        unsafe { IoVecBufferMut::from_descriptor_chain(head).unwrap() };
     }
 
     #[test]
@@ -493,7 +593,7 @@ mod tests {
         let head = q.pop(&mem).unwrap();
 
         // SAFETY: This descriptor chain is only loaded once in this test
-        let iovec = IoVecBufferMut::from_descriptor_chain(head).unwrap();
+        let iovec = unsafe { IoVecBufferMut::from_descriptor_chain(head).unwrap() };
         assert_eq!(iovec.len(), 4 * 64);
     }
 
@@ -558,7 +658,8 @@ mod tests {
         // This is a descriptor chain with 4 elements 64 bytes long each.
         let head = q.pop(&mem).unwrap();
 
-        let mut iovec = IoVecBufferMut::from_descriptor_chain(head).unwrap();
+        // SAFETY: This descriptor chain is only loaded into one buffer.
+        let mut iovec = unsafe { IoVecBufferMut::from_descriptor_chain(head).unwrap() };
         let buf = vec![0u8, 1, 2, 3, 4];
 
         // One test vector for each part of the chain
diff --git a/src/vmm/src/devices/virtio/net/device.rs b/src/vmm/src/devices/virtio/net/device.rs
index e34676b2c31..e978f3b92c8 100755
--- a/src/vmm/src/devices/virtio/net/device.rs
+++ b/src/vmm/src/devices/virtio/net/device.rs
@@ -5,13 +5,11 @@
 // Use of this source code is governed by a BSD-style license that can be
 // found in the THIRD-PARTY file.
 
-#[cfg(not(test))]
-use std::io::Read;
 use std::mem;
 use std::net::Ipv4Addr;
+use std::num::NonZeroUsize;
 use std::sync::{Arc, Mutex};
 
-use libc::EAGAIN;
 use log::{error, warn};
 use utils::eventfd::EventFd;
 use utils::net::mac::MacAddr;
@@ -26,7 +24,7 @@ use crate::devices::virtio::gen::virtio_net::{
     VIRTIO_NET_F_HOST_TSO6, VIRTIO_NET_F_HOST_UFO, VIRTIO_NET_F_MAC,
 };
 use crate::devices::virtio::gen::virtio_ring::VIRTIO_RING_F_EVENT_IDX;
-use crate::devices::virtio::iovec::IoVecBuffer;
+use crate::devices::virtio::iovec::{IoVecBuffer, IoVecBufferMut};
 use crate::devices::virtio::net::metrics::{NetDeviceMetrics, NetMetricsPerDevice};
 use crate::devices::virtio::net::tap::Tap;
 use crate::devices::virtio::net::{
@@ -47,12 +45,8 @@ const FRAME_HEADER_MAX_LEN: usize = PAYLOAD_OFFSET + ETH_IPV4_FRAME_LEN;
 
 #[derive(Debug, thiserror::Error, displaydoc::Display)]
 enum FrontendError {
-    /// Add user.
-    AddUsed,
     /// Descriptor chain too mall.
     DescriptorChainTooSmall,
-    /// Empty queue.
-    EmptyQueue,
     /// Guest memory error: {0}
     GuestMemory(GuestMemoryError),
     /// Read only descriptor.
@@ -103,6 +97,60 @@ pub struct ConfigSpace {
 
 // SAFETY: `ConfigSpace` contains only PODs in `repr(C)` or `repr(transparent)`, without padding.
 unsafe impl ByteValued for ConfigSpace {}
 
+// Ideally, we should put all readiness inside the Readiness struct.
+// But letting the rate limiter maintain its own readiness is simpler.
+#[derive(Debug)]
+struct Readiness(u8);
+
+impl Default for Readiness {
+    fn default() -> Self {
+        Readiness(Self::FULL)
+    }
+}
+
+macro_rules! define_mark_clear {
+    ($mark:ident, $clear:ident, $get:ident, $mask:ident) => {
+        #[allow(unused)]
+        fn $mark(&mut self) {
+            self.0 |= Self::$mask;
+        }
+
+        #[allow(unused)]
+        fn $clear(&mut self) {
+            self.0 &= !Self::$mask;
+        }
+
+        #[allow(unused)]
+        fn $get(&self) -> bool {
+            self.0 & Self::$mask != 0
+        }
+    };
+}
+
+impl Readiness {
+    const TAP_READY: u8 = 1 << 0;
+    const RX_READY: u8 = 1 << 1;
+    const TX_READY: u8 = 1 << 2;
+    const MMDS_READY: u8 = 1 << 3;
+
+    const FULL: u8 = Self::TAP_READY | Self::RX_READY | Self::TX_READY | Self::MMDS_READY;
+    const RX_MMDS_CHECK: u8 = Self::MMDS_READY | Self::RX_READY;
+    const RX_TAP_CHECK: u8 = Self::TAP_READY | Self::RX_READY;
+
+    define_mark_clear!(mark_tap_ready, clear_tap_ready, tap_ready, TAP_READY);
+    define_mark_clear!(mark_rx_ready, clear_rx_ready, rx_ready, RX_READY);
+    define_mark_clear!(mark_tx_ready, clear_tx_ready, tx_ready, TX_READY);
+    define_mark_clear!(mark_mmds_ready, clear_mmds_ready, mmds_ready, MMDS_READY);
+
+    fn rx_mmds_pre_check(&self) -> bool {
+        Self::RX_MMDS_CHECK & self.0 == Self::RX_MMDS_CHECK
+    }
+
+    fn rx_tap_pre_check(&self) -> bool {
+        Self::RX_TAP_CHECK & self.0 == Self::RX_TAP_CHECK
+    }
+}
+
 /// VirtIO network device.
 ///
 /// It emulates a network device able to exchange L2 frames between the guest
@@ -122,12 +170,10 @@ pub struct Net {
     pub(crate) rx_rate_limiter: RateLimiter,
     pub(crate) tx_rate_limiter: RateLimiter,
+    readiness: Readiness,
 
-    pub(crate) rx_deferred_frame: bool,
-
-    rx_bytes_read: usize,
-    rx_frame_buf: [u8; MAX_BUFFER_SIZE],
-
+    rx_mmds_read: Option<NonZeroUsize>,
+    rx_mmds_buf: [u8; MAX_BUFFER_SIZE],
     tx_frame_headers: [u8; frame_hdr_len()],
 
     pub(crate) irq_trigger: IrqTrigger,
@@ -144,9 +190,13 @@ pub struct Net {
     pub(crate) metrics: Arc<NetDeviceMetrics>,
 
     tx_buffer: IoVecBuffer,
+    rx_buffer: IoVecBufferMut,
 }
 
 impl Net {
+    // 19 = 1 vnethdr + 1 ethhdr + 17 MAX_SKB_FRAGS.
+    const COMMON_CHAIN_LEN: usize = 19;
+
     /// Create a new virtio network device with the given TAP interface.
     pub fn new_with_tap(
         id: String,
@@ -190,9 +240,9 @@ impl Net {
             queue_evts,
             rx_rate_limiter,
             tx_rate_limiter,
-            rx_deferred_frame: false,
-            rx_bytes_read: 0,
-            rx_frame_buf: [0u8; MAX_BUFFER_SIZE],
+            readiness: Readiness::default(),
+            rx_mmds_read: None,
+            rx_mmds_buf: [0u8; MAX_BUFFER_SIZE],
             tx_frame_headers: [0u8; frame_hdr_len()],
             irq_trigger: IrqTrigger::new().map_err(NetError::EventFd)?,
             config_space,
@@ -201,7 +251,8 @@ impl Net {
             activate_evt: EventFd::new(libc::EFD_NONBLOCK).map_err(NetError::EventFd)?,
             mmds_ns: None,
             metrics: NetMetricsPerDevice::alloc(id),
-            tx_buffer: Default::default(),
+            tx_buffer: IoVecBuffer::with_capacity(Self::COMMON_CHAIN_LEN),
+            rx_buffer: IoVecBufferMut::with_capacity(Self::COMMON_CHAIN_LEN),
         })
     }
 
@@ -312,25 +363,381 @@ impl Net {
         rate_limiter.manual_replenish(size, TokenType::Bytes);
     }
 
-    // Attempts to copy a single frame into the guest if there is enough
-    // rate limiting budget.
-    // Returns true on successful frame delivery.
-    fn rate_limited_rx_single_frame(&mut self) -> bool {
-        if !Self::rate_limiter_consume_op(&mut self.rx_rate_limiter, self.rx_bytes_read as u64) {
+    pub fn process_rx_rate_limiter_event(&mut self) {
+        self.metrics.rx_event_rate_limiter_count.inc();
+
+        // Upon rate limiter event, call the rate limiter handler
+        // and restart processing the queue.
+        if let Err(err) = self.rx_rate_limiter.event_handler() {
+            error!("Failed to get rx rate-limiter event: {:?}", err);
+            self.metrics.event_fails.inc();
+            return;
+        }
+
+        // There might be enough budget now to receive the frame.
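+        // process_rx() re-checks the rate limiter on the tap path, so calling
+        // it here is safe even if the budget is still exhausted.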
+        self.process_rx()
+            .unwrap_or_else(|err| report_net_event_fail(&self.metrics, err));
+    }
+
+    pub fn process_rx_queue_event(&mut self) {
+        self.metrics.rx_queue_event_count.inc();
+        self.readiness.mark_rx_ready();
+
+        if let Err(err) = self.queue_evts[RX_INDEX].read() {
+            // rate limiters present but with _very high_ allowed rate
+            error!("Failed to get rx queue event: {:?}", err);
+            self.metrics.event_fails.inc();
+            return;
+        }
+
+        if self.rx_rate_limiter.is_blocked() {
             self.metrics.rx_rate_limiter_throttled.inc();
-            return false;
+            return;
+        }
+
+        // If the limiter is not blocked, handle rx.
+        self.process_rx()
+            .unwrap_or_else(|err| report_net_event_fail(&self.metrics, err));
+    }
+
+    pub fn process_tap_rx_event(&mut self) {
+        self.metrics.rx_tap_event_count.inc();
+        self.readiness.mark_tap_ready();
+
+        // While limiter is blocked, don't process any more incoming.
+        if self.rx_rate_limiter.is_blocked() {
+            self.metrics.rx_rate_limiter_throttled.inc();
+            return;
+        }
+
+        self.process_rx()
+            .unwrap_or_else(|err| report_net_event_fail(&self.metrics, err));
+    }
+
+    // Caller should check rate limiter before calling this function.
+    fn process_rx(&mut self) -> Result<(), DeviceError> {
+        let rx_queue = &mut self.queues[RX_INDEX];
+        let mut notify_guest = false;
+        let mut tried_pop = false;
+        let mut popped_any = false;
+
+        // Read from MMDS first.
+        if self.readiness.rx_mmds_pre_check() {
+            if let Some(ns) = self.mmds_ns.as_mut() {
+                // This is safe since we checked in the event handler that the device is activated.
+                let mem = self.device_state.mem().unwrap();
+                'read: loop {
+                    let rx_frame_buf_size = match self.rx_mmds_read {
+                        Some(len) => len.get(),
+                        None => {
+                            // Read the next frame from MMDS into rx_mmds_buf, if we don't
+                            // already have one pending.
+                            // Safe to unwrap because we know rx_mmds_buf is bigger than
+                            // vnet_hdr_len
+                            if let Some(len) = ns.write_next_frame(
+                                frame_bytes_from_buf_mut(&mut self.rx_mmds_buf).unwrap(),
+                            ) {
+                                let len = len.get();
+                                METRICS.mmds.tx_frames.inc();
+                                METRICS.mmds.tx_bytes.add(len as u64);
+                                init_vnet_hdr(&mut self.rx_mmds_buf);
+                                let size = vnet_hdr_len() + len;
+                                self.rx_mmds_read =
+                                    // SAFETY: `size` is at least vnet_hdr_len(), which is 12, so it is non-zero.
+                                    Some(unsafe { NonZeroUsize::new_unchecked(size) });
+                                size
+                            } else {
+                                // When there is no MMDS frame to send, clear MMDS readiness and leave.
+                                self.readiness.clear_mmds_ready();
+                                break;
+                            }
+                        }
+                    };
+
+                    // Do our best to send the MMDS frame to the guest.
+                    let max_iterations = rx_queue.actual_size();
+                    tried_pop = true;
+                    for _ in 0..max_iterations {
+                        // Pop a descriptor chain from the rx queue.
+                        let Some(head) = rx_queue.pop_or_enable_notification(mem) else {
+                            // Queue is empty now, clear readiness and leave.
+                            self.readiness.clear_rx_ready();
+                            break 'read;
+                        };
+
+                        popped_any = true;
+                        // Copy the MMDS frame into the descriptor chain and write it.
+                        let head_index = head.index;
+                        let result = Self::write_to_descriptor_chain(
+                            mem,
+                            &self.rx_mmds_buf[..rx_frame_buf_size],
+                            head,
+                            &self.metrics,
+                        );
+
+                        // Mark the descriptor chain as used. If an error occurred, skip the
+                        // descriptor chain.
+                        let used_len = if result.is_err() {
+                            self.metrics.rx_fails.inc();
+                            0
+                        } else {
+                            self.rx_mmds_read = None;
+                            u32::try_from(rx_frame_buf_size).unwrap()
+                        };
+                        rx_queue
+                            .add_used(mem, head_index, used_len)
+                            .map_err(DeviceError::QueueError)?;
+                        notify_guest = true;
+                        if result.is_ok() {
+                            continue 'read;
+                        }
+                    }
+                    // Give up sending after enough retries.
+                    self.rx_mmds_read = None;
+                }
+            }
+        }
+
+        // Read from tap.
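+        // Unlike the MMDS path above, frames are read with a single readv() per
+        // frame straight into the guest buffers described by the descriptor
+        // chain, with no intermediate copy through a bounce buffer.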
+        if self.readiness.rx_tap_pre_check() {
+            let mem = self.device_state.mem().unwrap();
+            loop {
+                // Check rate limiter.
+                if self.rx_rate_limiter.is_blocked() {
+                    self.metrics.rx_rate_limiter_throttled.inc();
+                    break;
+                }
+
+                // Pop a descriptor chain from the rx queue.
+                tried_pop = true;
+                let Some(head) = rx_queue.pop_or_enable_notification(mem) else {
+                    // Queue is empty now, clear readiness and leave.
+                    self.readiness.clear_rx_ready();
+                    break;
+                };
+                self.rx_rate_limiter.consume(1, TokenType::Ops);
+
+                popped_any = true;
+                // Read from the tap into the descriptor chain directly.
+                // First load the descriptor chain into the IoVecBufferMut.
+                let head_index = head.index;
+
+                macro_rules! push_desc {
+                    ($n:expr) => {
+                        rx_queue
+                            .add_used(mem, head_index, $n)
+                            .map_err(DeviceError::QueueError)?;
+                        notify_guest = true;
+                    };
+                }
+
+                // SAFETY: we only load the descriptor chain into this single buffer.
+                if unsafe {
+                    self.rx_buffer
+                        .load_descriptor_chain(head, Some(u32::try_from(MAX_BUFFER_SIZE).unwrap()))
+                        .is_err()
+                } {
+                    self.metrics.rx_fails.inc();
+                    // It's the desc chain's fault, not the tap's, so skip it.
+                    push_desc!(0);
+                    continue;
+                };
+
+                // Put a one-byte canary at the end of the iovecs to detect partial writes.
+                let prev_len = self.rx_buffer.len();
+                let mut _canary: u8 = 0;
+                // This unwrap is safe because the const MAX_BUFFER_SIZE is small enough.
+                if prev_len < u32::try_from(MAX_BUFFER_SIZE).unwrap() {
+                    // SAFETY: the pointer and length are valid.
+                    // SAFETY: we checked the buffer length above, so the push cannot fail.
+                    unsafe {
+                        self.rx_buffer
+                            .push(libc::iovec {
+                                iov_base: (&mut _canary as *mut u8).cast(),
+                                iov_len: 1,
+                            })
+                            .unwrap_unchecked();
+                    };
+                }
+
+                match Self::read_tap(&mut self.tap, &mut self.rx_buffer) {
+                    Ok(n) => {
+                        // TODO: set vnet_hdr num_buffers to 1.
+                        let mut n = u32::try_from(n).unwrap();
+                        if n > prev_len {
+                            // The canary was overwritten by the tap, meaning the frame
+                            // did not fit into the chain.
+                            self.metrics.rx_partial_writes.inc();
+                            n = prev_len;
+                        } else {
+                            self.metrics.rx_count.inc();
+                        }
+
+                        push_desc!(n);
+                        // Consume from the rate limiter, but let the data through whether
+                        // or not there is budget. This may make the rate limiter slightly
+                        // inaccurate, but it is better than blocking.
+                        self.rx_rate_limiter.consume(u64::from(n), TokenType::Bytes);
+                        self.metrics.rx_bytes_count.add(u64::from(n));
+                        self.metrics.rx_packets_count.inc();
+                    }
+                    Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
+                        // If the tap device is not readable, stop reading.
+                        self.readiness.clear_tap_ready();
+                        rx_queue.undo_pop();
+                        break;
+                    }
+                    Err(err) => {
+                        error!("Failed to read tap: {:?}", err);
+                        self.metrics.tap_read_fails.inc();
+                        // Tap read failed unexpectedly, stop reading.
+                        push_desc!(0);
+                        break;
+                    }
+                }
+            }
+        }
+
+        if notify_guest {
+            self.try_signal_queue(NetQueue::Rx)?;
+        }
+        if tried_pop && !popped_any {
+            self.metrics.no_rx_avail_buffer.inc();
+        }
+        Ok(())
+    }
+
+    pub fn process_tx_rate_limiter_event(&mut self) {
+        self.metrics.tx_rate_limiter_event_count.inc();
+
+        // Upon rate limiter event, call the rate limiter handler
+        // and restart processing the queue.
+        if let Err(err) = self.tx_rate_limiter.event_handler() {
+            error!("Failed to get tx rate-limiter event: {:?}", err);
+            self.metrics.event_fails.inc();
+            return;
+        }
+
+        // There might be enough budget now to send the frame.
+        self.process_tx()
+            .unwrap_or_else(|err| report_net_event_fail(&self.metrics, err));
+    }
+
+    /// Process a single TX queue event.
+    ///
+    /// This is called by the event manager responding to the guest adding a new
+    /// buffer in the TX queue.
+    pub fn process_tx_queue_event(&mut self) {
+        self.metrics.tx_queue_event_count.inc();
+        self.readiness.mark_tx_ready();
+
+        if let Err(err) = self.queue_evts[TX_INDEX].read() {
+            error!("Failed to get tx queue event: {:?}", err);
+            self.metrics.event_fails.inc();
+            return;
         }
 
-        // Attempt frame delivery.
-        let success = self.write_frame_to_guest();
+        if self.tx_rate_limiter.is_blocked() {
+            self.metrics.tx_rate_limiter_throttled.inc();
+            return;
+        }
+
+        // If the limiter is not blocked, continue transmitting bytes.
+        self.process_tx()
+            .unwrap_or_else(|err| report_net_event_fail(&self.metrics, err));
+    }
+
+    fn process_tx(&mut self) -> Result<(), DeviceError> {
+        if !self.readiness.tx_ready() {
+            return Ok(());
+        }
+
+        // This is safe since we checked in the event handler that the device is activated.
+        let mem = self.device_state.mem().unwrap();
+
+        // The MMDS network stack works like a state machine, based on synchronous calls, and
+        // without being added to any event loop. If any frame is accepted by the MMDS, we also
+        // trigger a process_rx() which checks if there are any new frames to be sent, starting
+        // with the MMDS network stack.
+        let mut process_rx_for_mmds = false;
+        let mut used_any = false;
+        let tx_queue = &mut self.queues[TX_INDEX];
+
+        loop {
+            let Some(head) = tx_queue.pop_or_enable_notification(mem) else {
+                // Queue is empty now, clear readiness and leave.
+                self.readiness.clear_tx_ready();
+                break;
+            };
+            self.metrics
+                .tx_remaining_reqs_count
+                .add(tx_queue.len(mem).into());
+            let head_index = head.index;
+
+            macro_rules! push_desc {
+                () => {
+                    tx_queue
+                        .add_used(mem, head_index, 0)
+                        .map_err(DeviceError::QueueError)?;
+                    used_any = true;
+                };
+            }
+
+            // Parse IoVecBuffer from descriptor head
+            // SAFETY: This descriptor chain is only loaded once
+            // virtio requests are handled sequentially so no two IoVecBuffers
+            // are live at the same time, meaning this has exclusive ownership over the memory
+            if unsafe { self.tx_buffer.load_descriptor_chain(head).is_err() } {
+                self.metrics.tx_fails.inc();
+                push_desc!();
+                continue;
+            };
+
+            // We only handle frames that are up to MAX_BUFFER_SIZE
+            if self.tx_buffer.len() as usize > MAX_BUFFER_SIZE {
+                error!("net: received too big frame from driver");
+                self.metrics.tx_malformed_frames.inc();
+                push_desc!();
+                continue;
+            }
+
+            if !Self::rate_limiter_consume_op(
+                &mut self.tx_rate_limiter,
+                u64::from(self.tx_buffer.len()),
+            ) {
+                tx_queue.undo_pop();
+                self.metrics.tx_rate_limiter_throttled.inc();
+                break;
+            }
+
+            let frame_consumed_by_mmds = Self::write_to_mmds_or_tap(
+                self.mmds_ns.as_mut(),
+                &mut self.tx_rate_limiter,
+                &mut self.tx_frame_headers,
+                &self.tx_buffer,
+                &mut self.tap,
+                self.guest_mac,
+                &self.metrics,
+            )
+            .unwrap_or(false);
+            if frame_consumed_by_mmds {
+                // MMDS consumed this frame/request, let's also try to process the response.
+                process_rx_for_mmds = true;
+                self.readiness.mark_mmds_ready();
+            }
+            push_desc!();
+        }
 
-        // Undo the tokens consumption if guest delivery failed.
-        if !success {
-            // revert the rate limiting budget consumption
-            Self::rate_limiter_replenish_op(&mut self.rx_rate_limiter, self.rx_bytes_read as u64);
+        // Cleanup tx_buffer to ensure no two buffers point at the same memory.
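+        // Clearing only drops the cached iovec pointers; it does not touch the
+        // guest memory they referred to.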
+        self.tx_buffer.clear();
+        if used_any {
+            self.try_signal_queue(NetQueue::Tx)?;
+        } else {
+            self.metrics.no_tx_avail_buffer.inc();
         }
 
-        success
+        // A write to MMDS can be seen as an event source, so we need to process rx after it.
+        if process_rx_for_mmds {
+            self.process_rx()?;
+        }
+        Ok(())
     }
 
     /// Write a slice in a descriptor chain
@@ -383,60 +790,6 @@ impl Net {
         Err(FrontendError::DescriptorChainTooSmall)
     }
 
-    // Copies a single frame from `self.rx_frame_buf` into the guest.
-    fn do_write_frame_to_guest(&mut self) -> Result<(), FrontendError> {
-        // This is safe since we checked in the event handler that the device is activated.
-        let mem = self.device_state.mem().unwrap();
-
-        let queue = &mut self.queues[RX_INDEX];
-        let head_descriptor = queue.pop_or_enable_notification(mem).ok_or_else(|| {
-            self.metrics.no_rx_avail_buffer.inc();
-            FrontendError::EmptyQueue
-        })?;
-        let head_index = head_descriptor.index;
-
-        let result = Self::write_to_descriptor_chain(
-            mem,
-            &self.rx_frame_buf[..self.rx_bytes_read],
-            head_descriptor,
-            &self.metrics,
-        );
-        // Mark the descriptor chain as used. If an error occurred, skip the descriptor chain.
-        let used_len = if result.is_err() {
-            self.metrics.rx_fails.inc();
-            0
-        } else {
-            // Safe to unwrap because a frame must be smaller than 2^16 bytes.
-            u32::try_from(self.rx_bytes_read).unwrap()
-        };
-        queue.add_used(mem, head_index, used_len).map_err(|err| {
-            error!("Failed to add available descriptor {}: {}", head_index, err);
-            FrontendError::AddUsed
-        })?;
-
-        result
-    }
-
-    // Copies a single frame from `self.rx_frame_buf` into the guest. In case of an error retries
-    // the operation if possible. Returns true if the operation was successfull.
-    fn write_frame_to_guest(&mut self) -> bool {
-        let max_iterations = self.queues[RX_INDEX].actual_size();
-        for _ in 0..max_iterations {
-            match self.do_write_frame_to_guest() {
-                Ok(()) => return true,
-                Err(FrontendError::EmptyQueue) | Err(FrontendError::AddUsed) => {
-                    return false;
-                }
-                Err(_) => {
-                    // retry
-                    continue;
-                }
-            }
-        }
-
-        false
-    }
-
     // Tries to detour the frame to MMDS and if MMDS doesn't accept it, sends it on the host TAP.
     //
     // Returns whether MMDS consumed the frame.
@@ -511,162 +864,6 @@ impl Net {
         Ok(false)
     }
 
-    // We currently prioritize packets from the MMDS over regular network packets.
-    fn read_from_mmds_or_tap(&mut self) -> Result<usize, NetError> {
-        if let Some(ns) = self.mmds_ns.as_mut() {
-            if let Some(len) =
-                ns.write_next_frame(frame_bytes_from_buf_mut(&mut self.rx_frame_buf)?)
-            {
-                let len = len.get();
-                METRICS.mmds.tx_frames.inc();
-                METRICS.mmds.tx_bytes.add(len as u64);
-                init_vnet_hdr(&mut self.rx_frame_buf);
-                return Ok(vnet_hdr_len() + len);
-            }
-        }
-
-        self.read_tap().map_err(NetError::IO)
-    }
-
-    fn process_rx(&mut self) -> Result<(), DeviceError> {
-        // Read as many frames as possible.
-        loop {
-            match self.read_from_mmds_or_tap() {
-                Ok(count) => {
-                    self.rx_bytes_read = count;
-                    self.metrics.rx_count.inc();
-                    if !self.rate_limited_rx_single_frame() {
-                        self.rx_deferred_frame = true;
-                        break;
-                    }
-                }
-                Err(NetError::IO(err)) => {
-                    // The tap device is non-blocking, so any error aside from EAGAIN is
-                    // unexpected.
-                    match err.raw_os_error() {
-                        Some(err) if err == EAGAIN => (),
-                        _ => {
-                            error!("Failed to read tap: {:?}", err);
-                            self.metrics.tap_read_fails.inc();
-                            return Err(DeviceError::FailedReadTap);
-                        }
-                    };
-                    break;
-                }
-                Err(err) => {
-                    error!("Spurious error in network RX: {:?}", err);
-                }
-            }
-        }
-
-        self.try_signal_queue(NetQueue::Rx)
-    }
-
-    // Process the deferred frame first, then continue reading from tap.
-    fn handle_deferred_frame(&mut self) -> Result<(), DeviceError> {
-        if self.rate_limited_rx_single_frame() {
-            self.rx_deferred_frame = false;
-            // process_rx() was interrupted possibly before consuming all
-            // packets in the tap; try continuing now.
-            return self.process_rx();
-        }
-
-        self.try_signal_queue(NetQueue::Rx)
-    }
-
-    fn resume_rx(&mut self) -> Result<(), DeviceError> {
-        if self.rx_deferred_frame {
-            self.handle_deferred_frame()
-        } else {
-            Ok(())
-        }
-    }
-
-    fn process_tx(&mut self) -> Result<(), DeviceError> {
-        // This is safe since we checked in the event handler that the device is activated.
-        let mem = self.device_state.mem().unwrap();
-
-        // The MMDS network stack works like a state machine, based on synchronous calls, and
-        // without being added to any event loop. If any frame is accepted by the MMDS, we also
-        // trigger a process_rx() which checks if there are any new frames to be sent, starting
-        // with the MMDS network stack.
-        let mut process_rx_for_mmds = false;
-        let mut used_any = false;
-        let tx_queue = &mut self.queues[TX_INDEX];
-
-        while let Some(head) = tx_queue.pop_or_enable_notification(mem) {
-            self.metrics
-                .tx_remaining_reqs_count
-                .add(tx_queue.len(mem).into());
-            let head_index = head.index;
-            // Parse IoVecBuffer from descriptor head
-            // SAFETY: This descriptor chain is only loaded once
-            // virtio requests are handled sequentially so no two IoVecBuffers
-            // are live at the same time, meaning this has exclusive ownership over the memory
-            if unsafe { self.tx_buffer.load_descriptor_chain(head).is_err() } {
-                self.metrics.tx_fails.inc();
-                tx_queue
-                    .add_used(mem, head_index, 0)
-                    .map_err(DeviceError::QueueError)?;
-                continue;
-            };
-
-            // We only handle frames that are up to MAX_BUFFER_SIZE
-            if self.tx_buffer.len() as usize > MAX_BUFFER_SIZE {
-                error!("net: received too big frame from driver");
-                self.metrics.tx_malformed_frames.inc();
-                tx_queue
-                    .add_used(mem, head_index, 0)
-                    .map_err(DeviceError::QueueError)?;
-                continue;
-            }
-
-            if !Self::rate_limiter_consume_op(
-                &mut self.tx_rate_limiter,
-                u64::from(self.tx_buffer.len()),
-            ) {
-                tx_queue.undo_pop();
-                self.metrics.tx_rate_limiter_throttled.inc();
-                break;
-            }
-
-            let frame_consumed_by_mmds = Self::write_to_mmds_or_tap(
-                self.mmds_ns.as_mut(),
-                &mut self.tx_rate_limiter,
-                &mut self.tx_frame_headers,
-                &self.tx_buffer,
-                &mut self.tap,
-                self.guest_mac,
-                &self.metrics,
-            )
-            .unwrap_or(false);
-            if frame_consumed_by_mmds && !self.rx_deferred_frame {
-                // MMDS consumed this frame/request, let's also try to process the response.
-                process_rx_for_mmds = true;
-            }
-
-            tx_queue
-                .add_used(mem, head_index, 0)
-                .map_err(DeviceError::QueueError)?;
-            used_any = true;
-        }
-
-        if !used_any {
-            self.metrics.no_tx_avail_buffer.inc();
-        }
-
-        // Cleanup tx_buffer to ensure no two buffers point at the same memory
-        self.tx_buffer.clear();
-        self.try_signal_queue(NetQueue::Tx)?;
-
-        // An incoming frame for the MMDS may trigger the transmission of a new message.
-        if process_rx_for_mmds {
-            self.process_rx()
-        } else {
-            Ok(())
-        }
-    }
-
     /// Builds the offload features we will setup on the TAP device based on the features that the
     /// guest supports.
     fn build_tap_offload_features(guest_supported_features: u64) -> u32 {
@@ -719,125 +916,19 @@ impl Net {
         self.tx_rate_limiter.update_buckets(tx_bytes, tx_ops);
     }
 
-    #[cfg(not(test))]
-    fn read_tap(&mut self) -> std::io::Result<usize> {
-        self.tap.read(&mut self.rx_frame_buf)
-    }
-
     #[cfg(not(test))]
     fn write_tap(tap: &mut Tap, buf: &IoVecBuffer) -> std::io::Result<usize> {
         tap.write_iovec(buf)
     }
 
-    /// Process a single RX queue event.
-    ///
-    /// This is called by the event manager responding to the guest adding a new
-    /// buffer in the RX queue.
-    pub fn process_rx_queue_event(&mut self) {
-        self.metrics.rx_queue_event_count.inc();
-
-        if let Err(err) = self.queue_evts[RX_INDEX].read() {
-            // rate limiters present but with _very high_ allowed rate
-            error!("Failed to get rx queue event: {:?}", err);
-            self.metrics.event_fails.inc();
-        } else if self.rx_rate_limiter.is_blocked() {
-            self.metrics.rx_rate_limiter_throttled.inc();
-        } else {
-            // If the limiter is not blocked, resume the receiving of bytes.
-            self.resume_rx()
-                .unwrap_or_else(|err| report_net_event_fail(&self.metrics, err));
-        }
-    }
-
-    pub fn process_tap_rx_event(&mut self) {
-        // This is safe since we checked in the event handler that the device is activated.
-        let mem = self.device_state.mem().unwrap();
-        self.metrics.rx_tap_event_count.inc();
-
-        // While there are no available RX queue buffers and there's a deferred_frame
-        // don't process any more incoming. Otherwise start processing a frame. In the
-        // process the deferred_frame flag will be set in order to avoid freezing the
-        // RX queue.
-        if self.queues[RX_INDEX].is_empty(mem) && self.rx_deferred_frame {
-            self.metrics.no_rx_avail_buffer.inc();
-            return;
-        }
-
-        // While limiter is blocked, don't process any more incoming.
-        if self.rx_rate_limiter.is_blocked() {
-            self.metrics.rx_rate_limiter_throttled.inc();
-            return;
-        }
-
-        if self.rx_deferred_frame
-        // Process a deferred frame first if available. Don't read from tap again
-        // until we manage to receive this deferred frame.
-        {
-            self.handle_deferred_frame()
-                .unwrap_or_else(|err| report_net_event_fail(&self.metrics, err));
-        } else {
-            self.process_rx()
-                .unwrap_or_else(|err| report_net_event_fail(&self.metrics, err));
-        }
-    }
-
-    /// Process a single TX queue event.
-    ///
-    /// This is called by the event manager responding to the guest adding a new
-    /// buffer in the TX queue.
-    pub fn process_tx_queue_event(&mut self) {
-        self.metrics.tx_queue_event_count.inc();
-        if let Err(err) = self.queue_evts[TX_INDEX].read() {
-            error!("Failed to get tx queue event: {:?}", err);
-            self.metrics.event_fails.inc();
-        } else if !self.tx_rate_limiter.is_blocked()
-        // If the limiter is not blocked, continue transmitting bytes.
-        {
-            self.process_tx()
-                .unwrap_or_else(|err| report_net_event_fail(&self.metrics, err));
-        } else {
-            self.metrics.tx_rate_limiter_throttled.inc();
-        }
-    }
-
-    pub fn process_rx_rate_limiter_event(&mut self) {
-        self.metrics.rx_event_rate_limiter_count.inc();
-        // Upon rate limiter event, call the rate limiter handler
-        // and restart processing the queue.
-
-        match self.rx_rate_limiter.event_handler() {
-            Ok(_) => {
-                // There might be enough budget now to receive the frame.
-                self.resume_rx()
-                    .unwrap_or_else(|err| report_net_event_fail(&self.metrics, err));
-            }
-            Err(err) => {
-                error!("Failed to get rx rate-limiter event: {:?}", err);
-                self.metrics.event_fails.inc();
-            }
-        }
-    }
-
-    pub fn process_tx_rate_limiter_event(&mut self) {
-        self.metrics.tx_rate_limiter_event_count.inc();
-        // Upon rate limiter event, call the rate limiter handler
-        // and restart processing the queue.
-        match self.tx_rate_limiter.event_handler() {
-            Ok(_) => {
-                // There might be enough budget now to send the frame.
-                self.process_tx()
-                    .unwrap_or_else(|err| report_net_event_fail(&self.metrics, err));
-            }
-            Err(err) => {
-                error!("Failed to get tx rate-limiter event: {:?}", err);
-                self.metrics.event_fails.inc();
-            }
-        }
+    #[cfg(not(test))]
+    fn read_tap(tap: &mut Tap, buf: &mut IoVecBufferMut) -> std::io::Result<usize> {
+        tap.read_iovec(buf)
     }
 
     /// Process device virtio queue(s).
     pub fn process_virtio_queues(&mut self) {
-        let _ = self.resume_rx();
+        let _ = self.process_rx();
         let _ = self.process_tx();
     }
 }
@@ -931,7 +1022,6 @@ impl VirtioDevice for Net {
 #[cfg(test)]
 #[macro_use]
 pub mod tests {
-    use std::io::Read;
     use std::net::Ipv4Addr;
     use std::str::FromStr;
     use std::time::Duration;
@@ -961,17 +1051,17 @@ pub mod tests {
     use crate::vstate::memory::{Address, GuestMemory};
 
     impl Net {
-        pub(crate) fn read_tap(&mut self) -> io::Result<usize> {
-            match &self.tap.mocks.read_tap {
+        pub(crate) fn read_tap(tap: &mut Tap, buf: &mut IoVecBufferMut) -> io::Result<usize> {
+            match &tap.mocks.read_tap {
                 ReadTapMock::MockFrame(frame) => {
-                    self.rx_frame_buf[..frame.len()].copy_from_slice(frame);
+                    buf.write_all_volatile_at(frame, 0).unwrap();
                     Ok(frame.len())
                 }
                 ReadTapMock::Failure => Err(io::Error::new(
                     io::ErrorKind::Other,
                     "Read tap synthetically failed.",
                 )),
-                ReadTapMock::TapFrame => self.tap.read(&mut self.rx_frame_buf),
+                ReadTapMock::TapFrame => tap.read_iovec(buf),
             }
         }
 
@@ -1170,9 +1260,8 @@ pub mod tests {
                 (2, 1000, VIRTQ_DESC_F_WRITE),
             ],
         );
-        let frame = th.check_rx_deferred_frame(1000);
+        let frame = th.check_rx_frame(1000, 0);
         th.rxq.check_used_elem(0, 0, 0);
-
         th.check_rx_queue_resume(&frame);
     }
 
@@ -1182,10 +1271,10 @@ pub mod tests {
         th.activate_net();
 
         th.add_desc_chain(NetQueue::Rx, 0, &[(0, 100, VIRTQ_DESC_F_WRITE)]);
-        let frame = th.check_rx_deferred_frame(1000);
-        th.rxq.check_used_elem(0, 0, 0);
-
-        th.check_rx_queue_resume(&frame);
+        let frame = th.check_rx_frame(1000, 1);
+        // TAP may truncate the frame to the descriptor size.
+        th.rxq.check_used_elem(0, 0, 100);
+        th.rxq.dtable[0].check_data(&frame[..100]);
    }
 
     #[test]
@@ -1205,7 +1294,7 @@ pub mod tests {
                 (2, 4096, VIRTQ_DESC_F_WRITE),
             ],
         );
-        let frame = th.check_rx_deferred_frame(1000);
+        let frame = th.check_rx_frame(1000, 0);
         th.rxq.check_used_elem(0, 0, 0);
 
         th.check_rx_queue_resume(&frame);
@@ -1227,7 +1316,7 @@ pub mod tests {
                 (2, 1000, VIRTQ_DESC_F_WRITE),
             ],
         );
-        // Add invalid descriptor chain - too short.
+        // Add invalid descriptor chain - too short (but it will still be used).
         th.add_desc_chain(NetQueue::Rx, 1200, &[(3, 100, VIRTQ_DESC_F_WRITE)]);
         // Add invalid descriptor chain - invalid memory offset.
         th.add_desc_chain(
@@ -1240,10 +1329,11 @@ pub mod tests {
         th.add_desc_chain(NetQueue::Rx, 1300, &[(5, 1000, VIRTQ_DESC_F_WRITE)]);
 
         // Inject frame to tap and run epoll.
-        let frame = inject_tap_tx_frame(&th.net(), 1000);
+        let frame1 = inject_tap_tx_frame(&th.net(), 1000);
+        let frame2 = inject_tap_tx_frame(&th.net(), 1000);
         check_metric_after_block!(
             th.net().metrics.rx_packets_count,
-            1,
+            2,
             th.event_manager.run_with_timeout(100).unwrap()
         );
 
@@ -1252,14 +1342,13 @@ pub mod tests {
         assert!(&th.net().irq_trigger.has_pending_irq(IrqType::Vring));
         // Check that the invalid descriptor chains have been discarded
         th.rxq.check_used_elem(0, 0, 0);
-        th.rxq.check_used_elem(1, 3, 0);
+        th.rxq.check_used_elem(1, 3, 100);
+        th.rxq.dtable[3].check_data(&frame1[..100]);
         th.rxq.check_used_elem(2, 4, 0);
-        // Check that the frame wasn't deferred.
-        assert!(!th.net().rx_deferred_frame);
         // Check that the frame has been written successfully to the valid Rx descriptor chain.
         th.rxq
-            .check_used_elem(3, 5, frame.len().try_into().unwrap());
-        th.rxq.dtable[5].check_data(&frame);
+            .check_used_elem(3, 5, frame2.len().try_into().unwrap());
+        th.rxq.dtable[5].check_data(&frame2);
     }
 
     #[test]
@@ -1288,8 +1377,6 @@ pub mod tests {
             th.event_manager.run_with_timeout(100).unwrap()
         );
 
-        // Check that the frame wasn't deferred.
-        assert!(!th.net().rx_deferred_frame);
         // Check that the used queue has advanced.
         assert_eq!(th.rxq.used.idx.get(), 1);
         assert!(&th.net().irq_trigger.has_pending_irq(IrqType::Vring));
@@ -1328,8 +1415,6 @@ pub mod tests {
             th.event_manager.run_with_timeout(100).unwrap()
        );
 
-        // Check that the frames weren't deferred.
-        assert!(!th.net().rx_deferred_frame);
         // Check that the used queue has advanced.
         assert_eq!(th.rxq.used.idx.get(), 2);
         assert!(&th.net().irq_trigger.has_pending_irq(IrqType::Vring));
@@ -1645,13 +1730,6 @@ pub mod tests {
             )
             .unwrap())
         );
-
-        // Validate that MMDS has a response and we can retrieve it.
-        check_metric_after_block!(
-            &METRICS.mmds.tx_frames,
-            1,
-            net.read_from_mmds_or_tap().unwrap()
-        );
     }
 
     #[test]
@@ -1730,21 +1808,16 @@ pub mod tests {
         th.activate_net();
         th.net().tap.mocks.set_read_tap(ReadTapMock::Failure);
 
-        // The RX queue is empty and rx_deffered_frame is set.
-        th.net().rx_deferred_frame = true;
         check_metric_after_block!(
             th.net().metrics.no_rx_avail_buffer,
             1,
             th.simulate_event(NetEvent::Tap)
         );
 
-        // We need to set this here to false, otherwise the device will try to
-        // handle a deferred frame, it will fail and will never try to read from
-        // the tap.
-        th.net().rx_deferred_frame = false;
-
-        // Fake an avail buffer; this time, tap reading should error out.
-        th.rxq.avail.idx.set(1);
+        th.add_desc_chain(NetQueue::Rx, 0, &[(0, 4096, VIRTQ_DESC_F_WRITE)]);
+        // Mark rx ready, otherwise the device would not read from the tap.
+        th.net().readiness.mark_rx_ready();
         check_metric_after_block!(
             th.net().metrics.tap_read_fails,
             1,
             th.simulate_event(NetEvent::Tap)
         );
     }
 
-    #[test]
-    fn test_deferred_frame() {
-        let mut th = TestHelper::get_default();
-        th.activate_net();
-        th.net().tap.mocks.set_read_tap(ReadTapMock::TapFrame);
-
-        let rx_packets_count = th.net().metrics.rx_packets_count.count();
-        let _ = inject_tap_tx_frame(&th.net(), 1000);
-        // Trigger a Tap event that. This should fail since there
-        // are not any available descriptors in the queue
-        check_metric_after_block!(
-            th.net().metrics.no_rx_avail_buffer,
-            1,
-            th.simulate_event(NetEvent::Tap)
-        );
-        // The frame we read from the tap should be deferred now and
-        // no frames should have been transmitted
-        assert!(th.net().rx_deferred_frame);
-        assert_eq!(th.net().metrics.rx_packets_count.count(), rx_packets_count);
-
-        // Let's add a second frame, which should really have the same
-        // fate.
-        let _ = inject_tap_tx_frame(&th.net(), 1000);
-
-        // Adding a descriptor in the queue. This should handle the first deferred
-        // frame. However, this should try to handle the second tap as well and fail
-        // since there's only one Descriptor Chain in the queue.
-        th.add_desc_chain(NetQueue::Rx, 0, &[(0, 4096, VIRTQ_DESC_F_WRITE)]);
-        check_metric_after_block!(
-            th.net().metrics.no_rx_avail_buffer,
-            1,
-            th.simulate_event(NetEvent::Tap)
-        );
-        // We should still have a deferred frame
-        assert!(th.net().rx_deferred_frame);
-        // However, we should have delivered the first frame
-        assert_eq!(
-            th.net().metrics.rx_packets_count.count(),
-            rx_packets_count + 1
-        );
-
-        // Let's add one more descriptor and try to handle the last frame as well.
-        th.add_desc_chain(NetQueue::Rx, 0, &[(0, 4096, VIRTQ_DESC_F_WRITE)]);
-        check_metric_after_block!(
-            th.net().metrics.rx_packets_count,
-            1,
-            th.simulate_event(NetEvent::RxQueue)
-        );
-
-        // We should be done with any deferred frame
-        assert!(!th.net().rx_deferred_frame);
-    }
-
     #[test]
     fn test_rx_rate_limiter_handling() {
         let mut th = TestHelper::get_default();
@@ -1919,10 +1939,9 @@ pub mod tests {
         th.net().rx_rate_limiter = rl;
 
         // set up RX
-        assert!(!th.net().rx_deferred_frame);
         th.add_desc_chain(NetQueue::Rx, 0, &[(0, 4096, VIRTQ_DESC_F_WRITE)]);
 
-        // following RX procedure should fail because of bandwidth rate limiting
+        // following RX procedure should pass because the rate limiter is not blocked yet
         {
             // trigger the RX handler
             th.simulate_event(NetEvent::Tap);
 
             // assert that limiter is blocked
             assert!(th.net().rx_rate_limiter.is_blocked());
             assert_eq!(th.net().metrics.rx_rate_limiter_throttled.count(), 1);
-            assert!(th.net().rx_deferred_frame);
-            // assert that no operation actually completed (limiter blocked it)
+            // assert that the operation actually completed
             assert!(&th.net().irq_trigger.has_pending_irq(IrqType::Vring));
-            // make sure the data is still queued for processing
-            assert_eq!(th.rxq.used.idx.get(), 0);
+            // make sure the frame was consumed
+            assert_eq!(th.rxq.used.idx.get(), 1);
+        }
+
+        th.add_desc_chain(NetQueue::Rx, 0, &[(0, 4096, VIRTQ_DESC_F_WRITE)]);
+        // following RX procedure should fail because the rate limiter is now blocked
+        {
+            // trigger the RX handler
+            th.simulate_event(NetEvent::Tap);
+
+            // assert that limiter is blocked
+            assert!(th.net().rx_rate_limiter.is_blocked());
+            assert_eq!(th.net().metrics.rx_rate_limiter_throttled.count(), 2);
+            // assert that the operation did not complete
+            assert!(!&th.net().irq_trigger.has_pending_irq(IrqType::Vring));
+            // make sure the data is still queued for processing
+            assert_eq!(th.rxq.used.idx.get(), 1);
         }
 
         // An RX queue event should be throttled too
         {
             // trigger the RX queue event handler
             th.simulate_event(NetEvent::RxQueue);
 
-            assert_eq!(th.net().metrics.rx_rate_limiter_throttled.count(), 2);
+            assert_eq!(th.net().metrics.rx_rate_limiter_throttled.count(), 3);
         }
 
         // wait for 100ms to give the rate-limiter timer a chance to replenish
@@ -1963,7 +1996,7 @@ pub mod tests {
         // make sure the virtio queue operation completed this time
         assert!(&th.net().irq_trigger.has_pending_irq(IrqType::Vring));
         // make sure the data queue advanced
-        assert_eq!(th.rxq.used.idx.get(), 1);
+        assert_eq!(th.rxq.used.idx.get(), 2);
         th.rxq
             .check_used_elem(0, 0, frame.len().try_into().unwrap());
         th.rxq.dtable[0].check_data(frame);
@@ -2033,7 +2066,7 @@ pub mod tests {
         th.net().rx_rate_limiter = rl;
 
         // set up RX
-        assert!(!th.net().rx_deferred_frame);
+        th.add_desc_chain(NetQueue::Rx, 0, &[(0, 4096, VIRTQ_DESC_F_WRITE)]);
         th.add_desc_chain(NetQueue::Rx, 0, &[(0, 4096, VIRTQ_DESC_F_WRITE)]);
 
         // following RX procedure should fail because of ops rate limiting
@@ -2048,18 +2081,17 @@ pub mod tests {
             // assert that limiter is blocked
             assert!(th.net().rx_rate_limiter.is_blocked());
             assert!(th.net().metrics.rx_rate_limiter_throttled.count() >= 1);
-            assert!(th.net().rx_deferred_frame);
             // assert that no operation actually completed (limiter blocked it)
             assert!(&th.net().irq_trigger.has_pending_irq(IrqType::Vring));
-            // make sure the data is still queued for processing
-            assert_eq!(th.rxq.used.idx.get(), 0);
+            // the data will be processed, but only once
+            assert_eq!(th.rxq.used.idx.get(), 1);
 
             // trigger the RX handler again, this time it should do the limiter fast path exit
             th.simulate_event(NetEvent::Tap);
             // assert that no operation actually completed, that the limiter blocked it
             assert!(!&th.net().irq_trigger.has_pending_irq(IrqType::Vring));
             // make sure the data is still queued for processing
-            assert_eq!(th.rxq.used.idx.get(), 0);
+            assert_eq!(th.rxq.used.idx.get(), 1);
         }
 
         // wait for 100ms to give the rate-limiter timer a chance to replenish
@@ -2073,7 +2105,7 @@ pub mod tests {
         // make sure the virtio queue operation completed this time
         assert!(&th.net().irq_trigger.has_pending_irq(IrqType::Vring));
         // make sure the data queue advanced
-        assert_eq!(th.rxq.used.idx.get(), 1);
+        assert_eq!(th.rxq.used.idx.get(), 2);
         th.rxq
             .check_used_elem(0, 0, frame.len().try_into().unwrap());
         th.rxq.dtable[0].check_data(frame);
diff --git a/src/vmm/src/devices/virtio/net/tap.rs b/src/vmm/src/devices/virtio/net/tap.rs
index 9da0cbf5785..fb939252ce0 100644
--- a/src/vmm/src/devices/virtio/net/tap.rs
+++ b/src/vmm/src/devices/virtio/net/tap.rs
@@ -14,7 +14,7 @@ use std::os::unix::io::{AsRawFd, FromRawFd, RawFd};
 use utils::ioctl::{ioctl_with_mut_ref, ioctl_with_ref, ioctl_with_val};
 use utils::{ioctl_ioc_nr, ioctl_iow_nr};
 
-use crate::devices::virtio::iovec::IoVecBuffer;
+use crate::devices::virtio::iovec::{IoVecBuffer, IoVecBufferMut};
 use crate::devices::virtio::net::gen;
 #[cfg(test)]
 use crate::devices::virtio::net::test_utils::Mocks;
@@ -185,8 +185,22 @@ impl Tap {
         Ok(())
     }
 
+    /// Read an `IoVecBufferMut` from tap
+    pub(crate) fn read_iovec(&self, buffer: &mut IoVecBufferMut) -> Result<usize, IoError> {
+        let iovcnt = i32::try_from(buffer.iovec_count()).unwrap();
+        let iov = buffer.as_iovec_ptr();
+
+        // SAFETY: `readv` is safe. Called with a valid tap fd; the iovec pointer and length
+        // are provided by the `IoVecBufferMut` implementation and we check the return value.
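+        // readv() fills the iovecs in order and returns the total number of
+        // bytes read, so one syscall can scatter a frame across the whole
+        // descriptor chain.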
+        let ret = unsafe { libc::readv(self.tap_file.as_raw_fd(), iov, iovcnt) };
+        if ret == -1 {
+            return Err(IoError::last_os_error());
+        }
+        Ok(usize::try_from(ret).unwrap())
+    }
+
     /// Write an `IoVecBuffer` to tap
-    pub(crate) fn write_iovec(&mut self, buffer: &IoVecBuffer) -> Result<usize, IoError> {
+    pub(crate) fn write_iovec(&self, buffer: &IoVecBuffer) -> Result<usize, IoError> {
         let iovcnt = i32::try_from(buffer.iovec_count()).unwrap();
         let iov = buffer.as_iovec_ptr();
 
@@ -323,6 +337,28 @@ pub mod tests {
         );
     }
 
+    #[test]
+    fn test_read_iovec() {
+        let tap = Tap::open_named("").unwrap();
+        enable(&tap);
+        let tap_traffic_simulator = TapTrafficSimulator::new(if_index(&tap));
+
+        let packet = utils::rand::rand_alphanumerics(PAYLOAD_SIZE);
+        tap_traffic_simulator.push_tx_packet(packet.as_bytes());
+
+        let mut fragment1 = [0_u8; VNET_HDR_SIZE];
+        let mut fragment2 = [0_u8; PAYLOAD_SIZE];
+
+        let mut scattered =
+            IoVecBufferMut::from(vec![fragment1.as_mut_slice(), fragment2.as_mut_slice()]);
+
+        assert_eq!(
+            tap.read_iovec(&mut scattered).unwrap(),
+            PAYLOAD_SIZE + VNET_HDR_SIZE
+        );
+        assert_eq!(fragment2, packet.as_bytes());
+    }
+
     #[test]
     fn test_write() {
         let mut tap = Tap::open_named("").unwrap();
@@ -345,7 +381,7 @@ pub mod tests {
 
     #[test]
     fn test_write_iovec() {
-        let mut tap = Tap::open_named("").unwrap();
+        let tap = Tap::open_named("").unwrap();
         enable(&tap);
         let tap_traffic_simulator = TapTrafficSimulator::new(if_index(&tap));
 
diff --git a/src/vmm/src/devices/virtio/net/test_utils.rs b/src/vmm/src/devices/virtio/net/test_utils.rs
index 216db273859..72f527b0ac2 100644
--- a/src/vmm/src/devices/virtio/net/test_utils.rs
+++ b/src/vmm/src/devices/virtio/net/test_utils.rs
@@ -270,6 +270,13 @@ pub fn if_index(tap: &Tap) -> i32 {
     unsafe { ifreq.ifr_ifru.ifru_ivalue }
 }
 
+pub fn drain_tap(tap: &Tap) {
+    let mut buf = [0u8; 1024];
+    // SAFETY: The call is safe since the parameters are valid.
+    while unsafe { libc::read(tap.as_raw_fd(), buf.as_mut_ptr().cast(), buf.len()) != -1 } {}
+}
+
 /// Enable the tap interface.
 pub fn enable(tap: &Tap) {
     // Disable IPv6 router advertisment requests
@@ -294,6 +301,8 @@ pub fn enable(tap: &Tap) {
     )
     .execute(&sock, c_ulong::from(super::gen::sockios::SIOCSIFFLAGS))
     .unwrap();
+    // Drain the initial packets that might be in the tap interface.
+    drain_tap(tap);
 }
 
 #[cfg(test)]
@@ -485,8 +494,8 @@ pub mod test {
         event_fd.write(1).unwrap();
     }
 
-    /// Generate a tap frame of `frame_len` and check that it is deferred
-    pub fn check_rx_deferred_frame(&mut self, frame_len: usize) -> Vec<u8> {
+    /// Generate a tap frame of `frame_len` and check that it is handled or discarded.
+    pub fn check_rx_frame(&mut self, frame_len: usize, rx_packets_count: u64) -> Vec<u8> {
         self.net().tap.mocks.set_read_tap(ReadTapMock::TapFrame);
 
         let used_idx = self.rxq.used.idx.get();
@@ -494,12 +503,10 @@ pub mod test {
         let frame = inject_tap_tx_frame(&self.net(), frame_len);
         check_metric_after_block!(
             self.net().metrics.rx_packets_count,
-            0,
+            rx_packets_count,
             self.event_manager.run_with_timeout(100).unwrap()
         );
-        // Check that the frame has been deferred.
-        assert!(self.net().rx_deferred_frame);
-        // Check that the descriptor chain has been discarded.
+        // Check that the descriptor chain has been used or discarded.
         assert_eq!(self.rxq.used.idx.get(), used_idx + 1);
         assert!(&self.net().irq_trigger.has_pending_irq(IrqType::Vring));
 
diff --git a/src/vmm/src/devices/virtio/queue.rs b/src/vmm/src/devices/virtio/queue.rs
index 0fd6882d201..faf62bb947a 100644
--- a/src/vmm/src/devices/virtio/queue.rs
+++ b/src/vmm/src/devices/virtio/queue.rs
@@ -114,7 +114,7 @@ impl<'a, M: GuestMemory> DescriptorChain<'a, M> {
         let desc_head = desc_table.unchecked_add(u64::from(index) * 16);
 
         // These reads can't fail unless Guest memory is hopelessly broken.
-        let desc = match mem.read_obj::<Descriptor>(desc_head) {
+        let desc = match mem.load_obj::<Descriptor>(desc_head) {
             Ok(ret) => ret,
             Err(err) => {
                 error!(
@@ -176,6 +176,41 @@ impl<'a, M: GuestMemory> DescriptorChain<'a, M> {
             None
         }
     }
+
+    /// Load the next descriptor in this descriptor chain.
+    /// If none is available, return None.
+    pub fn load_next_descriptor(&mut self) -> Option<()> {
+        if !self.has_next() {
+            return None;
+        }
+        if self.next >= self.queue_size {
+            return None;
+        }
+
+        let desc_head = self.desc_table.unchecked_add(u64::from(self.next) * 16);
+        let desc = match self.mem.load_obj::<Descriptor>(desc_head) {
+            Ok(ret) => ret,
+            Err(err) => {
+                error!(
+                    "Failed to read virtio descriptor from memory at address {:#x}: {}",
+                    desc_head.0, err
+                );
+                return None;
+            }
+        };
+        self.index = self.next;
+        self.addr = GuestAddress(desc.addr);
+        self.len = desc.len;
+        self.flags = desc.flags;
+        self.next = desc.next;
+        self.ttl -= 1;
+
+        if !self.is_valid() {
+            return None;
+        }
+
+        Some(())
+    }
 }
 
 #[derive(Debug)]
@@ -427,7 +462,7 @@ impl Queue {
         // and virtq rings, so it's safe to unwrap guest memory reads and to use unchecked
         // offsets.
         let desc_index: u16 = mem
-            .read_obj(self.avail_ring.unchecked_add(u64::from(index_offset)))
+            .load_obj(self.avail_ring.unchecked_add(u64::from(index_offset)))
             .unwrap();
 
         DescriptorChain::checked_new(mem, self.desc_table, self.actual_size(), desc_index).map(
@@ -511,7 +546,7 @@ impl Queue {
         // guest after device activation, so we can be certain that no change has
         // occurred since the last `self.is_valid()` check.
         let addr = self.avail_ring.unchecked_add(2);
-        Wrapping(mem.read_obj::<u16>(addr).unwrap())
+        Wrapping(mem.load_obj::<u16>(addr).unwrap())
     }
 
     /// Get the value of the used event field of the avail ring.
@@ -524,7 +559,7 @@ impl Queue {
             .avail_ring
             .unchecked_add(u64::from(4 + 2 * self.actual_size()));
 
-        Wrapping(mem.read_obj::<u16>(used_event_addr).unwrap())
+        Wrapping(mem.load_obj::<u16>(used_event_addr).unwrap())
     }
 
     /// Helper method that writes to the `avail_event` field of the used ring.
@@ -643,6 +678,28 @@ impl Queue {
     }
 }
 
+trait MemBytesExt: GuestMemory {
+    /// Load an object `T` from GPA.
+    ///
+    /// Usually used for very small items.
+    #[inline(always)]
+    fn load_obj<T: ByteValued>(
+        &self,
+        addr: GuestAddress,
+    ) -> Result<T, <Self as Bytes<GuestAddress>>::E> {
+        if let Ok(s) = self.get_slice(addr, std::mem::size_of::<T>()) {
+            let ptr = s.ptr_guard().as_ptr().cast::<T>();
+            if ptr.is_aligned() {
+                // SAFETY: We just checked that the slice has the size of `T`, `T` is
+                // `ByteValued`, and the pointer is aligned.
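+                // A volatile read keeps the compiler from eliding or splitting
+                // the access to memory the guest may modify concurrently.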
+                return Ok(unsafe { ptr.read_volatile() });
+            }
+        }
+        self.read_obj::<T>(addr)
+    }
+}
+impl<T: GuestMemory> MemBytesExt for T {}
+
 #[cfg(kani)]
 #[allow(dead_code)]
 mod verification {
@@ -1161,10 +1218,25 @@ mod tests {
                 .used_ring
                 .unchecked_add(u64::from(4 + 8 * self.actual_size()));
 
-            mem.read_obj::<u16>(avail_event_addr).unwrap()
+            mem.load_obj::<u16>(avail_event_addr).unwrap()
         }
     }
 
+    #[test]
+    fn test_load_obj() {
+        let m = &multi_region_mem(&[(GuestAddress(0), 0x10000), (GuestAddress(0x20000), 0x2000)]);
+        // normal write and read
+        m.write_obj::<u32>(0xdeadbeef, GuestAddress(0)).unwrap();
+        assert_eq!(m.load_obj::<u32>(GuestAddress(0)).unwrap(), 0xdeadbeef);
+        // unaligned read
+        m.write_obj::<u32>(0xcafebabe, GuestAddress(1)).unwrap();
+        assert_eq!(m.load_obj::<u32>(GuestAddress(1)).unwrap(), 0xcafebabe);
+        // read across regions
+        m.write_obj::<u32>(0xdeadbeef, GuestAddress(0x1fff))
+            .unwrap();
+        assert_eq!(m.load_obj::<u32>(GuestAddress(0x1fff)).unwrap(), 0xdeadbeef);
+    }
+
     #[test]
     fn test_checked_new_descriptor_chain() {
         let m = &multi_region_mem(&[(GuestAddress(0), 0x10000), (GuestAddress(0x20000), 0x2000)]);
diff --git a/src/vmm/src/devices/virtio/rng/device.rs b/src/vmm/src/devices/virtio/rng/device.rs
index bb01ce5e44e..da39a07b6d3 100644
--- a/src/vmm/src/devices/virtio/rng/device.rs
+++ b/src/vmm/src/devices/virtio/rng/device.rs
@@ -132,7 +132,8 @@ impl Entropy {
         let index = desc.index;
         METRICS.entropy_event_count.inc();
 
-        let bytes = match IoVecBufferMut::from_descriptor_chain(desc) {
+        // SAFETY: This descriptor chain is only loaded into one buffer.
+        let bytes = match unsafe { IoVecBufferMut::from_descriptor_chain(desc) } {
             Ok(mut iovec) => {
                 debug!(
                     "entropy: guest request for {} bytes of entropy",
@@ -428,13 +429,15 @@ mod tests {
         // This should succeed, we just added two descriptors
         let desc = entropy_dev.queues_mut()[RNG_QUEUE].pop(&mem).unwrap();
         assert!(matches!(
-            IoVecBufferMut::from_descriptor_chain(desc),
+            // SAFETY: This descriptor chain is only loaded into one buffer.
+            unsafe { IoVecBufferMut::from_descriptor_chain(desc) },
             Err(crate::devices::virtio::iovec::IoVecError::ReadOnlyDescriptor)
         ));
 
         // This should succeed, we should have one more descriptor
         let desc = entropy_dev.queues_mut()[RNG_QUEUE].pop(&mem).unwrap();
-        let mut iovec = IoVecBufferMut::from_descriptor_chain(desc).unwrap();
+        // SAFETY: This descriptor chain is only loaded into one buffer.
+        let mut iovec = unsafe { IoVecBufferMut::from_descriptor_chain(desc).unwrap() };
         entropy_dev.handle_one(&mut iovec).unwrap();
     }
 
diff --git a/src/vmm/src/devices/virtio/vsock/packet.rs b/src/vmm/src/devices/virtio/vsock/packet.rs
index 952f8b1511e..ba8c4ab2e36 100644
--- a/src/vmm/src/devices/virtio/vsock/packet.rs
+++ b/src/vmm/src/devices/virtio/vsock/packet.rs
@@ -161,7 +161,8 @@ impl VsockPacket {
     /// Returns [`VsockError::DescChainTooShortForHeader`] if the descriptor chain's total buffer
     /// length is insufficient to hold the 44 byte vsock header
     pub fn from_rx_virtq_head(chain: DescriptorChain) -> Result<Self, VsockError> {
-        let buffer = IoVecBufferMut::from_descriptor_chain(chain)?;
+        // SAFETY: This descriptor chain is only loaded into one buffer.
+        let buffer = unsafe { IoVecBufferMut::from_descriptor_chain(chain)? };
 
         if buffer.len() < VSOCK_PKT_HDR_SIZE {
             return Err(VsockError::DescChainTooShortForHeader(buffer.len() as usize));
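For readers less familiar with the scatter-read trick that the new `process_rx` relies on, here is a minimal, self-contained sketch of the same canary technique against a plain pipe instead of a tap device. It assumes only the `libc` crate; all names (`seg1`, `capacity`, etc.) are illustrative and not part of the patch:

```rust
use std::io::Error;

fn main() -> Result<(), Error> {
    // A pipe stands in for the non-blocking tap fd.
    let mut fds = [0i32; 2];
    // SAFETY: `fds` is a valid array of two file descriptors.
    if unsafe { libc::pipe(fds.as_mut_ptr()) } == -1 {
        return Err(Error::last_os_error());
    }
    let (rx, tx) = (fds[0], fds[1]);

    // A 10-byte "frame" arrives on the device.
    let frame = b"0123456789";
    // SAFETY: `tx` is a valid fd and `frame` is a live buffer.
    unsafe { libc::write(tx, frame.as_ptr().cast(), frame.len()) };

    // The "descriptor chain": two 4-byte buffers (8 bytes of real capacity),
    // plus a one-byte canary appended at the end.
    let mut seg1 = [0u8; 4];
    let mut seg2 = [0u8; 4];
    let mut canary = 0u8;
    let iovs = [
        libc::iovec {
            iov_base: seg1.as_mut_ptr().cast(),
            iov_len: seg1.len(),
        },
        libc::iovec {
            iov_base: seg2.as_mut_ptr().cast(),
            iov_len: seg2.len(),
        },
        // If this byte gets filled, the frame did not fit in the chain.
        libc::iovec {
            iov_base: (&mut canary as *mut u8).cast(),
            iov_len: 1,
        },
    ];
    let capacity = seg1.len() + seg2.len();

    // SAFETY: `rx` is a valid fd and every iovec points at a live buffer.
    let n = unsafe { libc::readv(rx, iovs.as_ptr(), iovs.len() as i32) };
    if n == -1 {
        return Err(Error::last_os_error());
    }

    // readv() fills the iovecs in order, so the canary is only reached when
    // the frame is larger than the chain's real capacity.
    let truncated = usize::try_from(n).unwrap() > capacity;
    println!("read {n} bytes, truncated: {truncated}"); // read 9 bytes, truncated: true
    Ok(())
}
```

This mirrors what the device code does: `prev_len` plays the role of `capacity` here, and a `readv` return value beyond it means the frame was truncated, which is when `rx_partial_writes` is bumped.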