diff --git a/nexus/src/app/background/tasks/support_bundle_collector.rs b/nexus/src/app/background/tasks/support_bundle_collector.rs
index 3629ffce9b..ab61422332 100644
--- a/nexus/src/app/background/tasks/support_bundle_collector.rs
+++ b/nexus/src/app/background/tasks/support_bundle_collector.rs
@@ -42,6 +42,7 @@ use std::sync::Arc;
 use tokio::io::AsyncReadExt;
 use tokio::io::AsyncSeekExt;
 use tokio::io::SeekFrom;
+use tokio::task::JoinSet;
 use tufaceous_artifact::ArtifactHash;
 use zip::ZipArchive;
 use zip::ZipWriter;
@@ -60,7 +61,7 @@ fn authz_support_bundle_from_id(id: SupportBundleUuid) -> authz::SupportBundle {
 }
 
 // Specifies the data to be collected within the Support Bundle.
-#[derive(Default)]
+#[derive(Clone, Default)]
 struct BundleRequest {
     // If "false": Skip collecting host-specific info from each sled.
     skip_sled_info: bool,
@@ -373,13 +374,13 @@
             }
         };
 
-        let collection = BundleCollection {
-            collector: &self,
+        let collection = Arc::new(BundleCollection {
+            datastore: self.datastore.clone(),
             log: opctx.log.new(slog::o!("bundle" => bundle.id.to_string())),
-            opctx,
-            request,
-            bundle: &bundle,
-        };
+            opctx: opctx.child(std::collections::BTreeMap::new()),
+            request: request.clone(),
+            bundle: bundle.clone(),
+        });
 
         let authz_bundle = authz_support_bundle_from_id(bundle.id.into());
         let mut report = collection.collect_bundle_and_store_on_sled().await?;
@@ -416,20 +417,18 @@
     }
 }
 // Wraps up all arguments to perform a single support bundle collection
-struct BundleCollection<'a> {
-    // The task responsible for this collection
-    collector: &'a SupportBundleCollector,
-
+struct BundleCollection {
+    datastore: Arc<DataStore>,
     log: slog::Logger,
-    opctx: &'a OpContext,
-    request: &'a BundleRequest,
-    bundle: &'a SupportBundle,
+    opctx: OpContext,
+    request: BundleRequest,
+    bundle: SupportBundle,
 }
 
-impl BundleCollection<'_> {
+impl BundleCollection {
     // Collect the bundle within Nexus, and store it on a target sled.
     async fn collect_bundle_and_store_on_sled(
-        &self,
+        self: &Arc<Self>,
     ) -> anyhow::Result<SupportBundleCollectionReport> {
         // Create a temporary directory where we'll store the support bundle
         // as it's being collected.
@@ -456,7 +455,7 @@
             "bundle" => %self.bundle.id
         );
 
-        let bundle = self.collector.datastore.support_bundle_get(
+        let bundle = self.datastore.support_bundle_get(
             &self.opctx,
             self.bundle.id.into()
         ).await?;
@@ -491,7 +490,6 @@
 
         // Find the sled where we're storing this bundle.
         let sled_id = self
-            .collector
             .datastore
             .zpool_get_sled_if_in_service(
                 &self.opctx,
@@ -499,7 +497,7 @@
             )
             .await?;
         let sled_client = nexus_networking::sled_client(
-            &self.collector.datastore,
+            &self.datastore,
             &self.opctx,
             sled_id.into_untyped_uuid(),
             &self.log,
@@ -545,7 +543,7 @@
     // As a result, it is important that this function be implemented as
     // cancel-safe.
     async fn collect_bundle_as_file(
-        &self,
+        self: &Arc<Self>,
         dir: &Utf8TempDir,
     ) -> anyhow::Result<SupportBundleCollectionReport> {
         let log = &self.log;
@@ -561,161 +559,201 @@
         .await?;
 
         if let Ok(all_sleds) = self
-            .collector
             .datastore
             .sled_list_all_batched(&self.opctx, SledFilter::InService)
             .await
         {
             report.listed_in_service_sleds = true;
 
-            // NOTE: This could be, and probably should be, done concurrently.
-            for sled in &all_sleds {
-                info!(&log, "Collecting bundle info from sled"; "sled" => %sled.id());
-                let sled_path = dir
-                    .path()
-                    .join("rack")
-                    .join(sled.rack_id.to_string())
-                    .join("sled")
-                    .join(sled.id().to_string());
-                tokio::fs::create_dir_all(&sled_path).await?;
-                tokio::fs::write(
-                    sled_path.join("sled.txt"),
-                    format!("{sled:?}"),
-                )
-                .await?;
+            const MAX_CONCURRENT_SLED_REQUESTS: usize = 16;
+            let mut sleds_iter = all_sleds.into_iter().peekable();
+            let mut tasks = JoinSet::new();
 
-                if self.request.skip_sled_info {
-                    continue;
+            // While we have incoming work to send to tasks (sleds_iter)
+            // or a task operating on that data (tasks)...
+            while sleds_iter.peek().is_some() || !tasks.is_empty() {
+                // Spawn tasks up to the concurrency limit
+                while tasks.len() < MAX_CONCURRENT_SLED_REQUESTS
+                    && sleds_iter.peek().is_some()
+                {
+                    if let Some(sled) = sleds_iter.next() {
+                        let collection: Arc<BundleCollection> = self.clone();
+                        let dir = dir.path().to_path_buf();
+                        tasks.spawn({
+                            async move {
+                                collection
+                                    .collect_data_from_sled(&sled, &dir)
+                                    .await
+                            }
+                        });
+                    }
                 }
 
-                let Ok(sled_client) = nexus_networking::sled_client(
-                    &self.collector.datastore,
-                    &self.opctx,
-                    sled.id(),
-                    log,
-                )
-                .await
-                else {
-                    tokio::fs::write(
-                        sled_path.join("error.txt"),
-                        "Could not contact sled",
-                    )
-                    .await?;
-                    continue;
-                };
-
-                // NB: As new sled-diagnostic commands are added they should
-                // be added to this array so that their output can be saved
-                // within the support bundle.
-                let mut diag_cmds = futures::stream::iter([
-                    save_diag_cmd_output_or_error(
-                        &sled_path,
-                        "zoneadm",
-                        sled_client.support_zoneadm_info(),
-                    )
-                    .boxed(),
-                    save_diag_cmd_output_or_error(
-                        &sled_path,
-                        "dladm",
-                        sled_client.support_dladm_info(),
-                    )
-                    .boxed(),
-                    save_diag_cmd_output_or_error(
-                        &sled_path,
-                        "ipadm",
-                        sled_client.support_ipadm_info(),
-                    )
-                    .boxed(),
-                    save_diag_cmd_output_or_error(
-                        &sled_path,
-                        "nvmeadm",
-                        sled_client.support_nvmeadm_info(),
-                    )
-                    .boxed(),
-                    save_diag_cmd_output_or_error(
-                        &sled_path,
-                        "pargs",
-                        sled_client.support_pargs_info(),
-                    )
-                    .boxed(),
-                    save_diag_cmd_output_or_error(
-                        &sled_path,
-                        "pfiles",
-                        sled_client.support_pfiles_info(),
-                    )
-                    .boxed(),
-                    save_diag_cmd_output_or_error(
-                        &sled_path,
-                        "pstack",
-                        sled_client.support_pstack_info(),
-                    )
-                    .boxed(),
-                    save_diag_cmd_output_or_error(
-                        &sled_path,
-                        "zfs",
-                        sled_client.support_zfs_info(),
-                    )
-                    .boxed(),
-                    save_diag_cmd_output_or_error(
-                        &sled_path,
-                        "zpool",
-                        sled_client.support_zpool_info(),
-                    )
-                    .boxed(),
-                    save_diag_cmd_output_or_error(
-                        &sled_path,
-                        "health-check",
-                        sled_client.support_health_check(),
-                    )
-                    .boxed(),
-                ])
-                // Currently we execute up to 10 commands concurrently which
-                // might be doing their own concurrent work, for example
-                // collectiong `pstack` output of every Oxide process that is
-                // found on a sled.
-                .buffer_unordered(10);
-
-                while let Some(result) = diag_cmds.next().await {
-                    // Log that we failed to write the diag command output to a
-                    // file but don't return early as we wish to get as much
-                    // information as we can.
-                    if let Err(e) = result {
-                        error!(
+                // Await the completion of ongoing tasks.
+                //
+                // Keep collecting from other sleds, even if one or more of the
+                // sled collection tasks fail.
+                if let Some(result) = tasks.join_next().await {
+                    if let Err(err) = result {
+                        warn!(
                             &self.log,
-                            "failed to write diagnostic command output to \
-                             file: {e}"
+                            "Failed to fully collect support bundle info from sled";
+                            "err" => ?err
                         );
                     }
                 }
-
-                // For each zone we concurrently fire off a request to its
-                // sled-agent to collect its logs in a zip file and write the
-                // result to the support bundle.
-                let zones = sled_client.support_logs().await?.into_inner();
-                let mut log_futs: FuturesUnordered<_> = zones
-                    .iter()
-                    .map(|zone| {
-                        save_zone_log_zip_or_error(
-                            log,
-                            &sled_client,
-                            zone,
-                            &sled_path,
-                        )
-                    })
-                    .collect();
-
-                while let Some(log_collection_result) = log_futs.next().await {
-                    // We log any errors saving the zip file to disk and
-                    // continue on.
-                    if let Err(e) = log_collection_result {
-                        error!(&self.log, "failed to write logs output: {e}");
-                    }
-                }
             }
         }
 
         Ok(report)
     }
+
+    // Collect data from a sled, storing it into a directory that will
+    // be turned into a support bundle.
+    //
+    // - "sled" is the sled from which we should collect data.
+    // - "dir" is a directory where data can be stored, to be turned
+    //   into a bundle after collection completes.
+    async fn collect_data_from_sled(
+        &self,
+        sled: &nexus_db_model::Sled,
+        dir: &Utf8Path,
+    ) -> anyhow::Result<()> {
+        let log = &self.log;
+        info!(&log, "Collecting bundle info from sled"; "sled" => %sled.id());
+        let sled_path = dir
+            .join("rack")
+            .join(sled.rack_id.to_string())
+            .join("sled")
+            .join(sled.id().to_string());
+        tokio::fs::create_dir_all(&sled_path).await?;
+        tokio::fs::write(sled_path.join("sled.txt"), format!("{sled:?}"))
+            .await?;
+
+        if self.request.skip_sled_info {
+            return Ok(());
+        }
+
+        let Ok(sled_client) = nexus_networking::sled_client(
+            &self.datastore,
+            &self.opctx,
+            sled.id(),
+            log,
+        )
+        .await
+        else {
+            tokio::fs::write(
+                sled_path.join("error.txt"),
+                "Could not contact sled",
+            )
+            .await?;
+            return Ok(());
+        };
+
+        // NB: As new sled-diagnostic commands are added they should
+        // be added to this array so that their output can be saved
+        // within the support bundle.
+        let mut diag_cmds = futures::stream::iter([
+            save_diag_cmd_output_or_error(
+                &sled_path,
+                "zoneadm",
+                sled_client.support_zoneadm_info(),
+            )
+            .boxed(),
+            save_diag_cmd_output_or_error(
+                &sled_path,
+                "dladm",
+                sled_client.support_dladm_info(),
+            )
+            .boxed(),
+            save_diag_cmd_output_or_error(
+                &sled_path,
+                "ipadm",
+                sled_client.support_ipadm_info(),
+            )
+            .boxed(),
+            save_diag_cmd_output_or_error(
+                &sled_path,
+                "nvmeadm",
+                sled_client.support_nvmeadm_info(),
+            )
+            .boxed(),
+            save_diag_cmd_output_or_error(
+                &sled_path,
+                "pargs",
+                sled_client.support_pargs_info(),
+            )
+            .boxed(),
+            save_diag_cmd_output_or_error(
+                &sled_path,
+                "pfiles",
+                sled_client.support_pfiles_info(),
+            )
+            .boxed(),
+            save_diag_cmd_output_or_error(
+                &sled_path,
+                "pstack",
+                sled_client.support_pstack_info(),
+            )
+            .boxed(),
+            save_diag_cmd_output_or_error(
+                &sled_path,
+                "zfs",
+                sled_client.support_zfs_info(),
+            )
+            .boxed(),
+            save_diag_cmd_output_or_error(
+                &sled_path,
+                "zpool",
+                sled_client.support_zpool_info(),
+            )
+            .boxed(),
+            save_diag_cmd_output_or_error(
+                &sled_path,
+                "health-check",
+                sled_client.support_health_check(),
+            )
+            .boxed(),
+        ])
+        // Currently we execute up to 10 commands concurrently which
+        // might be doing their own concurrent work, for example
+        // collecting `pstack` output of every Oxide process that is
+        // found on a sled.
+        .buffer_unordered(10);
+
+        while let Some(result) = diag_cmds.next().await {
+            // Log that we failed to write the diag command output to a
+            // file but don't return early as we wish to get as much
+            // information as we can.
+            if let Err(e) = result {
+                error!(
+                    &self.log,
+                    "failed to write diagnostic command output to \
+                     file: {e}"
+                );
+            }
+        }
+
+        // For each zone we concurrently fire off a request to its
+        // sled-agent to collect its logs in a zip file and write the
+        // result to the support bundle.
+        let zones = sled_client.support_logs().await?.into_inner();
+        let mut log_futs: FuturesUnordered<_> = zones
+            .iter()
+            .map(|zone| {
+                save_zone_log_zip_or_error(log, &sled_client, zone, &sled_path)
+            })
+            .collect();
+
+        while let Some(log_collection_result) = log_futs.next().await {
+            // We log any errors saving the zip file to disk and
+            // continue on.
+            if let Err(e) = log_collection_result {
+                error!(&self.log, "failed to write logs output: {e}");
+            }
+        }
+        return Ok(());
+    }
 }
 
 impl BackgroundTask for SupportBundleCollector {
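
Note on the structural change above: spawning per-sled work onto a JoinSet requires 'static futures, which is why BundleCollection drops its borrowed fields, becomes owned, and is shared behind an Arc, with methods taking `self: &Arc<Self>` so each spawned task can hold its own clone. A minimal standalone sketch of that receiver pattern follows; the type and field names are illustrative, not from this patch, and only the tokio crate is assumed.

```rust
use std::sync::Arc;
use tokio::task::JoinSet;

struct Collector {
    label: String,
}

impl Collector {
    // Taking `self: &Arc<Self>` lets the method hand owned clones of the
    // whole context to 'static spawned tasks.
    async fn run(self: &Arc<Self>) {
        let mut tasks = JoinSet::new();
        for i in 0..4u32 {
            let this = Arc::clone(self);
            tasks.spawn(async move {
                println!("{}: working on item {i}", this.label);
            });
        }
        // Wait for every spawned task to finish.
        while tasks.join_next().await.is_some() {}
    }
}

#[tokio::main]
async fn main() {
    let collector = Arc::new(Collector { label: "demo".to_string() });
    collector.run().await;
}
```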
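The new collection loop is a bounded-concurrency pattern: top the JoinSet up to MAX_CONCURRENT_SLED_REQUESTS while the sled iterator still has items, then drain one completed task per iteration so a single failure does not stop the remaining sleds. A self-contained sketch of the same shape, with a numeric range and a sleep standing in for the sleds and collect_data_from_sled (assumptions, not code from this patch; requires tokio with the rt, macros, and time features):

```rust
use tokio::task::JoinSet;

const MAX_CONCURRENT: usize = 16;

#[tokio::main]
async fn main() {
    // Placeholder work items standing in for the sleds in the patch.
    let mut work = (0..100u32).peekable();
    let mut tasks = JoinSet::new();

    // Loop while there is unsent work or any task is still in flight.
    while work.peek().is_some() || !tasks.is_empty() {
        // Top the set up to the concurrency limit.
        while tasks.len() < MAX_CONCURRENT && work.peek().is_some() {
            if let Some(item) = work.next() {
                tasks.spawn(async move {
                    // Stand-in for collect_data_from_sled().
                    tokio::time::sleep(std::time::Duration::from_millis(10))
                        .await;
                    item
                });
            }
        }

        // Drain one completed task; an error here is logged and the loop
        // keeps going, mirroring "keep collecting from other sleds".
        if let Some(result) = tasks.join_next().await {
            match result {
                Ok(item) => println!("item {item} done"),
                Err(err) => eprintln!("task failed: {err}"),
            }
        }
    }
}
```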
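Within collect_data_from_sled, the diagnostic-command fan-out keeps the existing futures::stream plus buffer_unordered(10) approach. A small sketch of that shape for reference; the command names and the sleep are hypothetical stand-ins for the sled-agent calls, and only the futures and tokio crates are assumed:

```rust
use futures::stream::{self, StreamExt};
use std::time::Duration;

#[tokio::main]
async fn main() {
    // Hypothetical stand-ins for the sled-diagnostic requests.
    let commands = ["zoneadm", "dladm", "ipadm", "zfs", "zpool"];

    // Turn each command into a future and run at most 10 at a time,
    // yielding results in completion order.
    let mut results = stream::iter(commands.into_iter().map(|cmd| async move {
        // Stand-in for save_diag_cmd_output_or_error().
        tokio::time::sleep(Duration::from_millis(5)).await;
        Ok::<&str, std::io::Error>(cmd)
    }))
    .buffer_unordered(10);

    // Failures are logged and skipped so one bad command does not stop
    // the rest of the collection.
    while let Some(result) = results.next().await {
        match result {
            Ok(cmd) => println!("{cmd}: output saved"),
            Err(err) => eprintln!("failed to save output: {err}"),
        }
    }
}
```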