diff --git a/sled-diagnostics/src/lib.rs b/sled-diagnostics/src/lib.rs index b61267af01..133e176575 100644 --- a/sled-diagnostics/src/lib.rs +++ b/sled-diagnostics/src/lib.rs @@ -4,7 +4,8 @@ //! Diagnostics for an Oxide sled that exposes common support commands. -use futures::{StreamExt, stream::FuturesUnordered}; +use std::sync::Arc; + use slog::Logger; #[macro_use] @@ -21,6 +22,7 @@ cfg_if::cfg_if! { pub mod logs; pub use logs::{LogError, LogsHandle}; +use tokio::{sync::Semaphore, task::JoinSet}; mod queries; pub use crate::queries::{ @@ -29,6 +31,41 @@ pub use crate::queries::{ }; use queries::*; +/// Max number of commands to run in parallel +const MAX_PARALLELISM: usize = 50; + +struct MultipleCommands { + semaphore: Arc, + set: JoinSet, +} + +impl MultipleCommands { + fn new() -> MultipleCommands { + let semaphore = Arc::new(Semaphore::new(MAX_PARALLELISM)); + let set = JoinSet::new(); + + Self { semaphore, set } + } + + fn add_command(&mut self, command: F) + where + F: std::future::Future + Send + 'static, + { + let semaphore = Arc::clone(&self.semaphore); + let _abort_handle = self.set.spawn(async move { + // Hold onto the permit until the command finishes executing + let _permit = + semaphore.acquire_owned().await.expect("semaphore acquire"); + command.await + }); + } + + /// Wait for all commands to execute and return their output. + async fn join_all(self) -> Vec { + self.set.join_all().await + } +} + /// List all zones on a sled. pub async fn zoneadm_info() -> Result { @@ -38,33 +75,31 @@ pub async fn zoneadm_info() /// Retrieve various `ipadm` command output for the system. pub async fn ipadm_info() -> Vec> { - [ipadm_show_interface(), ipadm_show_addr(), ipadm_show_prop()] - .into_iter() - .map(|c| async move { - execute_command_with_timeout(c, DEFAULT_TIMEOUT).await - }) - .collect::>() - .collect::>>() - .await + let mut commands = MultipleCommands::new(); + for command in + [ipadm_show_interface(), ipadm_show_addr(), ipadm_show_prop()] + { + commands + .add_command(execute_command_with_timeout(command, DEFAULT_TIMEOUT)) + } + commands.join_all().await } /// Retrieve various `dladm` command output for the system. pub async fn dladm_info() -> Vec> { - [ + let mut commands = MultipleCommands::new(); + for command in [ dladm_show_phys(), dladm_show_ether(), dladm_show_link(), dladm_show_vnic(), dladm_show_linkprop(), - ] - .into_iter() - .map(|c| async move { - execute_command_with_timeout(c, DEFAULT_TIMEOUT).await - }) - .collect::>() - .collect::>>() - .await + ] { + commands + .add_command(execute_command_with_timeout(command, DEFAULT_TIMEOUT)) + } + commands.join_all().await } pub async fn nvmeadm_info() @@ -83,14 +118,14 @@ pub async fn pargs_oxide_processes( Err(e) => return vec![Err(e.into())], }; - pids.iter() - .map(|pid| pargs_process(*pid)) - .map(|c| async move { - execute_command_with_timeout(c, DEFAULT_TIMEOUT).await - }) - .collect::>() - .collect::>>() - .await + let mut commands = MultipleCommands::new(); + for pid in pids { + commands.add_command(execute_command_with_timeout( + pargs_process(pid), + DEFAULT_TIMEOUT, + )); + } + commands.join_all().await } pub async fn pstack_oxide_processes( @@ -104,14 +139,14 @@ pub async fn pstack_oxide_processes( Err(e) => return vec![Err(e.into())], }; - pids.iter() - .map(|pid| pstack_process(*pid)) - .map(|c| async move { - execute_command_with_timeout(c, DEFAULT_TIMEOUT).await - }) - .collect::>() - .collect::>>() - .await + let mut commands = MultipleCommands::new(); + for pid in pids { + commands.add_command(execute_command_with_timeout( + pstack_process(pid), + DEFAULT_TIMEOUT, + )); + } + commands.join_all().await } pub async fn pfiles_oxide_processes( @@ -125,14 +160,14 @@ pub async fn pfiles_oxide_processes( Err(e) => return vec![Err(e.into())], }; - pids.iter() - .map(|pid| pfiles_process(*pid)) - .map(|c| async move { - execute_command_with_timeout(c, DEFAULT_TIMEOUT).await - }) - .collect::>() - .collect::>>() - .await + let mut commands = MultipleCommands::new(); + for pid in pids { + commands.add_command(execute_command_with_timeout( + pfiles_process(pid), + DEFAULT_TIMEOUT, + )); + } + commands.join_all().await } /// Retrieve various `zfs` command output for the system.