From edb0440a2ffd62f0b3e2950f87ee5d50829b2575 Mon Sep 17 00:00:00 2001 From: Mike Zeller Date: Tue, 13 May 2025 19:56:41 +0000 Subject: [PATCH 1/4] [spr] initial version Created using spr 1.3.6-beta.1 --- sled-diagnostics/src/lib.rs | 145 +++++++++++++++++++++++++----------- 1 file changed, 103 insertions(+), 42 deletions(-) diff --git a/sled-diagnostics/src/lib.rs b/sled-diagnostics/src/lib.rs index 8b7639a45b1..a3778b2d2e5 100644 --- a/sled-diagnostics/src/lib.rs +++ b/sled-diagnostics/src/lib.rs @@ -4,7 +4,8 @@ //! Diagnostics for an Oxide sled that exposes common support commands. -use futures::{StreamExt, stream::FuturesUnordered}; +use std::sync::Arc; + use slog::Logger; #[macro_use] @@ -21,6 +22,7 @@ cfg_if::cfg_if! { pub mod logs; pub use logs::{LogError, LogsHandle}; +use tokio::{sync::Semaphore, task::JoinSet}; mod queries; pub use crate::queries::{ @@ -29,6 +31,59 @@ pub use crate::queries::{ }; use queries::*; +/// Max number of commands to run in parallel +const MAX_PARALLELISM: usize = 50; + +trait ParallelCommandExecution { + type Output; + + /// Add a command to the set of commands to be executed. + async fn add_command(&mut self, command: F) + where + F: std::future::Future + Send + 'static; +} + +struct MultipleCommands { + semaphore: Arc, + set: JoinSet, +} + +impl MultipleCommands { + fn new() -> MultipleCommands { + let semaphore = Arc::new(Semaphore::new(MAX_PARALLELISM)); + let set = JoinSet::new(); + + Self { semaphore, set } + } + + /// Wait for all commands to execute and return their output. + async fn join_all(self) -> Vec { + self.set.join_all().await + } +} + +impl ParallelCommandExecution for MultipleCommands +where + T: Send + 'static, +{ + type Output = T; + + async fn add_command(&mut self, command: F) + where + F: std::future::Future + Send + 'static, + { + let permit = Arc::clone(&self.semaphore) + .acquire_owned() + .await + .expect("semaphore acquire"); + let _abort_handle = self.set.spawn(async move { + let res = command.await; + drop(permit); + res + }); + } +} + /// List all zones on a sled. pub async fn zoneadm_info() -> Result { @@ -38,33 +93,33 @@ pub async fn zoneadm_info() /// Retrieve various `ipadm` command output for the system. pub async fn ipadm_info() -> Vec> { - [ipadm_show_interface(), ipadm_show_addr(), ipadm_show_prop()] - .into_iter() - .map(|c| async move { - execute_command_with_timeout(c, DEFAULT_TIMEOUT).await - }) - .collect::>() - .collect::>>() - .await + let mut commands = MultipleCommands::new(); + for command in + [ipadm_show_interface(), ipadm_show_addr(), ipadm_show_prop()] + { + commands + .add_command(execute_command_with_timeout(command, DEFAULT_TIMEOUT)) + .await + } + commands.join_all().await } /// Retrieve various `dladm` command output for the system. pub async fn dladm_info() -> Vec> { - [ + let mut commands = MultipleCommands::new(); + for command in [ dladm_show_phys(), dladm_show_ether(), dladm_show_link(), dladm_show_vnic(), dladm_show_linkprop(), - ] - .into_iter() - .map(|c| async move { - execute_command_with_timeout(c, DEFAULT_TIMEOUT).await - }) - .collect::>() - .collect::>>() - .await + ] { + commands + .add_command(execute_command_with_timeout(command, DEFAULT_TIMEOUT)) + .await + } + commands.join_all().await } pub async fn nvmeadm_info() @@ -83,14 +138,16 @@ pub async fn pargs_oxide_processes( Err(e) => return vec![Err(e.into())], }; - pids.iter() - .map(|pid| pargs_process(*pid)) - .map(|c| async move { - execute_command_with_timeout(c, DEFAULT_TIMEOUT).await - }) - .collect::>() - .collect::>>() - .await + let mut commands = MultipleCommands::new(); + for pid in pids { + commands + .add_command(execute_command_with_timeout( + pargs_process(pid), + DEFAULT_TIMEOUT, + )) + .await; + } + commands.join_all().await } pub async fn pstack_oxide_processes( @@ -104,14 +161,16 @@ pub async fn pstack_oxide_processes( Err(e) => return vec![Err(e.into())], }; - pids.iter() - .map(|pid| pstack_process(*pid)) - .map(|c| async move { - execute_command_with_timeout(c, DEFAULT_TIMEOUT).await - }) - .collect::>() - .collect::>>() - .await + let mut commands = MultipleCommands::new(); + for pid in pids { + commands + .add_command(execute_command_with_timeout( + pstack_process(pid), + DEFAULT_TIMEOUT, + )) + .await; + } + commands.join_all().await } pub async fn pfiles_oxide_processes( @@ -125,14 +184,16 @@ pub async fn pfiles_oxide_processes( Err(e) => return vec![Err(e.into())], }; - pids.iter() - .map(|pid| pfiles_process(*pid)) - .map(|c| async move { - execute_command_with_timeout(c, DEFAULT_TIMEOUT).await - }) - .collect::>() - .collect::>>() - .await + let mut commands = MultipleCommands::new(); + for pid in pids { + commands + .add_command(execute_command_with_timeout( + pfiles_process(pid), + DEFAULT_TIMEOUT, + )) + .await; + } + commands.join_all().await } /// Retrieve various `zfs` command output for the system. From 6a5a376839ac22f8a9345ac587d865674688527c Mon Sep 17 00:00:00 2001 From: Mike Zeller Date: Wed, 14 May 2025 00:35:16 +0000 Subject: [PATCH 2/4] apply eliza's feedback Created using spr 1.3.6-beta.1 --- sled-diagnostics/src/lib.rs | 43 +++++++++++++++---------------------- 1 file changed, 17 insertions(+), 26 deletions(-) diff --git a/sled-diagnostics/src/lib.rs b/sled-diagnostics/src/lib.rs index a3778b2d2e5..ef546f230b0 100644 --- a/sled-diagnostics/src/lib.rs +++ b/sled-diagnostics/src/lib.rs @@ -38,7 +38,7 @@ trait ParallelCommandExecution { type Output; /// Add a command to the set of commands to be executed. - async fn add_command(&mut self, command: F) + fn add_command(&mut self, command: F) where F: std::future::Future + Send + 'static; } @@ -68,15 +68,14 @@ where { type Output = T; - async fn add_command(&mut self, command: F) + fn add_command(&mut self, command: F) where F: std::future::Future + Send + 'static, { - let permit = Arc::clone(&self.semaphore) - .acquire_owned() - .await - .expect("semaphore acquire"); + let semaphore = Arc::clone(&self.semaphore); let _abort_handle = self.set.spawn(async move { + let permit = + semaphore.acquire_owned().await.expect("semaphore acquire"); let res = command.await; drop(permit); res @@ -99,7 +98,6 @@ pub async fn ipadm_info() { commands .add_command(execute_command_with_timeout(command, DEFAULT_TIMEOUT)) - .await } commands.join_all().await } @@ -117,7 +115,6 @@ pub async fn dladm_info() ] { commands .add_command(execute_command_with_timeout(command, DEFAULT_TIMEOUT)) - .await } commands.join_all().await } @@ -140,12 +137,10 @@ pub async fn pargs_oxide_processes( let mut commands = MultipleCommands::new(); for pid in pids { - commands - .add_command(execute_command_with_timeout( - pargs_process(pid), - DEFAULT_TIMEOUT, - )) - .await; + commands.add_command(execute_command_with_timeout( + pargs_process(pid), + DEFAULT_TIMEOUT, + )); } commands.join_all().await } @@ -163,12 +158,10 @@ pub async fn pstack_oxide_processes( let mut commands = MultipleCommands::new(); for pid in pids { - commands - .add_command(execute_command_with_timeout( - pstack_process(pid), - DEFAULT_TIMEOUT, - )) - .await; + commands.add_command(execute_command_with_timeout( + pstack_process(pid), + DEFAULT_TIMEOUT, + )); } commands.join_all().await } @@ -186,12 +179,10 @@ pub async fn pfiles_oxide_processes( let mut commands = MultipleCommands::new(); for pid in pids { - commands - .add_command(execute_command_with_timeout( - pfiles_process(pid), - DEFAULT_TIMEOUT, - )) - .await; + commands.add_command(execute_command_with_timeout( + pfiles_process(pid), + DEFAULT_TIMEOUT, + )); } commands.join_all().await } From b5eb1589e329e3e24987b49eb5adac19b0c72b6f Mon Sep 17 00:00:00 2001 From: Mike Zeller Date: Wed, 14 May 2025 00:45:31 +0000 Subject: [PATCH 3/4] minor cleanup Created using spr 1.3.6-beta.1 --- sled-diagnostics/src/lib.rs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/sled-diagnostics/src/lib.rs b/sled-diagnostics/src/lib.rs index ef546f230b0..eaf7b01a842 100644 --- a/sled-diagnostics/src/lib.rs +++ b/sled-diagnostics/src/lib.rs @@ -74,11 +74,10 @@ where { let semaphore = Arc::clone(&self.semaphore); let _abort_handle = self.set.spawn(async move { - let permit = + // Hold onto the permit until the command finishes executing + let _permit = semaphore.acquire_owned().await.expect("semaphore acquire"); - let res = command.await; - drop(permit); - res + command.await }); } } From dee2f7e2c31081905fa01fbc3d98e38e45823be9 Mon Sep 17 00:00:00 2001 From: Mike Zeller Date: Wed, 14 May 2025 00:52:09 +0000 Subject: [PATCH 4/4] drop the trait Created using spr 1.3.6-beta.1 --- sled-diagnostics/src/lib.rs | 30 +++++++----------------------- 1 file changed, 7 insertions(+), 23 deletions(-) diff --git a/sled-diagnostics/src/lib.rs b/sled-diagnostics/src/lib.rs index eaf7b01a842..1ffe89bbb81 100644 --- a/sled-diagnostics/src/lib.rs +++ b/sled-diagnostics/src/lib.rs @@ -34,21 +34,12 @@ use queries::*; /// Max number of commands to run in parallel const MAX_PARALLELISM: usize = 50; -trait ParallelCommandExecution { - type Output; - - /// Add a command to the set of commands to be executed. - fn add_command(&mut self, command: F) - where - F: std::future::Future + Send + 'static; -} - struct MultipleCommands { semaphore: Arc, set: JoinSet, } -impl MultipleCommands { +impl MultipleCommands { fn new() -> MultipleCommands { let semaphore = Arc::new(Semaphore::new(MAX_PARALLELISM)); let set = JoinSet::new(); @@ -56,21 +47,9 @@ impl MultipleCommands { Self { semaphore, set } } - /// Wait for all commands to execute and return their output. - async fn join_all(self) -> Vec { - self.set.join_all().await - } -} - -impl ParallelCommandExecution for MultipleCommands -where - T: Send + 'static, -{ - type Output = T; - fn add_command(&mut self, command: F) where - F: std::future::Future + Send + 'static, + F: std::future::Future + Send + 'static, { let semaphore = Arc::clone(&self.semaphore); let _abort_handle = self.set.spawn(async move { @@ -80,6 +59,11 @@ where command.await }); } + + /// Wait for all commands to execute and return their output. + async fn join_all(self) -> Vec { + self.set.join_all().await + } } /// List all zones on a sled.