
[sled-diagnostics] use JoinSet for multiple commands #8151


Status: Open — wants to merge 5 commits into base: main
119 changes: 77 additions & 42 deletions sled-diagnostics/src/lib.rs
@@ -4,7 +4,8 @@

//! Diagnostics for an Oxide sled that exposes common support commands.

use futures::{StreamExt, stream::FuturesUnordered};
Review comment (Member):
Looks like the rebase picked up new code that uses FuturesUnordered: https://github.com/oxidecomputer/omicron/actions/runs/15056224877/job/42322633598?pr=8151#step:11:767

use std::sync::Arc;

use slog::Logger;

#[macro_use]
@@ -21,6 +22,7 @@ cfg_if::cfg_if! {

pub mod logs;
pub use logs::{LogError, LogsHandle};
use tokio::{sync::Semaphore, task::JoinSet};

mod queries;
pub use crate::queries::{
@@ -29,6 +31,41 @@ pub use crate::queries::{
};
use queries::*;

/// Max number of commands to run in parallel
const MAX_PARALLELISM: usize = 50;
Review comment (Member):
How did we arrive at this number?


struct MultipleCommands<T> {
    semaphore: Arc<Semaphore>,
    set: JoinSet<T>,
}

impl<T: 'static + Send> MultipleCommands<T> {
    fn new() -> MultipleCommands<T> {
        let semaphore = Arc::new(Semaphore::new(MAX_PARALLELISM));
Review comment (Member):
On the subject of the concurrency limit, it seems to me that the functions in this crate that run multiple commands fall into two categories: those that run a fairly small, fixed number of commands, like ipadm_info and dladm_info, and those which run a command or set of commands against every Oxide process PID (pargs_oxide_processes etc).

For the functions that run commands against every Oxide process pid, the concurrency limit is certainly useful, as there may be basically any number of pids. But something like ipadm_info will always spawn exactly 3 processes, which is below the concurrency limit, and all this faffing around with a Semaphore is unnecessary.

I kind of wonder if the class of functions that spawn a fixed set of commands should eschew the use of MultipleCommands and just construct a JoinSet and spawn their 3 or 5 tasks or whatever. In practice, any overhead from the semaphore acquire/release/drop and stuff is probably insignificant compared to "actually spawning a child process" so this probably doesn't actually matter, but we could avoid doing it...up to you.
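The alternative the comment describes — skip the semaphore entirely when the task set is small and fixed — can be sketched without the tokio runtime. This is a hypothetical std-thread analogue (`run_all` is not part of this PR): one worker per task, join them all, no concurrency cap.

```rust
use std::thread;

// Hypothetical std-thread analogue of "just construct a JoinSet and
// spawn your 3 or 5 tasks": no semaphore, one worker per task.
fn run_all<T: Send + 'static>(
    tasks: Vec<Box<dyn FnOnce() -> T + Send>>,
) -> Vec<T> {
    let handles: Vec<_> =
        tasks.into_iter().map(|task| thread::spawn(task)).collect();
    // Join in spawn order, so outputs line up with the input tasks.
    handles
        .into_iter()
        .map(|h| h.join().expect("task panicked"))
        .collect()
}
```

For three `ipadm` invocations this is the whole story; the bookkeeping that `MultipleCommands` adds only pays off when the task count is unbounded.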

        let set = JoinSet::new();

        Self { semaphore, set }
    }

    fn add_command<F>(&mut self, command: F)
    where
        F: std::future::Future<Output = T> + Send + 'static,
    {
        let semaphore = Arc::clone(&self.semaphore);
        let _abort_handle = self.set.spawn(async move {
            // Hold onto the permit until the command finishes executing
            let _permit =
                semaphore.acquire_owned().await.expect("semaphore acquire");
            command.await
        });
    }

    /// Wait for all commands to execute and return their output.
    async fn join_all(self) -> Vec<T> {
        self.set.join_all().await
    }
}
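The pattern `MultipleCommands` implements — spawn every task, but gate actual execution behind a counting semaphore so at most `MAX_PARALLELISM` run at once — can be mirrored with std threads. This is a minimal sketch under that assumption; `Semaphore` and `run_bounded` here are illustrative stand-ins, not the tokio types the PR uses.

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

// Minimal counting semaphore (Mutex + Condvar), standing in for
// tokio::sync::Semaphore in this thread-based sketch.
struct Semaphore {
    permits: Mutex<usize>,
    cv: Condvar,
}

impl Semaphore {
    fn new(n: usize) -> Self {
        Semaphore { permits: Mutex::new(n), cv: Condvar::new() }
    }

    // Block until a permit is available, then take it.
    fn acquire(&self) {
        let mut p = self.permits.lock().unwrap();
        while *p == 0 {
            p = self.cv.wait(p).unwrap();
        }
        *p -= 1;
    }

    // Return a permit and wake one waiter.
    fn release(&self) {
        *self.permits.lock().unwrap() += 1;
        self.cv.notify_one();
    }
}

// Run `tasks` with at most `max_parallelism` executing at once. As in
// `add_command`, every worker is spawned immediately but holds a permit
// for the full duration of its task.
fn run_bounded<T: Send + 'static>(
    tasks: Vec<Box<dyn FnOnce() -> T + Send>>,
    max_parallelism: usize,
) -> Vec<T> {
    let sem = Arc::new(Semaphore::new(max_parallelism));
    let handles: Vec<_> = tasks
        .into_iter()
        .map(|task| {
            let sem = Arc::clone(&sem);
            thread::spawn(move || {
                sem.acquire();
                let out = task();
                sem.release();
                out
            })
        })
        .collect();
    // Join in spawn order, so outputs line up with the input tasks.
    handles
        .into_iter()
        .map(|h| h.join().expect("task panicked"))
        .collect()
}
```

One difference from the PR's version: tokio's `acquire_owned` returns an RAII permit that is released on drop even if the task fails, whereas this sketch releases manually and would leak a permit if a task panicked.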

/// List all zones on a sled.
pub async fn zoneadm_info()
-> Result<SledDiagnosticsCmdOutput, SledDiagnosticsCmdError> {
@@ -38,33 +75,31 @@ pub async fn zoneadm_info()
/// Retrieve various `ipadm` command output for the system.
pub async fn ipadm_info()
-> Vec<Result<SledDiagnosticsCmdOutput, SledDiagnosticsCmdError>> {
    [ipadm_show_interface(), ipadm_show_addr(), ipadm_show_prop()]
        .into_iter()
        .map(|c| async move {
            execute_command_with_timeout(c, DEFAULT_TIMEOUT).await
        })
        .collect::<FuturesUnordered<_>>()
        .collect::<Vec<Result<_, _>>>()
        .await
    let mut commands = MultipleCommands::new();
    for command in
        [ipadm_show_interface(), ipadm_show_addr(), ipadm_show_prop()]
    {
        commands
            .add_command(execute_command_with_timeout(command, DEFAULT_TIMEOUT))
    }
    commands.join_all().await
}

/// Retrieve various `dladm` command output for the system.
pub async fn dladm_info()
-> Vec<Result<SledDiagnosticsCmdOutput, SledDiagnosticsCmdError>> {
    [
        dladm_show_phys(),
        dladm_show_ether(),
        dladm_show_link(),
        dladm_show_vnic(),
        dladm_show_linkprop(),
    ]
    .into_iter()
    .map(|c| async move {
        execute_command_with_timeout(c, DEFAULT_TIMEOUT).await
    })
    .collect::<FuturesUnordered<_>>()
    .collect::<Vec<Result<_, _>>>()
    .await
    let mut commands = MultipleCommands::new();
    for command in [
        dladm_show_phys(),
        dladm_show_ether(),
        dladm_show_link(),
        dladm_show_vnic(),
        dladm_show_linkprop(),
    ] {
        commands
            .add_command(execute_command_with_timeout(command, DEFAULT_TIMEOUT))
    }
    commands.join_all().await
}

pub async fn nvmeadm_info()
@@ -83,14 +118,14 @@ pub async fn pargs_oxide_processes(
        Err(e) => return vec![Err(e.into())],
    };

    pids.iter()
        .map(|pid| pargs_process(*pid))
        .map(|c| async move {
            execute_command_with_timeout(c, DEFAULT_TIMEOUT).await
        })
        .collect::<FuturesUnordered<_>>()
        .collect::<Vec<Result<_, _>>>()
        .await
    let mut commands = MultipleCommands::new();
    for pid in pids {
        commands.add_command(execute_command_with_timeout(
            pargs_process(pid),
            DEFAULT_TIMEOUT,
        ));
    }
    commands.join_all().await
}

pub async fn pstack_oxide_processes(
@@ -104,14 +139,14 @@ pub async fn pstack_oxide_processes(
        Err(e) => return vec![Err(e.into())],
    };

    pids.iter()
        .map(|pid| pstack_process(*pid))
        .map(|c| async move {
            execute_command_with_timeout(c, DEFAULT_TIMEOUT).await
        })
        .collect::<FuturesUnordered<_>>()
        .collect::<Vec<Result<_, _>>>()
        .await
    let mut commands = MultipleCommands::new();
    for pid in pids {
        commands.add_command(execute_command_with_timeout(
            pstack_process(pid),
            DEFAULT_TIMEOUT,
        ));
    }
    commands.join_all().await
}

pub async fn pfiles_oxide_processes(
@@ -125,14 +160,14 @@ pub async fn pfiles_oxide_processes(
        Err(e) => return vec![Err(e.into())],
    };

    pids.iter()
        .map(|pid| pfiles_process(*pid))
        .map(|c| async move {
            execute_command_with_timeout(c, DEFAULT_TIMEOUT).await
        })
        .collect::<FuturesUnordered<_>>()
        .collect::<Vec<Result<_, _>>>()
        .await
    let mut commands = MultipleCommands::new();
    for pid in pids {
        commands.add_command(execute_command_with_timeout(
            pfiles_process(pid),
            DEFAULT_TIMEOUT,
        ));
    }
    commands.join_all().await
}

/// Retrieve various `zfs` command output for the system.