diff --git a/src/vmm/src/devices/virtio/net/tap.rs b/src/vmm/src/devices/virtio/net/tap.rs index c516705af31..a18460e1159 100644 --- a/src/vmm/src/devices/virtio/net/tap.rs +++ b/src/vmm/src/devices/virtio/net/tap.rs @@ -241,10 +241,17 @@ pub mod tests { generated::ifreq__bindgen_ty_1::default().ifrn_name.len() }); - // Empty name - The tap should be named "tap0" by default + // Empty name - The tap should be named by the kernel (e.g., "tap0", "tap1", etc.) let tap = Tap::open_named("").unwrap(); - assert_eq!(b"tap0\0\0\0\0\0\0\0\0\0\0\0\0", &tap.if_name); - assert_eq!("tap0", tap.if_name_as_str()); + let tap_name_str = tap.if_name_as_str(); + + // Check that it starts with "tap" and the remainder is numeric. + assert!( + Regex::new(r"^tap\d+$").unwrap().is_match(tap_name_str), + "Generated tap name '{}' does not match expected pattern", + tap_name_str + ); + // Test using '%d' to have the kernel assign an unused name, // and that we correctly copy back that generated name diff --git a/src/vmm/src/devices/virtio/rng/device.rs b/src/vmm/src/devices/virtio/rng/device.rs index 97ac8676e0a..08ef31567ec 100644 --- a/src/vmm/src/devices/virtio/rng/device.rs +++ b/src/vmm/src/devices/virtio/rng/device.rs @@ -9,7 +9,7 @@ use aws_lc_rs::rand; use vm_memory::GuestMemoryError; use vmm_sys_util::eventfd::EventFd; -use super::metrics::METRICS; +use super::metrics::EntropyMetricsPerDevice; use super::{RNG_NUM_QUEUES, RNG_QUEUE}; use crate::devices::DeviceError; use crate::devices::virtio::device::{DeviceState, IrqTrigger, IrqType, VirtioDevice}; @@ -113,6 +113,7 @@ impl Entropy { } fn handle_one(&mut self) -> Result { + let global = EntropyMetricsPerDevice::alloc("global".to_string()); // If guest provided us with an empty buffer just return directly if self.buffer.is_empty() { return Ok(0); @@ -120,7 +121,7 @@ impl Entropy { let mut rand_bytes = vec![0; self.buffer.len() as usize]; rand::fill(&mut rand_bytes).inspect_err(|_| { - METRICS.host_rng_fails.inc(); + 
global.host_rng_fails.inc(); })?; // It is ok to unwrap here. We are writing `iovec.len()` bytes at offset 0. @@ -129,12 +130,13 @@ impl Entropy { } fn process_entropy_queue(&mut self) { + let global = EntropyMetricsPerDevice::alloc("global".to_string()); let mut used_any = false; while let Some(desc) = self.queues[RNG_QUEUE].pop() { // This is safe since we checked in the event handler that the device is activated. let mem = self.device_state.mem().unwrap(); let index = desc.index; - METRICS.entropy_event_count.inc(); + global.entropy_event_count.inc(); // SAFETY: This descriptor chain points to a single `DescriptorChain` memory buffer, // no other `IoVecBufferMut` object points to the same `DescriptorChain` at the same @@ -151,20 +153,20 @@ impl Entropy { // to handle once we do have budget. if !self.rate_limit_request(u64::from(self.buffer.len())) { debug!("entropy: throttling entropy queue"); - METRICS.entropy_rate_limiter_throttled.inc(); + global.entropy_rate_limiter_throttled.inc(); self.queues[RNG_QUEUE].undo_pop(); break; } self.handle_one().unwrap_or_else(|err| { error!("entropy: {err}"); - METRICS.entropy_event_fails.inc(); + global.entropy_event_fails.inc(); 0 }) } Err(err) => { error!("entropy: Could not parse descriptor chain: {err}"); - METRICS.entropy_event_fails.inc(); + global.entropy_event_fails.inc(); 0 } }; @@ -172,12 +174,12 @@ impl Entropy { match self.queues[RNG_QUEUE].add_used(index, bytes) { Ok(_) => { used_any = true; - METRICS.entropy_bytes.add(bytes.into()); + global.entropy_bytes.add(bytes.into()); } Err(err) => { error!("entropy: Could not add used descriptor to queue: {err}"); Self::rate_limit_replenish_request(&mut self.rate_limiter, bytes.into()); - METRICS.entropy_event_fails.inc(); + global.entropy_event_fails.inc(); // If we are not able to add a buffer to the used queue, something // is probably seriously wrong, so just stop processing additional // buffers @@ -189,25 +191,26 @@ impl Entropy { if used_any { 
self.signal_used_queue().unwrap_or_else(|err| { error!("entropy: {err:?}"); - METRICS.entropy_event_fails.inc() + global.entropy_event_fails.inc() }); } } pub(crate) fn process_entropy_queue_event(&mut self) { + let global = EntropyMetricsPerDevice::alloc("global".to_string()); if let Err(err) = self.queue_events[RNG_QUEUE].read() { error!("Failed to read entropy queue event: {err}"); - METRICS.entropy_event_fails.inc(); + global.entropy_event_fails.inc(); } else if !self.rate_limiter.is_blocked() { // We are not throttled, handle the entropy queue self.process_entropy_queue(); } else { - METRICS.rate_limiter_event_count.inc(); + global.rate_limiter_event_count.inc(); } } pub(crate) fn process_rate_limiter_event(&mut self) { - METRICS.rate_limiter_event_count.inc(); + EntropyMetricsPerDevice::alloc("global".to_string()).rate_limiter_event_count.inc(); match self.rate_limiter.event_handler() { Ok(_) => { // There might be enough budget now to process entropy requests. @@ -215,7 +218,7 @@ impl Entropy { } Err(err) => { error!("entropy: Failed to handle rate-limiter event: {err:?}"); - METRICS.entropy_event_fails.inc(); + EntropyMetricsPerDevice::alloc("global".to_string()).entropy_event_fails.inc(); } } } @@ -291,13 +294,15 @@ impl VirtioDevice for Entropy { } fn activate(&mut self, mem: GuestMemoryMmap) -> Result<(), ActivateError> { + let global = EntropyMetricsPerDevice::alloc("global".to_string()); + for q in self.queues.iter_mut() { q.initialize(&mem) .map_err(ActivateError::QueueMemoryError)?; } self.activate_event.write(1).map_err(|_| { - METRICS.activate_fails.inc(); + global.activate_fails.inc(); ActivateError::EventFd })?; self.device_state = DeviceState::Activated(mem); @@ -454,6 +459,7 @@ mod tests { #[test] fn test_entropy_event() { + let global = EntropyMetricsPerDevice::alloc("global".to_string()); let mem = create_virtio_mem(); let mut th = VirtioTestHelper::::new(&mem, default_entropy()); @@ -462,29 +468,29 @@ mod tests { // Add a read-only descriptor (this should fail) th.add_desc_chain(RNG_QUEUE, 0, &[(0, 64, 0)]); - let
entropy_event_fails = METRICS.entropy_event_fails.count(); - let entropy_event_count = METRICS.entropy_event_count.count(); - let entropy_bytes = METRICS.entropy_bytes.count(); - let host_rng_fails = METRICS.host_rng_fails.count(); + let entropy_event_fails = global.entropy_event_fails.count(); + let entropy_event_count = global.entropy_event_count.count(); + let entropy_bytes = global.entropy_bytes.count(); + let host_rng_fails = global.host_rng_fails.count(); assert_eq!(th.emulate_for_msec(100).unwrap(), 1); - assert_eq!(METRICS.entropy_event_fails.count(), entropy_event_fails + 1); - assert_eq!(METRICS.entropy_event_count.count(), entropy_event_count + 1); - assert_eq!(METRICS.entropy_bytes.count(), entropy_bytes); - assert_eq!(METRICS.host_rng_fails.count(), host_rng_fails); + assert_eq!(global.entropy_event_fails.count(), entropy_event_fails + 1); + assert_eq!(global.entropy_event_count.count(), entropy_event_count + 1); + assert_eq!(global.entropy_bytes.count(), entropy_bytes); + assert_eq!(global.host_rng_fails.count(), host_rng_fails); // Add two good descriptors th.add_desc_chain(RNG_QUEUE, 0, &[(1, 10, VIRTQ_DESC_F_WRITE)]); th.add_desc_chain(RNG_QUEUE, 100, &[(2, 20, VIRTQ_DESC_F_WRITE)]); - let entropy_event_fails = METRICS.entropy_event_fails.count(); - let entropy_event_count = METRICS.entropy_event_count.count(); - let entropy_bytes = METRICS.entropy_bytes.count(); - let host_rng_fails = METRICS.host_rng_fails.count(); + let entropy_event_fails = global.entropy_event_fails.count(); + let entropy_event_count = global.entropy_event_count.count(); + let entropy_bytes = global.entropy_bytes.count(); + let host_rng_fails = global.host_rng_fails.count(); assert_eq!(th.emulate_for_msec(100).unwrap(), 1); - assert_eq!(METRICS.entropy_event_fails.count(), entropy_event_fails); - assert_eq!(METRICS.entropy_event_count.count(), entropy_event_count + 2); - assert_eq!(METRICS.entropy_bytes.count(), entropy_bytes + 30); - assert_eq!(METRICS.host_rng_fails.count(), 
host_rng_fails); + assert_eq!(global.entropy_event_fails.count(), entropy_event_fails); + assert_eq!(global.entropy_event_count.count(), entropy_event_count + 2); + assert_eq!(global.entropy_bytes.count(), entropy_bytes + 30); + assert_eq!(global.host_rng_fails.count(), host_rng_fails); th.add_desc_chain( RNG_QUEUE, @@ -496,19 +502,20 @@ mod tests { ], ); - let entropy_event_fails = METRICS.entropy_event_fails.count(); - let entropy_event_count = METRICS.entropy_event_count.count(); - let entropy_bytes = METRICS.entropy_bytes.count(); - let host_rng_fails = METRICS.host_rng_fails.count(); + let entropy_event_fails = global.entropy_event_fails.count(); + let entropy_event_count = global.entropy_event_count.count(); + let entropy_bytes = global.entropy_bytes.count(); + let host_rng_fails = global.host_rng_fails.count(); assert_eq!(th.emulate_for_msec(100).unwrap(), 1); - assert_eq!(METRICS.entropy_event_fails.count(), entropy_event_fails); - assert_eq!(METRICS.entropy_event_count.count(), entropy_event_count + 1); - assert_eq!(METRICS.entropy_bytes.count(), entropy_bytes + 512); - assert_eq!(METRICS.host_rng_fails.count(), host_rng_fails); + assert_eq!(global.entropy_event_fails.count(), entropy_event_fails); + assert_eq!(global.entropy_event_count.count(), entropy_event_count + 1); + assert_eq!(global.entropy_bytes.count(), entropy_bytes + 512); + assert_eq!(global.host_rng_fails.count(), host_rng_fails); } #[test] fn test_bad_rate_limiter_event() { + let global = EntropyMetricsPerDevice::alloc("global".to_string()); let mem = create_virtio_mem(); let mut th = VirtioTestHelper::::new(&mem, default_entropy()); @@ -516,7 +523,7 @@ mod tests { let mut dev = th.device(); check_metric_after_block!( - &METRICS.entropy_event_fails, + &global.entropy_event_fails, 1, dev.process_rate_limiter_event() ); @@ -524,6 +531,7 @@ mod tests { #[test] fn test_bandwidth_rate_limiter() { + let global = EntropyMetricsPerDevice::alloc("global".to_string()); let mem = create_virtio_mem(); 
// Rate Limiter with 4000 bytes / sec allowance and no initial burst allowance let device = Entropy::new(RateLimiter::new(4000, 0, 1000, 0, 0, 0).unwrap()).unwrap(); @@ -535,7 +543,7 @@ mod tests { // buffer should be processed normally th.add_desc_chain(RNG_QUEUE, 0, &[(0, 4000, VIRTQ_DESC_F_WRITE)]); check_metric_after_block!( - METRICS.entropy_bytes, + global.entropy_bytes, 4000, th.device().process_entropy_queue() ); @@ -551,12 +559,12 @@ mod tests { th.add_desc_chain(RNG_QUEUE, 0, &[(0, 4000, VIRTQ_DESC_F_WRITE)]); th.add_desc_chain(RNG_QUEUE, 1, &[(1, 1000, VIRTQ_DESC_F_WRITE)]); check_metric_after_block!( - METRICS.entropy_bytes, + global.entropy_bytes, 4000, th.device().process_entropy_queue() ); check_metric_after_block!( - METRICS.entropy_rate_limiter_throttled, + global.entropy_rate_limiter_throttled, 1, th.device().process_entropy_queue() ); @@ -565,12 +573,13 @@ mod tests { // 250 msec should give enough time for replenishing 1000 bytes worth of tokens. // Give it an extra 100 ms just to be sure the timer event reaches us from the kernel. std::thread::sleep(Duration::from_millis(350)); - check_metric_after_block!(METRICS.entropy_bytes, 1000, th.emulate_for_msec(100)); + check_metric_after_block!(global.entropy_bytes, 1000, th.emulate_for_msec(100)); assert!(!th.device().rate_limiter().is_blocked()); } #[test] fn test_ops_rate_limiter() { + let global = EntropyMetricsPerDevice::alloc("global".to_string()); let mem = create_virtio_mem(); // Rate Limiter with unlimited bandwidth and allowance for 1 operation every 100 msec, // (10 ops/sec), without initial burst. @@ -583,7 +592,7 @@ mod tests { // so this should succeed. 
th.add_desc_chain(RNG_QUEUE, 0, &[(0, 4000, VIRTQ_DESC_F_WRITE)]); check_metric_after_block!( - METRICS.entropy_bytes, + global.entropy_bytes, 4000, th.device().process_entropy_queue() ); @@ -593,30 +602,30 @@ mod tests { std::thread::sleep(Duration::from_millis(1000)); // First one should succeed - let entropy_bytes = METRICS.entropy_bytes.count(); + let entropy_bytes = global.entropy_bytes.count(); th.add_desc_chain(RNG_QUEUE, 0, &[(0, 64, VIRTQ_DESC_F_WRITE)]); - check_metric_after_block!(METRICS.entropy_bytes, 64, th.emulate_for_msec(100)); - assert_eq!(METRICS.entropy_bytes.count(), entropy_bytes + 64); + check_metric_after_block!(global.entropy_bytes, 64, th.emulate_for_msec(100)); + assert_eq!(global.entropy_bytes.count(), entropy_bytes + 64); // The rate limiter is not blocked yet. assert!(!th.device().rate_limiter().is_blocked()); // But immediately asking another operation should block it because we have 1 op every 100 // msec. th.add_desc_chain(RNG_QUEUE, 0, &[(0, 64, VIRTQ_DESC_F_WRITE)]); check_metric_after_block!( - METRICS.entropy_rate_limiter_throttled, + global.entropy_rate_limiter_throttled, 1, th.emulate_for_msec(50) ); // Entropy bytes count should not have increased. 
- assert_eq!(METRICS.entropy_bytes.count(), entropy_bytes + 64); + assert_eq!(global.entropy_bytes.count(), entropy_bytes + 64); // After 100 msec (plus 50 msec for ensuring the event reaches us from the kernel), the // timer of the rate limiter should fire saying that there's now more tokens available check_metric_after_block!( - METRICS.rate_limiter_event_count, + global.rate_limiter_event_count, 1, th.emulate_for_msec(150) ); // The rate limiter event should have processed the pending buffer as well - assert_eq!(METRICS.entropy_bytes.count(), entropy_bytes + 128); + assert_eq!(global.entropy_bytes.count(), entropy_bytes + 128); } } diff --git a/src/vmm/src/devices/virtio/rng/metrics.rs b/src/vmm/src/devices/virtio/rng/metrics.rs index e02f5ce8cc4..d0af144fb6e 100644 --- a/src/vmm/src/devices/virtio/rng/metrics.rs +++ b/src/vmm/src/devices/virtio/rng/metrics.rs @@ -38,16 +38,63 @@ use serde::{Serialize, Serializer}; use crate::logger::SharedIncMetric; -/// Stores aggregated entropy metrics -pub(super) static METRICS: EntropyDeviceMetrics = EntropyDeviceMetrics::new(); +use std::sync::{Arc, RwLock}; +use std::collections::BTreeMap; -/// Called by METRICS.flush(), this function facilitates serialization of entropy device metrics. +/// This function facilitates aggregation and serialization of +/// per device vsock metrics. (Can also handle singular) pub fn flush_metrics(serializer: S) -> Result { - let mut seq = serializer.serialize_map(Some(1))?; - seq.serialize_entry("entropy", &METRICS)?; + let entropy_metrics = METRICS.read().unwrap(); + let metrics_len = entropy_metrics.metrics.len(); + // +1 to accomodate aggregate net metrics + let mut seq = serializer.serialize_map(Some(1 + metrics_len))?; + + let mut entropy_aggregated: EntropyDeviceMetrics = EntropyDeviceMetrics::default(); + + for (name, metrics) in entropy_metrics.metrics.iter() { + let devn = format!("entropy_{}", name); + // serialization will flush the metrics so aggregate before it. 
+ let m: &EntropyDeviceMetrics = metrics; + entropy_aggregated.aggregate(m); + seq.serialize_entry(&devn, m)?; + } + seq.serialize_entry("entropy", &entropy_aggregated)?; seq.end() } +/// Registry of entropy device metrics, keyed by device id. +pub struct EntropyMetricsPerDevice { + /// Metrics of every device registered so far. + pub metrics: BTreeMap<String, Arc<EntropyDeviceMetrics>>, +} + +impl EntropyMetricsPerDevice { + /// Return the `EntropyDeviceMetrics` of the entropy device + /// having id `iface_id`. A default entry is allocated only if + /// none exists yet, so previously recorded data is never + /// overwritten. Panics if the `METRICS` lock was poisoned by + /// a thread that panicked while holding it. + pub fn alloc(iface_id: String) -> Arc<EntropyDeviceMetrics> { + Arc::clone( + METRICS + .write() + .unwrap() + .metrics + .entry(iface_id) + .or_insert_with(|| Arc::new(EntropyDeviceMetrics::default())), + ) + } +} + +/// Per-device entropy metrics, aggregated by `flush_metrics`. +/// +/// The map starts empty because a `static` initializer must be +/// const-evaluable and `String`/`Arc` allocations are not; every +/// entry, including "global", is created on first `alloc`. +static METRICS: RwLock<EntropyMetricsPerDevice> = RwLock::new(EntropyMetricsPerDevice { + metrics: BTreeMap::new(), +}); + #[derive(Debug, Serialize)] pub(super) struct EntropyDeviceMetrics { /// Number of device activation failures @@ -86,15 +133,166 @@ pub mod tests { use crate::logger::IncMetric; #[test] - fn test_entropy_dev_metrics() { - let entropy_metrics: EntropyDeviceMetrics = EntropyDeviceMetrics::new(); - let entropy_metrics_local: String = serde_json::to_string(&entropy_metrics).unwrap(); - // the 1st serialize flushes the metrics and resets values to 0 so that - // we can compare the values with local metrics.
- serde_json::to_string(&METRICS).unwrap(); - let entropy_metrics_global: String = serde_json::to_string(&METRICS).unwrap(); - assert_eq!(entropy_metrics_local, entropy_metrics_global); - entropy_metrics.entropy_event_count.inc(); - assert_eq!(entropy_metrics.entropy_event_count.count(), 1); + fn test_rng_dev_metrics() { + drop(METRICS.read().unwrap()); + drop(METRICS.write().unwrap()); + + for i in 0..5 { + let devn: String = format!("entropy{}", i); + EntropyMetricsPerDevice::alloc(devn.clone()); + METRICS + .read() + .unwrap() + .metrics + .get(&devn) + .unwrap() + .activate_fails + .inc(); + METRICS + .read() + .unwrap() + .metrics + .get(&devn) + .unwrap() + .entropy_bytes + .add(10); + METRICS + .read() + .unwrap() + .metrics + .get(&devn) + .unwrap() + .host_rng_fails + .add(5); + } + + for i in 0..5 { + let devn: String = format!("entropy{}", i); + assert!( + METRICS + .read() + .unwrap() + .metrics + .get(&devn) + .unwrap() + .activate_fails + .count() + >= 1 + ); + assert!( + METRICS + .read() + .unwrap() + .metrics + .get(&devn) + .unwrap() + .entropy_bytes + .count() + >= 10 + ); + assert_eq!( + METRICS + .read() + .unwrap() + .metrics + .get(&devn) + .unwrap() + .host_rng_fails + .count(), + 5 + ); + } + } + + #[test] + fn test_single_rng_metrics() { + // Use entropy0 so that we can check thread safety with + // `test_rng_dev_metrics`, which also uses the same name.
+ let devn = "entropy0"; + + drop(METRICS.read().unwrap()); + drop(METRICS.write().unwrap()); + + EntropyMetricsPerDevice::alloc(String::from(devn)); + METRICS.read().unwrap().metrics.get(devn).unwrap(); + + METRICS + .read() + .unwrap() + .metrics + .get(devn) + .unwrap() + .activate_fails + .inc(); + assert!( + METRICS + .read() + .unwrap() + .metrics + .get(devn) + .unwrap() + .activate_fails + .count() + > 0, + "{}", + METRICS + .read() + .unwrap() + .metrics + .get(devn) + .unwrap() + .activate_fails + .count() + ); + // we expect only 2 tests (this and test_rng_dev_metrics) + // to update activate_fails count for entropy0. + assert!( + METRICS + .read() + .unwrap() + .metrics + .get(devn) + .unwrap() + .activate_fails + .count() + <= 2, + "{}", + METRICS + .read() + .unwrap() + .metrics + .get(devn) + .unwrap() + .activate_fails + .count() + ); + + METRICS + .read() + .unwrap() + .metrics + .get(devn) + .unwrap() + .activate_fails + .inc(); + METRICS + .read() + .unwrap() + .metrics + .get(devn) + .unwrap() + .entropy_bytes + .add(5); + assert!( + METRICS + .read() + .unwrap() + .metrics + .get(devn) + .unwrap() + .entropy_bytes + .count() + >= 5 + ); } } diff --git a/src/vmm/src/devices/virtio/vsock/csm/connection.rs b/src/vmm/src/devices/virtio/vsock/csm/connection.rs index c9bd5b2c0f7..b8efd8e04f3 100644 --- a/src/vmm/src/devices/virtio/vsock/csm/connection.rs +++ b/src/vmm/src/devices/virtio/vsock/csm/connection.rs @@ -95,6 +95,7 @@ use crate::devices::virtio::vsock::metrics::METRICS; use crate::devices::virtio::vsock::packet::{VsockPacketHeader, VsockPacketRx, VsockPacketTx}; use crate::logger::IncMetric; use crate::utils::wrap_usize_to_u32; +use crate::vmm_config::vsock::VsockSocketType; /// Trait that vsock connection backends need to implement. /// @@ -139,6 +140,9 @@ pub struct VsockConnection { /// Instant when this connection should be scheduled for immediate termination, due to some /// timeout condition having been fulfilled.
expiry: Option, + /// Manages type of connection to determine whether to use S backend or buffer + socket_type: VsockSocketType, + seqpacket_buf: Option>, } impl VsockChannel for VsockConnection @@ -509,6 +513,7 @@ where local_port: u32, peer_port: u32, peer_buf_alloc: u32, + socket_type: VsockSocketType, ) -> Self { Self { local_cid, @@ -525,6 +530,12 @@ where last_fwd_cnt_to_peer: Wrapping(0), pending_rx: PendingRxSet::from(PendingRx::Response), expiry: None, + // `seqpacket_buf` is initialized before `socket_type` on purpose: the + // comparison only borrows `socket_type`, whereas the shorthand field + // below moves it, so the opposite order is a use-after-move unless + // the type is `Copy`. The buffer exists only for seqpacket sockets. + seqpacket_buf: (socket_type == VsockSocketType::SeqPacket).then(Vec::new), + socket_type, } } @@ -535,6 +546,7 @@ where peer_cid: u64, local_port: u32, peer_port: u32, + socket_type: VsockSocketType, ) -> Self { Self { local_cid, @@ -551,6 +563,12 @@ where last_fwd_cnt_to_peer: Wrapping(0), pending_rx: PendingRxSet::from(PendingRx::Request), expiry: None, + // `seqpacket_buf` is initialized before `socket_type` on purpose: the + // comparison only borrows `socket_type`, whereas the shorthand field + // below moves it, so the opposite order is a use-after-move unless + // the type is `Copy`. The buffer exists only for seqpacket sockets. + seqpacket_buf: (socket_type == VsockSocketType::SeqPacket).then(Vec::new), + socket_type, } } @@ -874,6 +892,7 @@ mod tests { handler_ctx.device.queues[TXQ_INDEX].pop().unwrap(), ) .unwrap(); + let socket_type = VsockSocketType::Stream; let conn = match conn_state { ConnState::PeerInit => VsockConnection::::new_peer_init( stream, @@ -882,9 +901,10 @@ mod tests { LOCAL_PORT, PEER_PORT, PEER_BUF_ALLOC, + socket_type, ), ConnState::LocalInit => VsockConnection::::new_local_init( - stream, LOCAL_CID, PEER_CID, LOCAL_PORT, PEER_PORT, + stream, LOCAL_CID, PEER_CID, LOCAL_PORT, PEER_PORT, socket_type, ), ConnState::Established => { let mut conn = VsockConnection::::new_peer_init( @@ -894,6 +914,7 @@ mod tests { LOCAL_PORT, PEER_PORT, PEER_BUF_ALLOC, + socket_type, ); assert!(conn.has_pending_rx()); conn.recv_pkt(&mut rx_pkt).unwrap(); diff --git a/src/vmm/src/devices/virtio/vsock/device.rs b/src/vmm/src/devices/virtio/vsock/device.rs index aa114f6cccb..3dfd7cc3236 100644 --- a/src/vmm/src/devices/virtio/vsock/device.rs +++ b/src/vmm/src/devices/virtio/vsock/device.rs @@ -34,7 +34,7 @@ use
crate::devices::virtio::device::{DeviceState, IrqTrigger, IrqType, VirtioDev use crate::devices::virtio::generated::virtio_config::{VIRTIO_F_IN_ORDER, VIRTIO_F_VERSION_1}; use crate::devices::virtio::queue::Queue as VirtQueue; use crate::devices::virtio::vsock::VsockError; -use crate::devices::virtio::vsock::metrics::METRICS; +use crate::devices::virtio::vsock::metrics::VsockMetricsPerDevice; use crate::logger::IncMetric; use crate::utils::byte_order; use crate::vstate::memory::{Bytes, GuestMemoryMmap}; @@ -241,11 +241,12 @@ where // connections and the guest_cid configuration field is fetched again. Existing listen sockets // remain but their CID is updated to reflect the current guest_cid. pub fn send_transport_reset_event(&mut self) -> Result<(), DeviceError> { + let global = VsockMetricsPerDevice::alloc("global".to_string()); // This is safe since we checked in the caller function that the device is activated. let mem = self.device_state.mem().unwrap(); let head = self.queues[EVQ_INDEX].pop().ok_or_else(|| { - METRICS.ev_queue_event_fails.inc(); + global.ev_queue_event_fails.inc(); DeviceError::VsockError(VsockError::EmptyQueue) })?; @@ -301,6 +302,7 @@ where } fn read_config(&self, offset: u64, data: &mut [u8]) { + let global = VsockMetricsPerDevice::alloc("global".to_string()); match offset { 0 if data.len() == 8 => byte_order::write_le_u64(data, self.cid()), 0 if data.len() == 4 => { @@ -310,7 +312,7 @@ where byte_order::write_le_u32(data, ((self.cid() >> 32) & 0xffff_ffff) as u32) } _ => { - METRICS.cfg_fails.inc(); + global.cfg_fails.inc(); warn!( "vsock: virtio-vsock received invalid read request of {} bytes at offset {}", data.len(), @@ -321,7 +323,8 @@ where } fn write_config(&mut self, offset: u64, data: &[u8]) { - METRICS.cfg_fails.inc(); + let global = VsockMetricsPerDevice::alloc("global".to_string()); + global.cfg_fails.inc(); warn!( "vsock: guest driver attempted to write device config (offset={:#x}, len={:#x})", offset, @@ -330,13 +333,14 @@ 
where } fn activate(&mut self, mem: GuestMemoryMmap) -> Result<(), ActivateError> { + let global = VsockMetricsPerDevice::alloc("global".to_string()); for q in self.queues.iter_mut() { q.initialize(&mem) .map_err(ActivateError::QueueMemoryError)?; } if self.queues.len() != defs::VSOCK_NUM_QUEUES { - METRICS.activate_fails.inc(); + global.activate_fails.inc(); return Err(ActivateError::QueueMismatch { expected: defs::VSOCK_NUM_QUEUES, got: self.queues.len(), @@ -344,7 +348,7 @@ where } if self.activate_evt.write(1).is_err() { - METRICS.activate_fails.inc(); + global.activate_fails.inc(); return Err(ActivateError::EventFd); } diff --git a/src/vmm/src/devices/virtio/vsock/event_handler.rs b/src/vmm/src/devices/virtio/vsock/event_handler.rs index 632148546e5..7ec0e3aa1c3 100755 --- a/src/vmm/src/devices/virtio/vsock/event_handler.rs +++ b/src/vmm/src/devices/virtio/vsock/event_handler.rs @@ -33,7 +33,7 @@ use vmm_sys_util::epoll::EventSet; use super::VsockBackend; use super::device::{EVQ_INDEX, RXQ_INDEX, TXQ_INDEX, Vsock}; use crate::devices::virtio::device::VirtioDevice; -use crate::devices::virtio::vsock::metrics::METRICS; +use crate::devices::virtio::vsock::metrics::VsockMetricsPerDevice; use crate::logger::IncMetric; impl Vsock @@ -47,37 +47,39 @@ where const PROCESS_NOTIFY_BACKEND: u32 = 4; pub fn handle_rxq_event(&mut self, evset: EventSet) -> bool { + let global = VsockMetricsPerDevice::alloc("global".to_string()); if evset != EventSet::IN { warn!("vsock: rxq unexpected event {:?}", evset); - METRICS.rx_queue_event_fails.inc(); + global.rx_queue_event_fails.inc(); return false; } let mut raise_irq = false; if let Err(err) = self.queue_events[RXQ_INDEX].read() { error!("Failed to get vsock rx queue event: {:?}", err); - METRICS.rx_queue_event_fails.inc(); + global.rx_queue_event_fails.inc(); } else if self.backend.has_pending_rx() { raise_irq |= self.process_rx(); - METRICS.rx_queue_event_count.inc(); + global.rx_queue_event_count.inc(); } raise_irq } pub fn 
handle_txq_event(&mut self, evset: EventSet) -> bool { + let global = VsockMetricsPerDevice::alloc("global".to_string()); if evset != EventSet::IN { warn!("vsock: txq unexpected event {:?}", evset); - METRICS.tx_queue_event_fails.inc(); + global.tx_queue_event_fails.inc(); return false; } let mut raise_irq = false; if let Err(err) = self.queue_events[TXQ_INDEX].read() { error!("Failed to get vsock tx queue event: {:?}", err); - METRICS.tx_queue_event_fails.inc(); + global.tx_queue_event_fails.inc(); } else { raise_irq |= self.process_tx(); - METRICS.tx_queue_event_count.inc(); + global.tx_queue_event_count.inc(); // The backend may have queued up responses to the packets we sent during // TX queue processing. If that happened, we need to fetch those responses // and place them into RX buffers. @@ -89,15 +91,16 @@ where } pub fn handle_evq_event(&mut self, evset: EventSet) -> bool { + let global = VsockMetricsPerDevice::alloc("global".to_string()); if evset != EventSet::IN { warn!("vsock: evq unexpected event {:?}", evset); - METRICS.ev_queue_event_fails.inc(); + global.ev_queue_event_fails.inc(); return false; } if let Err(err) = self.queue_events[EVQ_INDEX].read() { error!("Failed to consume vsock evq event: {:?}", err); - METRICS.ev_queue_event_fails.inc(); + global.ev_queue_event_fails.inc(); } false } diff --git a/src/vmm/src/devices/virtio/vsock/metrics.rs b/src/vmm/src/devices/virtio/vsock/metrics.rs index d626d5dca34..f3bdde2d353 100644 --- a/src/vmm/src/devices/virtio/vsock/metrics.rs +++ b/src/vmm/src/devices/virtio/vsock/metrics.rs @@ -41,16 +41,66 @@ use serde::{Serialize, Serializer}; use crate::logger::SharedIncMetric; +use std::sync::{Arc, RwLock}; +use std::collections::BTreeMap; + /// Stores aggregate metrics of all Vsock connections/actions -pub(super) static METRICS: VsockDeviceMetrics = VsockDeviceMetrics::new(); +// pub(super) static METRICS: VsockDeviceMetrics = VsockDeviceMetrics::new(); -/// Called by METRICS.flush(), this function 
facilitates serialization of vsock device metrics. +/// This function facilitates aggregation and serialization of +/// per device vsock metrics. (Can also handle singular) pub fn flush_metrics(serializer: S) -> Result { - let mut seq = serializer.serialize_map(Some(1))?; - seq.serialize_entry("vsock", &METRICS)?; + let vsock_metrics = METRICS.read().unwrap(); + let metrics_len = vsock_metrics.metrics.len(); + // +1 to accommodate the aggregate vsock metrics + let mut seq = serializer.serialize_map(Some(1 + metrics_len))?; + + let mut vsock_aggregated: VsockDeviceMetrics = VsockDeviceMetrics::default(); + + for (name, metrics) in vsock_metrics.metrics.iter() { + let devn = format!("vsock_{}", name); + // serialization will flush the metrics so aggregate before it. + let m: &VsockDeviceMetrics = metrics; + vsock_aggregated.aggregate(m); + seq.serialize_entry(&devn, m)?; + } + seq.serialize_entry("vsock", &vsock_aggregated)?; seq.end() } +/// Registry of vsock device metrics, keyed by device id. +pub struct VsockMetricsPerDevice { + /// Metrics of every device registered so far. + pub metrics: BTreeMap<String, Arc<VsockDeviceMetrics>>, +} + +impl VsockMetricsPerDevice { + /// Return the `VsockDeviceMetrics` of the vsock device + /// having id `iface_id`. A default entry is allocated only if + /// none exists yet, so previously recorded data is never + /// overwritten. Panics if the `METRICS` lock was poisoned by + /// a thread that panicked while holding it. + pub fn alloc(iface_id: String) -> Arc<VsockDeviceMetrics> { + Arc::clone( + METRICS + .write() + .unwrap() + .metrics + .entry(iface_id) + .or_insert_with(|| Arc::new(VsockDeviceMetrics::default())), + ) + } +} + +/// Per-device vsock metrics, aggregated by `flush_metrics`. +/// +/// The map starts empty because a `static` initializer must be +/// const-evaluable and `String`/`Arc` allocations are not; every +/// entry, including "global", is created on first `alloc`. +static METRICS: RwLock<VsockMetricsPerDevice> = RwLock::new(VsockMetricsPerDevice { + metrics: BTreeMap::new(), +}); + /// Vsock-related metrics.
#[derive(Debug, Serialize)] pub(super) struct VsockDeviceMetrics { @@ -130,16 +180,178 @@ pub mod tests { use super::*; use crate::logger::IncMetric; + // Simple test to test ability to handle different devices based on some id + // Mimics the behavior and test of per-device structure in network devices. #[test] fn test_vsock_dev_metrics() { - let vsock_metrics: VsockDeviceMetrics = VsockDeviceMetrics::new(); - let vsock_metrics_local: String = serde_json::to_string(&vsock_metrics).unwrap(); - // the 1st serialize flushes the metrics and resets values to 0 so that - // we can compare the values with local metrics. - serde_json::to_string(&METRICS).unwrap(); - let vsock_metrics_global: String = serde_json::to_string(&METRICS).unwrap(); - assert_eq!(vsock_metrics_local, vsock_metrics_global); - vsock_metrics.conns_added.inc(); - assert_eq!(vsock_metrics.conns_added.count(), 1); + drop(METRICS.read().unwrap()); + drop(METRICS.write().unwrap()); + + for i in 0..3 { + let devn: String = format!("vsock{}", i); + VsockMetricsPerDevice::alloc(devn.clone()); + METRICS + .read() + .unwrap() + .metrics + .get(&devn) + .unwrap() + .conns_added + .inc(); + } + METRICS + .read() + .unwrap() + .metrics + .get("vsock1") + .unwrap() + .conns_added + .add(5); + METRICS + .read() + .unwrap() + .metrics + .get("vsock2") + .unwrap() + .activate_fails + .inc(); + + let json_output = serde_json::to_string(&*METRICS.read().unwrap()).unwrap(); + + // Optional: print JSON to visually verify structure + println!("{}", json_output); + + let parsed: serde_json::Value = serde_json::from_str(&json_output).unwrap(); + let a_count = parsed["vsock_vsock0"]["conns_added"]["count"].as_u64().unwrap(); + let b_count = parsed["vsock_vsock1"]["conns_added"]["count"].as_u64().unwrap(); + let c_count = parsed["vsock_vsock2"]["conns_added"]["count"].as_u64().unwrap(); + let a_count_2 = parsed["vsock_vsock0"]["activate_fails"]["count"].as_u64().unwrap(); + let c_count_2 = 
parsed["vsock_vsock2"]["activate_fails"]["count"].as_u64().unwrap(); + let aggregated = parsed["vsock"]["conns_added"]["count"].as_u64().unwrap(); + + assert_eq!(a_count, 1); + assert_eq!(b_count, 6); + assert_eq!(c_count, 1); + assert_eq!(a_count_2, 0); + assert_eq!(c_count_2, 1); + assert_eq!(aggregated, 8); + + drop(METRICS.read().unwrap()); + assert_eq!(METRICS + .read() + .unwrap() + .metrics + .get("vsock0") + .unwrap() + .conns_added + .count(), 0); + assert_eq!(METRICS + .read() + .unwrap() + .metrics + .get("vsock1") + .unwrap() + .conns_added + .count(), 0); + + METRICS + .read() + .unwrap() + .metrics + .get("vsock0") + .unwrap() + .activate_fails + .inc(); + + METRICS + .read() + .unwrap() + .metrics + .get("vsock0") + .unwrap() + .rx_bytes_count + .inc(); + + } + + // Device meant to test capability of retrieving and maintaining + // a default vsock for the tree, the default represents the global value. + // Also copies thread safety test from net devices. + #[test] + fn test_vsock_default() { + // Use vsock0 so that we can check thread safety with other tests. + let devn = "vsock0"; + + // Drop any existing read/write lock to avoid deadlocks or stale locks. + drop(METRICS.read().unwrap()); + drop(METRICS.write().unwrap()); + + // Allocate metrics for the device. + VsockMetricsPerDevice::alloc(String::from(devn)); + assert!(METRICS.read().unwrap().metrics.get(devn).is_some()); + + // Increment a field (e.g. activate_fails) to ensure it's being tracked. + METRICS + .read() + .unwrap() + .metrics + .get(devn) + .unwrap() + .activate_fails + .inc(); + + let count = METRICS + .read() + .unwrap() + .metrics + .get(devn) + .unwrap() + .activate_fails + .count(); + assert!( + count > 0, + "Expected activate_fails count > 0 but got {}", + count + ); + + // Ensure only up to 2 tests increment this (if sharing across tests). 
+ assert!( + count <= 2, + "Expected activate_fails count <= 2 but got {}", + count + ); + + // Add more metric changes and assert correctness. + METRICS + .read() + .unwrap() + .metrics + .get(devn) + .unwrap() + .activate_fails + .inc(); + + METRICS + .read() + .unwrap() + .metrics + .get(devn) + .unwrap() + .rx_bytes_count + .add(5); + + let rx_count = METRICS + .read() + .unwrap() + .metrics + .get(devn) + .unwrap() + .rx_bytes_count + .count(); + assert!( + rx_count >= 5, + "Expected rx_bytes_count >= 5 but got {}", + rx_count + ); } } diff --git a/src/vmm/src/devices/virtio/vsock/unix/muxer.rs b/src/vmm/src/devices/virtio/vsock/unix/muxer.rs index 478d5c7318d..ebfd0b24a67 100644 --- a/src/vmm/src/devices/virtio/vsock/unix/muxer.rs +++ b/src/vmm/src/devices/virtio/vsock/unix/muxer.rs @@ -48,6 +48,7 @@ use super::{MuxerConnection, VsockUnixBackendError, defs}; use crate::devices::virtio::vsock::metrics::METRICS; use crate::devices::virtio::vsock::packet::{VsockPacketRx, VsockPacketTx}; use crate::logger::IncMetric; +use crate::vmm_config::vsock::VsockSocketType; /// A unique identifier of a `MuxerConnection` object. Connections are stored in a hash map, /// keyed by a `ConnMapKey` object. @@ -108,6 +109,8 @@ pub struct VsockMuxer { local_port_set: HashSet, /// The last used host-side port. local_port_last: u32, + /// Type of Socket (Either Stream or SeqPacket) + socket_type: VsockSocketType, } impl VsockChannel for VsockMuxer { @@ -303,7 +306,7 @@ impl VsockBackend for VsockMuxer {} impl VsockMuxer { /// Muxer constructor. - pub fn new(cid: u64, host_sock_path: String) -> Result { + pub fn new(cid: u64, host_sock_path: String, socket_type: VsockSocketType) -> Result { // Open/bind on the host Unix socket, so we can accept host-initiated // connections. 
let host_sock = UnixListener::bind(&host_sock_path) @@ -321,6 +324,7 @@ impl VsockMuxer { killq: MuxerKillQ::new(), local_port_last: (1u32 << 30) - 1, local_port_set: HashSet::with_capacity(defs::MAX_CONNECTIONS), + socket_type, }; // Listen on the host initiated socket, for incoming connections. @@ -849,7 +853,7 @@ mod tests { ) .unwrap(); - let muxer = VsockMuxer::new(PEER_CID, get_file(name)).unwrap(); + let muxer = VsockMuxer::new(PEER_CID, get_file(name), VsockSocketType::Stream).unwrap(); Self { _vsock_test_ctx: vsock_test_ctx, rx_pkt, diff --git a/src/vmm/src/vmm_config/vsock.rs b/src/vmm/src/vmm_config/vsock.rs index 920e4a4d217..2fb0970db33 100644 --- a/src/vmm/src/vmm_config/vsock.rs +++ b/src/vmm/src/vmm_config/vsock.rs @@ -32,6 +32,20 @@ pub struct VsockDeviceConfig { pub guest_cid: u32, /// Path to local unix socket. pub uds_path: String, + // Type of socket being used + #[serde(default = "default_socket_type")] + pub socket_type : VsockSocketType, +} + +fn default_socket_type() -> VsockSocketType { + VsockSocketType::Stream +} + +#[derive(Clone, Debug, PartialEq, Eq, Deserialize, Serialize)] +#[serde(rename_all = "lowercase")] +pub enum VsockSocketType { + Stream, + SeqPacket, } #[derive(Debug)] @@ -47,6 +61,7 @@ impl From<&VsockAndUnixPath> for VsockDeviceConfig { vsock_id: None, guest_cid: u32::try_from(vsock_lock.cid()).unwrap(), uds_path: vsock.uds_path.clone(), + socket_type: vsock_lock.socket_type() } } } @@ -99,7 +114,7 @@ impl VsockBuilder { pub fn create_unixsock_vsock( cfg: VsockDeviceConfig, ) -> Result, VsockConfigError> { - let backend = VsockUnixBackend::new(u64::from(cfg.guest_cid), cfg.uds_path)?; + let backend = VsockUnixBackend::new(u64::from(cfg.guest_cid), cfg.uds_path, cfg.socket_type)?; Vsock::new(u64::from(cfg.guest_cid), backend).map_err(VsockConfigError::CreateVsockDevice) } @@ -122,6 +137,7 @@ pub(crate) mod tests { vsock_id: None, guest_cid: 3, uds_path: tmp_sock_file.as_path().to_str().unwrap().to_string(), + socket_type: 
VsockSocketType::Stream, } } @@ -168,10 +184,11 @@ pub(crate) mod tests { fn test_set_device() { let mut vsock_builder = VsockBuilder::new(); let mut tmp_sock_file = TempFile::new().unwrap(); + let socket_type = VsockSocketType::Stream; tmp_sock_file.remove().unwrap(); let vsock = Vsock::new( 0, - VsockUnixBackend::new(1, tmp_sock_file.as_path().to_str().unwrap().to_string()) + VsockUnixBackend::new(1, tmp_sock_file.as_path().to_str().unwrap().to_string(), socket_type) .unwrap(), ) .unwrap();