From f242e6b25367871fb1ef78321cb2de1355ce9c38 Mon Sep 17 00:00:00 2001
From: John Gallagher
Date: Tue, 13 May 2025 12:18:31 -0400
Subject: [PATCH 1/2] [sled-agent-config-reconciler] Flesh out main
 reconciliation task

---
 Cargo.lock                                    |   1 +
 sled-agent/config-reconciler/Cargo.toml       |   1 +
 .../src/dataset_serialization_task.rs         |  83 ++++-
 sled-agent/config-reconciler/src/handle.rs    |  10 +-
 .../config-reconciler/src/internal_disks.rs   |  35 +-
 sled-agent/config-reconciler/src/raw_disks.rs |  69 ++--
 .../config-reconciler/src/reconciler_task.rs  | 341 ++++++++++++++++--
 .../src/reconciler_task/external_disks.rs     |  13 +
 .../src/reconciler_task/zones.rs              | 192 +++++-----
 9 files changed, 581 insertions(+), 164 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 707e2664fe..58e32b448f 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -11543,6 +11543,7 @@ dependencies = [
  "debug-ignore",
  "derive_more",
  "dropshot",
+ "either",
  "expectorate",
  "futures",
  "glob",
diff --git a/sled-agent/config-reconciler/Cargo.toml b/sled-agent/config-reconciler/Cargo.toml
index 6ae554cb84..9d173f3586 100644
--- a/sled-agent/config-reconciler/Cargo.toml
+++ b/sled-agent/config-reconciler/Cargo.toml
@@ -16,6 +16,7 @@ chrono.workspace = true
 debug-ignore.workspace = true
 derive_more.workspace = true
 dropshot.workspace = true
+either.workspace = true
 futures.workspace = true
 glob.workspace = true
 id-map.workspace = true
diff --git a/sled-agent/config-reconciler/src/dataset_serialization_task.rs b/sled-agent/config-reconciler/src/dataset_serialization_task.rs
index 096f7f6698..650e418d36 100644
--- a/sled-agent/config-reconciler/src/dataset_serialization_task.rs
+++ b/sled-agent/config-reconciler/src/dataset_serialization_task.rs
@@ -24,6 +24,8 @@ use illumos_utils::zfs::DestroyDatasetError;
 use illumos_utils::zfs::Mountpoint;
 use illumos_utils::zfs::WhichDatasets;
 use illumos_utils::zfs::Zfs;
+use illumos_utils::zpool::PathInPool;
+use illumos_utils::zpool::ZpoolOrRamdisk;
 use nexus_sled_agent_shared::inventory::InventoryDataset;
 use omicron_common::disk::DatasetConfig;
 use omicron_common::disk::DatasetKind;
@@ -32,6 +34,8 @@ use omicron_common::disk::SharedDatasetConfig;
 use omicron_common::zpool_name::ZpoolName;
 use omicron_uuid_kinds::DatasetUuid;
 use sled_storage::config::MountConfig;
+use sled_storage::dataset::U2_DEBUG_DATASET;
+use sled_storage::dataset::ZONE_DATASET;
 use sled_storage::manager::NestedDatasetConfig;
 use sled_storage::manager::NestedDatasetListOptions;
 use sled_storage::manager::NestedDatasetLocation;
@@ -83,6 +87,29 @@ pub enum DatasetEnsureError {
     TestError(&'static str),
 }
 
+impl DatasetEnsureError {
+    fn is_retryable(&self) -> bool {
+        match self {
+            // Errors that we don't know for sure _aren't_ retryable.
+            DatasetEnsureError::ZpoolNotFound(_)
+            | DatasetEnsureError::EnsureFailed { .. } => true,
+
+            // Errors that we know aren't retryable: recovering from these
+            // requires config changes, so there's no need to retry until that
+            // happens.
+            DatasetEnsureError::TransientZoneRootNoConfig(_)
+            | DatasetEnsureError::UuidMismatch { .. } => false,
+
+            DatasetEnsureError::TransientZoneRootFailure { err, .. } => {
+                err.is_retryable()
+            }
+
+            #[cfg(test)]
+            DatasetEnsureError::TestError(_) => false,
+        }
+    }
+}
+
 #[derive(Debug, thiserror::Error)]
 pub enum NestedDatasetMountError {
     #[error("could not mount dataset {}", .name)]
@@ -129,6 +156,60 @@ pub enum NestedDatasetListError {
 #[derive(Debug, Clone, Default)]
 pub(crate) struct DatasetEnsureResult(IdMap<SingleDatasetEnsureResult>);
 
+impl DatasetEnsureResult {
+    pub(crate) fn has_retryable_error(&self) -> bool {
+        self.0.iter().any(|result| match &result.state {
+            DatasetState::Ensured => false,
+            DatasetState::FailedToEnsure(err) => err.is_retryable(),
+        })
+    }
+
+    pub(crate) fn all_mounted_debug_datasets<'a>(
+        &'a self,
+        mount_config: &'a MountConfig,
+    ) -> impl Iterator<Item = PathInPool> + 'a {
+        self.all_mounted_datasets(mount_config, DatasetKind::Debug)
+    }
+
+    pub(crate) fn all_mounted_zone_root_datasets<'a>(
+        &'a self,
+        mount_config: &'a MountConfig,
+    ) -> impl Iterator<Item = PathInPool> + 'a {
+        self.all_mounted_datasets(mount_config, DatasetKind::TransientZoneRoot)
+    }
+
+    fn all_mounted_datasets<'a>(
+        &'a self,
+        mount_config: &'a MountConfig,
+        kind: DatasetKind,
+    ) -> impl Iterator<Item = PathInPool> + 'a {
+        // We're a helper called by the pub methods on this type, so we only
+        // have to handle the `kind`s they call us with.
+        let mountpoint = match &kind {
+            DatasetKind::Debug => U2_DEBUG_DATASET,
+            DatasetKind::TransientZoneRoot => ZONE_DATASET,
+            _ => unreachable!(
+                "private function called with unexpected kind {kind:?}"
+            ),
+        };
+        self.0
+            .iter()
+            .filter(|result| match &result.state {
+                DatasetState::Ensured => true,
+                DatasetState::FailedToEnsure(_) => false,
+            })
+            .filter(move |result| *result.config.name.kind() == kind)
+            .map(|result| {
+                let pool = *result.config.name.pool();
+                PathInPool {
+                    pool: ZpoolOrRamdisk::Zpool(pool),
+                    path: pool
+                        .dataset_mountpoint(&mount_config.root, mountpoint),
+                }
+            })
+    }
+}
+
 #[derive(Debug, Clone)]
 struct SingleDatasetEnsureResult {
     config: DatasetConfig,
@@ -149,7 +230,7 @@ enum DatasetState {
     FailedToEnsure(Arc<DatasetEnsureError>),
 }
 
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub(crate) struct DatasetTaskHandle(mpsc::Sender<DatasetTaskRequest>);
 
 impl DatasetTaskHandle {
diff --git a/sled-agent/config-reconciler/src/handle.rs b/sled-agent/config-reconciler/src/handle.rs
index 7c5d07879a..800d4ab9b9 100644
--- a/sled-agent/config-reconciler/src/handle.rs
+++ b/sled-agent/config-reconciler/src/handle.rs
@@ -50,6 +50,7 @@ use crate::dump_setup_task;
 use crate::internal_disks::InternalDisksReceiver;
 use crate::ledger::LedgerTaskHandle;
 use crate::raw_disks;
+use crate::raw_disks::RawDisksReceiver;
 use crate::raw_disks::RawDisksSender;
 use crate::reconciler_task;
 use crate::reconciler_task::CurrentlyManagedZpools;
@@ -71,6 +72,7 @@ pub struct ConfigReconcilerSpawnToken {
     reconciler_result_tx: watch::Sender<ReconcilerResult>,
     currently_managed_zpools_tx: watch::Sender<Arc<CurrentlyManagedZpools>>,
     external_disks_tx: watch::Sender<HashSet<Disk>>,
+    raw_disks_rx: RawDisksReceiver,
     ledger_task_log: Logger,
     reconciler_task_log: Logger,
 }
@@ -110,7 +112,7 @@ impl ConfigReconcilerHandle {
         let internal_disks_rx =
             InternalDisksReceiver::spawn_internal_disks_task(
                 Arc::clone(&mount_config),
-                raw_disks_rx,
+                raw_disks_rx.clone(),
                 base_log,
             );
@@ -125,7 +127,7 @@ impl ConfigReconcilerHandle {
         );
 
         let (reconciler_result_tx, reconciler_result_rx) =
-            watch::channel(ReconcilerResult::default());
+            watch::channel(ReconcilerResult::new(Arc::clone(&mount_config)));
         let (currently_managed_zpools_tx, currently_managed_zpools_rx) =
             watch::channel(Arc::default());
         let currently_managed_zpools_rx =
@@ -156,6 +158,7 @@ impl ConfigReconcilerHandle {
             reconciler_result_tx,
             currently_managed_zpools_tx,
             external_disks_tx,
+            raw_disks_rx,
             ledger_task_log: base_log
                 .new(slog::o!("component" => "SledConfigLedgerTask")),
             reconciler_task_log: base_log
@@ -188,6 +191,7 @@ impl ConfigReconcilerHandle {
             reconciler_result_tx,
             currently_managed_zpools_tx,
             external_disks_tx,
+            raw_disks_rx,
             ledger_task_log,
             reconciler_task_log,
         } = token;
@@ -213,12 +217,14 @@ impl ConfigReconcilerHandle {
 
         reconciler_task::spawn(
             Arc::clone(self.internal_disks_rx.mount_config()),
+            self.dataset_task.clone(),
             key_requester,
             time_sync_config,
             current_config_rx,
             reconciler_result_tx,
             currently_managed_zpools_tx,
             external_disks_tx,
+            raw_disks_rx,
             sled_agent_facilities,
             reconciler_task_log,
         );
diff --git a/sled-agent/config-reconciler/src/internal_disks.rs b/sled-agent/config-reconciler/src/internal_disks.rs
index e2630aa4d1..e9c05adbdb 100644
--- a/sled-agent/config-reconciler/src/internal_disks.rs
+++ b/sled-agent/config-reconciler/src/internal_disks.rs
@@ -48,6 +48,7 @@ use tokio::sync::watch::error::RecvError;
 use crate::disks_common::MaybeUpdatedDisk;
 use crate::disks_common::update_properties_from_raw_disk;
 use crate::raw_disks::RawDiskWithId;
+use crate::raw_disks::RawDisksReceiver;
 
 /// A thin wrapper around a [`watch::Receiver`] that presents a similar API.
 #[derive(Debug, Clone)]
@@ -143,7 +144,7 @@ impl InternalDisksReceiver {
 
     pub(crate) fn spawn_internal_disks_task(
         mount_config: Arc<MountConfig>,
-        raw_disks_rx: watch::Receiver<IdMap<RawDiskWithId>>,
+        raw_disks_rx: RawDisksReceiver,
         base_log: &Logger,
     ) -> Self {
         Self::spawn_with_disk_adopter(
@@ -160,7 +161,7 @@ impl InternalDisksReceiver {
 
     fn spawn_with_disk_adopter<T: DiskAdopter>(
         mount_config: Arc<MountConfig>,
-        raw_disks_rx: watch::Receiver<IdMap<RawDiskWithId>>,
+        raw_disks_rx: RawDisksReceiver,
         base_log: &Logger,
         disk_adopter: T,
     ) -> Self {
@@ -537,7 +538,7 @@ impl IdMappable for InternalDisk {
 
 struct InternalDisksTask {
     // Input channel for changes to the raw disks sled-agent sees.
-    raw_disks_rx: watch::Receiver<IdMap<RawDiskWithId>>,
+    raw_disks_rx: RawDisksReceiver,
 
     // The set of disks we've successfully started managing.
     disks_tx: watch::Sender<IdMap<InternalDisk>>,
@@ -934,11 +935,11 @@ mod tests {
     async fn only_m2_disks_are_adopted() {
         let logctx = dev::test_setup_log("only_m2_disks_are_adopted");
 
-        let (raw_disks_tx, raw_disks_rx) = watch::channel(IdMap::new());
+        let (raw_disks_tx, raw_disks_rx) = watch::channel(Arc::default());
         let disk_adopter = Arc::new(TestDiskAdopter::default());
         let mut disks_rx = InternalDisksReceiver::spawn_with_disk_adopter(
             Arc::new(any_mount_config()),
-            raw_disks_rx,
+            RawDisksReceiver(raw_disks_rx),
             &logctx.log,
             Arc::clone(&disk_adopter),
         );
@@ -948,6 +949,7 @@
 
         // Add four disks: two M.2 and two U.2.
         raw_disks_tx.send_modify(|disks| {
+            let disks = Arc::make_mut(disks);
             for disk in [
                 new_raw_test_disk(DiskVariant::M2, "m2-0"),
                 new_raw_test_disk(DiskVariant::U2, "u2-0"),
@@ -994,12 +996,13 @@
 
         // Setup: one disk.
let mut raw_disk = new_raw_test_disk(DiskVariant::M2, "test-m2"); - let (raw_disks_tx, raw_disks_rx) = - watch::channel([raw_disk.clone().into()].into_iter().collect()); + let (raw_disks_tx, raw_disks_rx) = watch::channel(Arc::new( + [raw_disk.clone().into()].into_iter().collect(), + )); let disk_adopter = Arc::new(TestDiskAdopter::default()); let mut disks_rx = InternalDisksReceiver::spawn_with_disk_adopter( Arc::new(any_mount_config()), - raw_disks_rx, + RawDisksReceiver(raw_disks_rx), &logctx.log, Arc::clone(&disk_adopter), ); @@ -1021,7 +1024,7 @@ mod tests { ); *raw_disk.firmware_mut() = new_firmware; raw_disks_tx.send_modify(|disks| { - disks.insert(raw_disk.clone().into()); + Arc::make_mut(disks).insert(raw_disk.clone().into()); }); // Wait for the change to be noticed. @@ -1051,17 +1054,17 @@ mod tests { // Setup: two disks. let raw_disk1 = new_raw_test_disk(DiskVariant::M2, "m2-1"); let raw_disk2 = new_raw_test_disk(DiskVariant::M2, "m2-2"); - let (raw_disks_tx, raw_disks_rx) = watch::channel( + let (raw_disks_tx, raw_disks_rx) = watch::channel(Arc::new( [&raw_disk1, &raw_disk2] .into_iter() .cloned() .map(From::from) .collect(), - ); + )); let disk_adopter = Arc::new(TestDiskAdopter::default()); let mut disks_rx = InternalDisksReceiver::spawn_with_disk_adopter( Arc::new(any_mount_config()), - raw_disks_rx, + RawDisksReceiver(raw_disks_rx), &logctx.log, Arc::clone(&disk_adopter), ); @@ -1075,7 +1078,7 @@ mod tests { // Remove test disk 1. raw_disks_tx.send_modify(|raw_disks| { - raw_disks.remove(raw_disk1.identity()); + Arc::make_mut(raw_disks).remove(raw_disk1.identity()); }); // Wait for the removal to be propagated. @@ -1100,9 +1103,9 @@ mod tests { // Setup: one disk, and configure the disk adopter to fail. let raw_disk = new_raw_test_disk(DiskVariant::M2, "test-m2"); - let (_raw_disks_tx, raw_disks_rx) = watch::channel( + let (_raw_disks_tx, raw_disks_rx) = watch::channel(Arc::new( [&raw_disk].into_iter().cloned().map(From::from).collect(), - ); + )); let disk_adopter = Arc::new(TestDiskAdopter::default()); disk_adopter.inner.lock().unwrap().should_fail_requests.insert( raw_disk.identity().clone(), @@ -1113,7 +1116,7 @@ mod tests { let mut disks_rx = InternalDisksReceiver::spawn_with_disk_adopter( Arc::new(any_mount_config()), - raw_disks_rx, + RawDisksReceiver(raw_disks_rx), &logctx.log, Arc::clone(&disk_adopter), ); diff --git a/sled-agent/config-reconciler/src/raw_disks.rs b/sled-agent/config-reconciler/src/raw_disks.rs index 6c984e0832..70470416fd 100644 --- a/sled-agent/config-reconciler/src/raw_disks.rs +++ b/sled-agent/config-reconciler/src/raw_disks.rs @@ -5,7 +5,6 @@ //! Provides thin wrappers around a watch channel managing the set of //! [`RawDisk`]s sled-agent is aware of. 
-use id_map::Entry;
 use id_map::IdMap;
 use id_map::IdMappable;
 use omicron_common::disk::DiskIdentity;
@@ -13,16 +12,36 @@ use sled_storage::disk::RawDisk;
 use slog::Logger;
 use slog::info;
 use std::ops::Deref;
+use std::ops::DerefMut;
 use std::sync::Arc;
 use tokio::sync::watch;
 
-pub(crate) fn new() -> (RawDisksSender, watch::Receiver<IdMap<RawDiskWithId>>) {
-    let (tx, rx) = watch::channel(IdMap::default());
-    (RawDisksSender(tx), rx)
+pub(crate) fn new() -> (RawDisksSender, RawDisksReceiver) {
+    let (tx, rx) = watch::channel(Arc::default());
+    (RawDisksSender(tx), RawDisksReceiver(rx))
 }
 
 #[derive(Debug, Clone)]
-pub struct RawDisksSender(watch::Sender<IdMap<RawDiskWithId>>);
+pub(crate) struct RawDisksReceiver(
+    pub(crate) watch::Receiver<Arc<IdMap<RawDiskWithId>>>,
+);
+
+impl Deref for RawDisksReceiver {
+    type Target = watch::Receiver<Arc<IdMap<RawDiskWithId>>>;
+
+    fn deref(&self) -> &Self::Target {
+        &self.0
+    }
+}
+
+impl DerefMut for RawDisksReceiver {
+    fn deref_mut(&mut self) -> &mut Self::Target {
+        &mut self.0
+    }
+}
+
+#[derive(Debug, Clone)]
+pub struct RawDisksSender(watch::Sender<Arc<IdMap<RawDiskWithId>>>);
 
 impl RawDisksSender {
     /// Set the complete set of raw disks visible to sled-agent.
@@ -44,10 +63,10 @@ impl RawDisksSender {
             }
         }
 
-        if *disks == new_disks {
+        if **disks == new_disks {
             false
         } else {
-            *disks = new_disks;
+            *disks = Arc::new(new_disks);
             true
         }
     });
@@ -57,29 +76,27 @@ impl RawDisksSender {
     pub fn add_or_update_raw_disk(&self, disk: RawDisk, log: &Logger) -> bool {
         let disk = RawDiskWithId::from(disk);
         self.0.send_if_modified(|disks| {
-            match disks.entry(Arc::clone(&disk.identity)) {
-                Entry::Vacant(entry) => {
+            match disks.get(&disk.identity) {
+                Some(existing) if *existing == disk => {
+                    return false;
+                }
+                Some(existing) => {
                     info!(
-                        log, "Adding new raw disk";
-                        "identity" => ?disk.identity,
+                        log, "Updating raw disk";
+                        "old" => ?existing.disk,
+                        "new" => ?disk.disk,
                     );
-                    entry.insert(disk);
-                    true
                 }
-                Entry::Occupied(mut entry) => {
-                    if *entry.get() == disk {
-                        false
-                    } else {
-                        info!(
-                            log, "Updating raw disk";
-                            "old" => ?entry.get().disk,
-                            "new" => ?disk.disk,
-                        );
-                        entry.insert(disk);
-                        true
-                    }
+                None => {
+                    info!(
+                        log, "Adding new raw disk";
+                        "disk" => ?disk.disk,
+                    );
                 }
             }
+
+            Arc::make_mut(disks).insert(disk);
+            true
         })
     }
 
@@ -103,7 +120,7 @@ impl RawDisksSender {
         }
 
         info!(log, "Removing disk"; "identity" => ?identity);
-        disks.remove(identity);
+        Arc::make_mut(disks).remove(identity);
         true
     })
 }
diff --git a/sled-agent/config-reconciler/src/reconciler_task.rs b/sled-agent/config-reconciler/src/reconciler_task.rs
index 3c91c60e67..51c0a2ecc3 100644
--- a/sled-agent/config-reconciler/src/reconciler_task.rs
+++ b/sled-agent/config-reconciler/src/reconciler_task.rs
@@ -6,12 +6,17 @@
 
 use chrono::DateTime;
 use chrono::Utc;
+use either::Either;
+use futures::future;
 use illumos_utils::zpool::PathInPool;
 use key_manager::StorageKeyRequester;
 use nexus_sled_agent_shared::inventory::OmicronSledConfig;
 use sled_storage::config::MountConfig;
 use sled_storage::disk::Disk;
 use slog::Logger;
+use slog::info;
+use slog::warn;
+use slog_error_chain::InlineErrorChain;
 use std::collections::HashSet;
 use std::sync::Arc;
 use std::time::Duration;
@@ -19,13 +24,17 @@ use std::time::Instant;
 use tokio::sync::watch;
 
 use crate::TimeSyncConfig;
+use crate::dataset_serialization_task::DatasetEnsureResult;
+use crate::dataset_serialization_task::DatasetTaskHandle;
 use crate::ledger::CurrentSledConfig;
+use crate::raw_disks::RawDisksReceiver;
 use crate::sled_agent_facilities::SledAgentFacilities;
 
 mod external_disks;
 mod zones;
 
 use self::external_disks::ExternalDisks;
+use self::zones::OmicronZones;
 
 pub use self::external_disks::CurrentlyManagedZpools;
 pub use self::external_disks::CurrentlyManagedZpoolsReceiver;
@@ -35,51 +44,56 @@ pub use self::zones::TimeSyncStatus;
 
 #[allow(clippy::too_many_arguments)]
 pub(crate) fn spawn<T: SledAgentFacilities>(
     mount_config: Arc<MountConfig>,
+    dataset_task: DatasetTaskHandle,
     key_requester: StorageKeyRequester,
     time_sync_config: TimeSyncConfig,
     current_config_rx: watch::Receiver<CurrentSledConfig>,
     reconciler_result_tx: watch::Sender<ReconcilerResult>,
     currently_managed_zpools_tx: watch::Sender<Arc<CurrentlyManagedZpools>>,
     external_disks_tx: watch::Sender<HashSet<Disk>>,
+    raw_disks_rx: RawDisksReceiver,
     sled_agent_facilities: T,
     log: Logger,
 ) {
     let external_disks = ExternalDisks::new(
-        mount_config,
+        Arc::clone(&mount_config),
         currently_managed_zpools_tx,
         external_disks_tx,
     );
 
+    let zones = OmicronZones::new(mount_config, time_sync_config);
+
     tokio::spawn(
         ReconcilerTask {
             key_requester,
-            time_sync_config,
+            dataset_task,
             current_config_rx,
             reconciler_result_tx,
+            raw_disks_rx,
             external_disks,
-            sled_agent_facilities,
+            zones,
             log,
         }
-        .run(),
+        .run(sled_agent_facilities),
     );
 }
 
 #[derive(Debug, Clone)]
 pub(crate) struct ReconcilerResult {
+    mount_config: Arc<MountConfig>,
     status: ReconcilerTaskStatus,
     latest_result: Option<Arc<LatestReconcilerTaskResultInner>>,
 }
 
-impl Default for ReconcilerResult {
-    fn default() -> Self {
+impl ReconcilerResult {
+    pub(crate) fn new(mount_config: Arc<MountConfig>) -> Self {
         Self {
+            mount_config,
            status: ReconcilerTaskStatus::NotYetRunning,
             latest_result: None,
         }
     }
-}
 
-impl ReconcilerResult {
     pub fn timesync_status(&self) -> TimeSyncStatus {
         self.latest_result
             .as_deref()
@@ -90,21 +104,27 @@ impl ReconcilerResult {
     pub fn all_mounted_debug_datasets(
         &self,
     ) -> impl Iterator<Item = PathInPool> + '_ {
-        // unimplemented!() doesn't work with `-> impl Iterator`
-        if 1 > 0 {
-            panic!("unimplemented!");
-        }
-        std::iter::empty()
+        let Some(latest_result) = &self.latest_result else {
+            return Either::Left(std::iter::empty());
+        };
+        Either::Right(
+            latest_result
+                .datasets
+                .all_mounted_debug_datasets(&self.mount_config),
+        )
     }
 
     pub fn all_mounted_zone_root_datasets(
         &self,
     ) -> impl Iterator<Item = PathInPool> + '_ {
-        // unimplemented!() doesn't work with `-> impl Iterator`
-        if 1 > 0 {
-            panic!("unimplemented!");
-        }
-        std::iter::empty()
+        let Some(latest_result) = &self.latest_result else {
+            return Either::Left(std::iter::empty());
+        };
+        Either::Right(
+            latest_result
+                .datasets
+                .all_mounted_zone_root_datasets(&self.mount_config),
+        )
     }
 }
@@ -112,7 +132,7 @@ impl ReconcilerResult {
 pub enum ReconcilerTaskStatus {
     NotYetRunning,
     WaitingForInternalDisks,
-    WaitingForRackSetup,
+    WaitingForInitialConfig,
     PerformingReconciliation {
         config: OmicronSledConfig,
         started_at_time: DateTime<Utc>,
@@ -127,21 +147,292 @@
 #[derive(Debug)]
 struct LatestReconcilerTaskResultInner {
     sled_config: OmicronSledConfig,
+    datasets: DatasetEnsureResult,
     timesync_status: TimeSyncStatus,
 }
 
-struct ReconcilerTask<T: SledAgentFacilities> {
+struct ReconcilerTask {
     key_requester: StorageKeyRequester,
-    time_sync_config: TimeSyncConfig,
+    dataset_task: DatasetTaskHandle,
     current_config_rx: watch::Receiver<CurrentSledConfig>,
     reconciler_result_tx: watch::Sender<ReconcilerResult>,
+    raw_disks_rx: RawDisksReceiver,
     external_disks: ExternalDisks,
-    sled_agent_facilities: T,
+    zones: OmicronZones,
     log: Logger,
 }
 
-impl<T: SledAgentFacilities> ReconcilerTask<T> {
-    async fn run(self) {
-        unimplemented!()
+impl ReconcilerTask {
+    async fn run<T: SledAgentFacilities>(mut self, sled_agent_facilities: T) {
+        // If reconciliation fails, we may want to retry it. The "happy path"
+        // that requires this is waiting for time sync: during RSS, cold boot,
+        // or replacement of the NTP zone, we may fail to start any zones that
+        // depend on time sync. We want to retry this pretty frequently: it's
+        // cheap if we haven't time sync'd yet, and we'd like to move on to
+        // starting zones as soon as we can.
+        //
+        // We could use a more complicated retry policy than "sleep for a few
+        // seconds" (e.g., backoff, or even "pick the retry policy based on the
+        // particular kind of failure we're retrying"). For now we'll just take
+        // this pretty aggressive policy.
+        const SLEEP_BETWEEN_RETRIES: Duration = Duration::from_secs(5);
+
+        loop {
+            let result = self.do_reconciliation(&sled_agent_facilities).await;
+
+            let maybe_retry = match result {
+                ReconciliationResult::NoRetryNeeded => {
+                    Either::Left(future::pending())
+                }
+                ReconciliationResult::ShouldRetry => {
+                    info!(
+                        self.log,
+                        "reconciliation result has retryable error; will retry";
+                        "retry_after" => ?SLEEP_BETWEEN_RETRIES,
+                    );
+                    Either::Right(tokio::time::sleep(SLEEP_BETWEEN_RETRIES))
+                }
+            };
+
+            // Wait for one of:
+            //
+            // 1. The current ledgered `OmicronSledConfig` has changed
+            // 2. The set of `RawDisk`s has changed
+            // 3. Our retry timer expires
+            tokio::select! {
+                // Cancel-safe per docs on `changed()`
+                result = self.current_config_rx.changed() => {
+                    match result {
+                        Ok(()) => {
+                            info!(
+                                self.log,
+                                "starting reconciliation due to config change"
+                            );
+                            continue;
+                        }
+                        Err(_closed) => {
+                            // This should never happen in production, but may
+                            // in tests.
+                            warn!(
+                                self.log,
+                                "current_config watch channel closed; exiting"
+                            );
+                            return;
+                        }
+                    }
+                }
+
+                // Cancel-safe per docs on `changed()`
+                result = self.raw_disks_rx.changed() => {
+                    match result {
+                        Ok(()) => {
+                            info!(
+                                self.log,
+                                "starting reconciliation due to raw disk change"
+                            );
+                            continue;
+                        }
+                        Err(_closed) => {
+                            // This should never happen in production, but may
+                            // in tests.
+                            warn!(
+                                self.log,
+                                "raw_disks watch channel closed; exiting"
+                            );
+                            return;
+                        }
+                    }
+                }
+
+                // Cancel-safe: this is either `future::pending()` (never
+                // completes) or `sleep()` (we don't care if it's cancelled)
+                _ = maybe_retry => {
+                    info!(
+                        self.log,
+                        "starting reconciliation due to retryable error"
+                    );
+                    continue;
+                }
+            }
+        }
+    }
+
+    async fn do_reconciliation<T: SledAgentFacilities>(
+        &mut self,
+        sled_agent_facilities: &T,
+    ) -> ReconciliationResult {
+        // Take a snapshot of the current state of the input channels on which
+        // we act. Clone both to avoid keeping the channels locked while we
+        // reconcile.
+        let current_config = self.current_config_rx.borrow_and_update().clone();
+        let current_raw_disks = self.raw_disks_rx.borrow_and_update().clone();
+
+        // See whether we actually have a config to reconcile against.
+        let started_at_instant = Instant::now();
+        let sled_config = match current_config {
+            // In both `WaitingFor...` cases, we don't need to retry on our
+            // own: we'll retry as soon as there's a change in our inputs that
+            // might nudge us out of these states.
+ CurrentSledConfig::WaitingForInternalDisks => { + self.reconciler_result_tx.send_modify(|r| { + r.status = ReconcilerTaskStatus::WaitingForInternalDisks; + }); + return ReconciliationResult::NoRetryNeeded; + } + CurrentSledConfig::WaitingForInitialConfig => { + self.reconciler_result_tx.send_modify(|r| { + r.status = ReconcilerTaskStatus::WaitingForInitialConfig; + }); + return ReconciliationResult::NoRetryNeeded; + } + CurrentSledConfig::Ledgered(sled_config) => { + self.reconciler_result_tx.send_modify(|r| { + r.status = ReconcilerTaskStatus::PerformingReconciliation { + config: sled_config.clone(), + started_at_time: Utc::now(), + started_at_instant, + }; + }); + sled_config + } + }; + + // --- + // We go through the removal process first: shut down zones, then remove + // datasets, then remove disks. + // --- + + // First, shut down zones if needed. + let zone_shutdown_result = self + .zones + .shut_down_zones_if_needed( + &sled_config.zones, + sled_agent_facilities, + &self.log, + ) + .await; + + // Next, delete datasets that need to be deleted. + // + // TODO We don't do this yet: + // https://github.com/oxidecomputer/omicron/issues/6177 + + // Finally, remove any external disks we're no longer supposed to use + // (either due to config changes or the raw disk being gone). + self.external_disks.stop_managing_if_needed( + ¤t_raw_disks, + &sled_config.disks, + &self.log, + ); + + // --- + // Now go through the add process: start managing disks, create + // datasets, start zones. + // --- + + // Start managing disks. + self.external_disks + .start_managing_if_needed( + ¤t_raw_disks, + &sled_config.disks, + &self.key_requester, + &self.log, + ) + .await; + + // Ensure all the datasets we want exist. + let datasets = match self + .dataset_task + .datasets_ensure(sled_config.datasets.clone()) + .await + { + Ok(result) => result, + Err(err) => { + warn!( + self.log, "failed to contact dataset task"; + InlineErrorChain::new(&err), + ); + // If we can't contact the dataset task, reuse the result from + // our previous attempt. This should still be correct (until we + // start deleting datasets, at which point we'll need a more + // holistic tracker for dataset status like we already have for + // disks and zones). + self.reconciler_result_tx + .borrow() + .latest_result + .as_ref() + .map(|inner| inner.datasets.clone()) + .unwrap_or_else(DatasetEnsureResult::default) + } + }; + + // Collect the current timesync status (needed to start any new zones, + // and also we want to report it as part of each reconciler result). + let timesync_status = self.zones.check_timesync().await; + + // We conservatively refuse to start any new zones if any zones have + // failed to shut down cleanly. This could be more precise, but we want + // to avoid wandering into some really weird cases, such as: + // + // * Multiple NTP zones active concurrently + // * Multiple Crucible zones trying to manage the same zpool + // + // which could happen if we're upgrading a zone but failed to shut down + // the old instance. 
+        match zone_shutdown_result {
+            Ok(()) => {
+                let currently_managed_zpools =
+                    self.external_disks.currently_managed_zpools();
+                self.zones
+                    .start_zones_if_needed(
+                        &sled_config.zones,
+                        sled_agent_facilities,
+                        timesync_status.is_synchronized(),
+                        &currently_managed_zpools,
+                        &self.log,
+                    )
+                    .await;
+            }
+            Err(nfailed) => {
+                warn!(
+                    self.log,
+                    "skipping attempt to start new zones; \
+                     {nfailed} zones failed to shut down cleanly"
+                );
+            }
+        }
+
+        // We'll retry even if there have been no config changes if (a) time
+        // isn't sync'd yet or (b) any of our disk/dataset/zone attempts failed
+        // with a retryable error.
+        let result = if !timesync_status.is_synchronized()
+            || self.external_disks.has_retryable_error()
+            || self.zones.has_retryable_error()
+            || datasets.has_retryable_error()
+        {
+            ReconciliationResult::ShouldRetry
+        } else {
+            ReconciliationResult::NoRetryNeeded
+        };
+
+        let inner = LatestReconcilerTaskResultInner {
+            sled_config,
+            datasets,
+            timesync_status,
+        };
+        self.reconciler_result_tx.send_modify(|r| {
+            r.status = ReconcilerTaskStatus::Idle {
+                completed_at_time: Utc::now(),
+                ran_for: started_at_instant.elapsed(),
+            };
+            r.latest_result = Some(Arc::new(inner));
+        });
+
+        result
+    }
+}
+
+enum ReconciliationResult {
+    NoRetryNeeded,
+    ShouldRetry,
 }
diff --git a/sled-agent/config-reconciler/src/reconciler_task/external_disks.rs b/sled-agent/config-reconciler/src/reconciler_task/external_disks.rs
index 5aa425b24f..40a815a0f0 100644
--- a/sled-agent/config-reconciler/src/reconciler_task/external_disks.rs
+++ b/sled-agent/config-reconciler/src/reconciler_task/external_disks.rs
@@ -189,6 +189,19 @@ impl ExternalDisks {
         }
     }
 
+    pub(crate) fn has_retryable_error(&self) -> bool {
+        self.disks.iter().any(|disk| match &disk.state {
+            DiskState::Managed(_) => false,
+            DiskState::FailedToManage(err) => err.retryable(),
+        })
+    }
+
+    pub(super) fn currently_managed_zpools(
+        &self,
+    ) -> Arc<CurrentlyManagedZpools> {
+        Arc::clone(&*self.currently_managed_zpools_tx.borrow())
+    }
+
     fn update_output_watch_channels(&self) {
         let current_disks = self
             .disks
diff --git a/sled-agent/config-reconciler/src/reconciler_task/zones.rs b/sled-agent/config-reconciler/src/reconciler_task/zones.rs
index 0be8f3333e..9caf37e5eb 100644
--- a/sled-agent/config-reconciler/src/reconciler_task/zones.rs
+++ b/sled-agent/config-reconciler/src/reconciler_task/zones.rs
@@ -47,6 +47,17 @@ pub enum TimeSyncStatus {
     TimeSync(TimeSync),
 }
 
+impl TimeSyncStatus {
+    pub(crate) fn is_synchronized(&self) -> bool {
+        match self {
+            TimeSyncStatus::NotYetChecked
+            | TimeSyncStatus::FailedToGetSyncStatus(_) => false,
+            TimeSyncStatus::ConfiguredToSkip => true,
+            TimeSyncStatus::TimeSync(time_sync) => time_sync.sync,
+        }
+    }
+}
+
 #[derive(Debug, thiserror::Error)]
 pub enum TimeSyncError {
     #[error("no running NTP zone")]
@@ -76,6 +87,16 @@ impl OmicronZones {
         Self { zones: IdMap::default(), mount_config, timesync_config }
     }
 
+    pub(crate) fn has_retryable_error(&self) -> bool {
+        self.zones.iter().any(|zone| match &zone.state {
+            ZoneState::Running(_) => false,
+            // Assume any error is retryable. This might not be right? We can
+            // narrow this down in the future.
+            ZoneState::PartiallyShutDown { .. }
+            | ZoneState::FailedToStart(_) => true,
+        })
+    }
+
     /// Attempt to shut down any zones that aren't present in `desired_zones`,
     /// or that weren't present in some prior call but which didn't succeed in
     /// shutting down and are in a partially-shut-down state.
@@ -109,64 +130,55 @@ impl OmicronZones { // Filter desired zones down to just those that we need to stop. See // [`ZoneState`] for more discussion of why we're willing (or unwilling) // to stop zones in various current states. - let mut zones_to_shut_down = self - .zones - .iter() - .filter(|z| { - match desired_zones.get(&z.config.id) { - // We no longer want this zone to be running. - None => true, - - // We do want this zone to be running; check the current - // state. - Some(desired_config) => match &z.state { - // Only shut down a running zone if the desired config - // has changed from the config used to start it. - ZoneState::Running(_) => { - if z.config == *desired_config { - false - } else { - info!( - log, - "starting shutdown of running zone; config \ - has changed"; - "zone" => z.config.zone_name(), - "old-config" => ?z.config, - "new-config" => ?desired_config, - ); - true - } - } - - // Shut down zones in other states, but log why first. - ZoneState::PartiallyShutDown { err, .. } => { + let zones_to_shut_down = self.zones.iter().filter(|z| { + match desired_zones.get(&z.config.id) { + // We no longer want this zone to be running. + None => true, + + // We do want this zone to be running; check the current + // state. + Some(desired_config) => match &z.state { + // Only shut down a running zone if the desired config + // has changed from the config used to start it. + ZoneState::Running(_) => { + if z.config == *desired_config { + false + } else { info!( log, - "resuming shutdown of partially-shut-down zone"; + "starting shutdown of running zone; config \ + has changed"; "zone" => z.config.zone_name(), - "prev_err" => InlineErrorChain::new(err), + "old-config" => ?z.config, + "new-config" => ?desired_config, ); true } + } - ZoneState::FailedToStart(err) => { - info!( - log, - "starting shutdown of a failed-to-start zone"; - "zone" => z.config.zone_name(), - "prev_err" => InlineErrorChain::new(err), - ); - true - } - }, - } - }) - .peekable(); + // Shut down zones in other states, but log why first. + ZoneState::PartiallyShutDown { err, .. } => { + info!( + log, + "resuming shutdown of partially-shut-down zone"; + "zone" => z.config.zone_name(), + "prev_err" => InlineErrorChain::new(err), + ); + true + } - // Save a bit of work: bail out now if we have no zones to stop. - if zones_to_shut_down.peek().is_none() { - return Ok(()); - } + ZoneState::FailedToStart(err) => { + info!( + log, + "starting shutdown of a failed-to-start zone"; + "zone" => z.config.zone_name(), + "prev_err" => InlineErrorChain::new(err), + ); + true + } + }, + } + }); // Map the zones to the futures that will try to shut them down. let shutdown_futures = zones_to_shut_down.map(|zone| { @@ -213,53 +225,45 @@ impl OmicronZones { // Filter desired zones down to just those that we need to start. See // [`ZoneState`] for more discussion of why we're willing (or unwilling) // to start zones in various current states. - let mut zones_to_start = desired_zones - .iter() - .filter(|zone| { - match self.zones.get(&zone.id).map(|z| &z.state) { - // This is entirely new zone - start it! - None => { - info!( - log, "starting new zone"; - "config" => ?zone, - ); - true - } - - // This is a zone we've tried to start before; try again! 
-                    Some(ZoneState::FailedToStart(err)) => {
-                        info!(
-                            log,
-                            "retrying start of zone";
-                            "config" => ?zone,
-                            "prev_err" => InlineErrorChain::new(err),
-                        );
-                        true
-                    }
+        let zones_to_start = desired_zones.iter().filter(|zone| {
+            match self.zones.get(&zone.id).map(|z| &z.state) {
+                // This is an entirely new zone - start it!
+                None => {
+                    info!(
+                        log, "starting new zone";
+                        "config" => ?zone,
+                    );
+                    true
+                }
 
-                    // We want this zone to be running now but previously needed
-                    // to stop it and failed to do so: don't try to start it
-                    // again until we succeed in stopping it.
-                    Some(ZoneState::PartiallyShutDown { err, .. }) => {
-                        warn!(
-                            log,
-                            "refusing to start zone (partially shut down)";
-                            "config" => ?zone,
-                            "shutdown_err" => InlineErrorChain::new(err),
-                        );
-                        false
-                    }
+                // This is a zone we've tried to start before; try again!
+                Some(ZoneState::FailedToStart(err)) => {
+                    info!(
+                        log,
+                        "retrying start of zone";
+                        "config" => ?zone,
+                        "prev_err" => InlineErrorChain::new(err),
+                    );
+                    true
+                }
 
-                    // The common case: this zone is already running.
-                    Some(ZoneState::Running(_)) => false,
+                // We want this zone to be running now but previously needed
+                // to stop it and failed to do so: don't try to start it
+                // again until we succeed in stopping it.
+                Some(ZoneState::PartiallyShutDown { err, .. }) => {
+                    warn!(
+                        log,
+                        "refusing to start zone (partially shut down)";
+                        "config" => ?zone,
+                        "shutdown_err" => InlineErrorChain::new(err),
+                    );
+                    false
                 }
-            })
-            .peekable();
 
-        // Save a bit of work: bail out now if we have no zones to start.
-        if zones_to_start.peek().is_none() {
-            return;
-        }
+                // The common case: this zone is already running.
+                Some(ZoneState::Running(_)) => false,
+            }
+        });
 
         // Build up the futures for starting each zone.
         let all_u2_pools = all_u2_pools.clone().into_vec();
@@ -291,7 +295,7 @@
     }
 
     /// Check the timesync status from a running NTP zone (if it exists)
-    pub(super) async fn check_timesync(self) -> TimeSyncStatus {
+    pub(super) async fn check_timesync(&self) -> TimeSyncStatus {
         match &self.timesync_config {
             TimeSyncConfig::Normal => {
                 match self.timesync_status_from_ntp_zone().await {

From 6ed5f47b417ef2c10eac246c3d54b62d1a6c1d3c Mon Sep 17 00:00:00 2001
From: John Gallagher
Date: Thu, 15 May 2025 17:36:32 -0400
Subject: [PATCH 2/2] remove double negative comment

---
 .../config-reconciler/src/dataset_serialization_task.rs | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)

diff --git a/sled-agent/config-reconciler/src/dataset_serialization_task.rs b/sled-agent/config-reconciler/src/dataset_serialization_task.rs
index 650e418d36..367b96cd6c 100644
--- a/sled-agent/config-reconciler/src/dataset_serialization_task.rs
+++ b/sled-agent/config-reconciler/src/dataset_serialization_task.rs
@@ -90,7 +90,12 @@ impl DatasetEnsureError {
 impl DatasetEnsureError {
     fn is_retryable(&self) -> bool {
         match self {
-            // Errors that we don't know for sure _aren't_ retryable.
+            // These errors might be retryable; there are probably cases where
+            // they won't be, but we need more context than we have available
+            // from just the error to know for sure. For now, assume they are
+            // retryable - that may mean we churn on something doomed, but
+            // that's better than failing to retry something we should have
+            // retried.
             DatasetEnsureError::ZpoolNotFound(_)
             | DatasetEnsureError::EnsureFailed { .. } => true,
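---

Appendix: a few self-contained sketches (not part of the patch series) of the patterns the reconciler relies on. Everything in them is an illustrative stand-in unless it also appears in the diffs above.

The main loop in `ReconcilerTask::run` waits on two watch channels plus an optional retry timer. A minimal sketch of that wait, assuming placeholder channel payloads (`u32`) and using `futures::future::Either` in the role the patch gives `either::Either`:

    use std::time::Duration;

    use futures::future::{self, Either};
    use tokio::sync::watch;

    async fn wait_for_next_wakeup(
        config_rx: &mut watch::Receiver<u32>,
        disks_rx: &mut watch::Receiver<u32>,
        should_retry: bool,
    ) {
        const SLEEP_BETWEEN_RETRIES: Duration = Duration::from_secs(5);

        // Hold a timer only when a retry is wanted; otherwise a future that
        // never resolves, so only the channels can wake us.
        let maybe_retry = if should_retry {
            Either::Left(tokio::time::sleep(SLEEP_BETWEEN_RETRIES))
        } else {
            Either::Right(future::pending::<()>())
        };

        tokio::select! {
            // `changed()` is cancel-safe: the channel tracks whether this
            // receiver has seen the latest value, so a wakeup lost to another
            // select arm is observed again on the next call.
            _ = config_rx.changed() => {}
            _ = disks_rx.changed() => {}
            _ = maybe_retry => {}
        }
    }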
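The same `Either` shape appears in `ReconcilerResult::all_mounted_debug_datasets`: a function returning `impl Iterator` must return one concrete type, so the empty branch and the populated branch are unified through the enum, which iterates when both halves do. A reduced sketch with a stand-in element type (`u32`, not the real `PathInPool`):

    use either::Either;

    fn evens_or_empty(input: Option<&[u32]>) -> impl Iterator<Item = u32> + '_ {
        // No result yet: nothing to yield, but the type must match the other
        // branch, hence the `Either` wrapper rather than a bare
        // `std::iter::empty()`.
        let Some(values) = input else {
            return Either::Left(std::iter::empty());
        };
        Either::Right(values.iter().copied().filter(|v| v % 2 == 0))
    }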
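The raw-disks channel now carries `Arc<IdMap<RawDiskWithId>>` instead of the map itself, so `do_reconciliation` can snapshot it with one cheap clone. Writers mutate through `Arc::make_mut`, which deep-clones only while some receiver still holds the previous snapshot. A reduced sketch, with `BTreeSet<String>` standing in for the real map type:

    use std::collections::BTreeSet;
    use std::sync::Arc;

    use tokio::sync::watch;

    fn add_disk(tx: &watch::Sender<Arc<BTreeSet<String>>>, disk: String) -> bool {
        // `send_if_modified` notifies receivers only when the closure returns
        // true, so a no-op update causes no downstream wakeups.
        tx.send_if_modified(|disks| {
            if disks.contains(&disk) {
                return false;
            }
            // Clone-on-write: copies the set only if a receiver still holds
            // the previously published Arc.
            Arc::make_mut(disks).insert(disk);
            true
        })
    }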
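Finally, the retry decision: each subsystem classifies its failures, and the task schedules another pass only while a retryable failure (or unsynchronized time) remains. A compact sketch of that shape; the enum and states here are hypothetical stand-ins for `DatasetEnsureError` and its disk/zone counterparts:

    enum EnsureError {
        // The pool may appear later; retrying without a config change can help.
        PoolNotFound,
        // Only a new config can fix this; retrying the same one cannot.
        UuidMismatch,
    }

    impl EnsureError {
        fn is_retryable(&self) -> bool {
            matches!(self, EnsureError::PoolNotFound)
        }
    }

    enum EnsureState {
        Ensured,
        FailedToEnsure(EnsureError),
    }

    // Mirrors `DatasetEnsureResult::has_retryable_error`: one lingering
    // retryable failure is enough to ask for another reconciliation pass.
    fn has_retryable_error(states: &[EnsureState]) -> bool {
        states.iter().any(|s| match s {
            EnsureState::Ensured => false,
            EnsureState::FailedToEnsure(err) => err.is_retryable(),
        })
    }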