From 52bdca7317c3aaee21da6b4f36f9549509edf309 Mon Sep 17 00:00:00 2001 From: Ben Schofield Date: Fri, 5 Jan 2024 10:36:30 -0800 Subject: [PATCH] Add a tracing warning when a thread blocks steps Add a warning to the sim when a given host or client blocks progress in a simulation run. This works by spawning a background thread for each run that periodically checks the steps taken by the simulation. If the number of steps is the same between checks then the thread adds the tracing info. --- src/sim.rs | 32 ++++++++++++++++++++++++++------ 1 file changed, 26 insertions(+), 6 deletions(-) diff --git a/src/sim.rs b/src/sim.rs index 6f2abbf..a4353c3 100644 --- a/src/sim.rs +++ b/src/sim.rs @@ -5,7 +5,10 @@ use std::cell::RefCell; use std::future::Future; use std::net::IpAddr; use std::ops::DerefMut; -use std::sync::Arc; +use std::sync::mpsc::TryRecvError; +use std::sync::{Arc, mpsc}; +use std::sync::atomic::AtomicUsize; +use std::sync::atomic::Ordering::Relaxed; use std::time::UNIX_EPOCH; use tokio::time::Duration; use tracing::Level; @@ -30,7 +33,7 @@ pub struct Sim<'a> { /// Simulation elapsed time elapsed: Duration, - steps: usize, + steps: Arc, } impl<'a> Sim<'a> { @@ -46,7 +49,7 @@ impl<'a> Sim<'a> { rts: IndexMap::new(), since_epoch, elapsed: Duration::ZERO, - steps: 1, // bumped after each step + steps: Arc::new(1.into()), // bumped after each step } } @@ -309,10 +312,27 @@ impl<'a> Sim<'a> { /// Executes a simple event loop that calls [step](#method.step) each iteration, /// returning early if any host software errors. pub fn run(&mut self) -> Result { + let steps = self.steps.clone(); + let (tx, rx) = mpsc::channel(); + std::thread::spawn(move || { + loop { + let prev = steps.load(std::sync::atomic::Ordering::Relaxed); + // Exit if main thread has. + match rx.try_recv() { + Ok(_) | Err(TryRecvError::Disconnected) => break, + _ => {} + } + std::thread::sleep(Duration::from_secs(10)); + if steps.load(std::sync::atomic::Ordering::Relaxed) == prev { + tracing::warn!("A task is blocking preventing simulation steps."); + } + } + }); loop { let is_finished = self.step()?; if is_finished { + let _ = tx.send(()); return Ok(()); } } @@ -328,7 +348,7 @@ impl<'a> Sim<'a> { /// /// Returns whether or not all clients have completed. pub fn step(&mut self) -> Result { - tracing::debug!("step {}", self.steps); + tracing::debug!("step {}", self.steps.load(Relaxed)); let tick = self.config.tick; let mut is_finished = true; @@ -380,12 +400,12 @@ impl<'a> Sim<'a> { } self.elapsed += tick; - self.steps += 1; + self.steps.fetch_add(1, Relaxed); if self.elapsed > self.config.duration && !is_finished { return Err(format!( "Ran for duration: {:?} steps: {} without completing", - self.config.duration, self.steps, + self.config.duration, self.steps.load(Relaxed), ))?; }