diff --git a/crates/worker/src/partition_processor_manager/mod.rs b/crates/worker/src/partition_processor_manager/mod.rs index a019265bc..55a91344e 100644 --- a/crates/worker/src/partition_processor_manager/mod.rs +++ b/crates/worker/src/partition_processor_manager/mod.rs @@ -99,19 +99,13 @@ pub struct PartitionProcessorManager { asynchronous_operations: JoinSet, - pending_snapshots: HashMap, - snapshot_export_tasks: FuturesUnordered>, + pending_snapshots: HashMap>, + snapshot_export_tasks: + FuturesUnordered>>, } type SnapshotResultInternal = Result; -/// Handle to an outstanding [`SnapshotPartitionTask`] that has been spawned, including a reference -/// to notify the requester. -pub struct PendingSnapshotTask { - pub handle: TaskHandle<()>, - pub sender: oneshot::Sender, -} - #[derive(Debug, thiserror::Error)] pub enum Error { #[error(transparent)] @@ -284,8 +278,8 @@ impl PartitionProcessorManager { } _ = &mut shutdown => { self.health_status.update(WorkerStatus::Unknown); - for task in self.pending_snapshots.values() { - task.handle.cancel(); + for task in self.snapshot_export_tasks.iter() { + task.cancel(); } return Ok(()); } @@ -327,7 +321,8 @@ impl PartitionProcessorManager { } } - #[instrument(level = "debug", skip_all, fields(partition_id = %event.partition_id, event = %<&'static str as From<&EventKind>>::from(&event.inner)))] + #[instrument(level = "debug", skip_all, fields(partition_id = %event.partition_id, event = %<&'static str as From<&EventKind>>::from(&event.inner) + ))] fn on_asynchronous_event(&mut self, event: AsynchronousEvent) { let AsynchronousEvent { partition_id, @@ -619,7 +614,7 @@ impl PartitionProcessorManager { processor_state.stop(); } if self.pending_snapshots.contains_key(&partition_id) { - warn!(%partition_id, "Partition processor requested to stop while a snapshot task is still outstanding."); + info!(%partition_id, "Partition processor stop requested with snapshot task result outstanding."); } } ProcessorCommand::Follower | ProcessorCommand::Leader => { @@ -703,7 +698,7 @@ impl PartitionProcessorManager { return; } - self.spawn_create_snapshot_task(partition_id, sender); + self.spawn_create_snapshot_task(partition_id, Some(sender)); } fn on_create_snapshot_task_completed(&mut self, result: SnapshotResultInternal) { @@ -723,10 +718,8 @@ impl PartitionProcessorManager { Err(snapshot_error) => (snapshot_error.partition_id(), Err(snapshot_error)), }; - if let Some(pending) = self.pending_snapshots.remove(&partition_id) { - let _ = pending.sender.send(response); - } else { - warn!("Snapshot task result received, but there was no pending sender found!") + if let Some(sender) = self.pending_snapshots.remove(&partition_id) { + let _ = sender.send(response); } } @@ -741,61 +734,54 @@ impl PartitionProcessorManager { return; }; - for (partition_id, state) in self.processor_states.iter() { - let status = state.partition_processor_status(); - match status { - Some(status) - if status.effective_mode == RunMode::Leader - && status.replay_status == ReplayStatus::Active - && status.last_applied_log_lsn.unwrap_or(Lsn::INVALID) - >= status - .last_archived_log_lsn - .unwrap_or(Lsn::OLDEST) - .add(Lsn::from(records_per_snapshot.get())) => - { - debug!( - %partition_id, - last_archived_lsn = %status.last_archived_log_lsn.unwrap_or(SequenceNumber::OLDEST), - last_applied_lsn = %status.last_applied_log_lsn.unwrap_or(SequenceNumber::INVALID), - "Creating partition snapshot", - ); - let (tx, _) = oneshot::channel(); - - // ignore errors and don't request further snapshots if internal queue is full; we will try again later - if self - .tx - .try_send(ProcessorsManagerCommand::CreateSnapshot(*partition_id, tx)) - .is_err() - { - break; - } - } - _ => { - continue; - } - } + let snapshot_partitions: Vec<_> = self + .processor_states + .iter() + .filter_map(|(partition_id, state)| { + state + .partition_processor_status() + .map(|status| (*partition_id, status)) + }) + .filter(|(_, status)| { + status.effective_mode == RunMode::Leader + && status.replay_status == ReplayStatus::Active + && status.last_applied_log_lsn.unwrap_or(Lsn::INVALID) + >= status + .last_archived_log_lsn + .unwrap_or(Lsn::OLDEST) + .add(Lsn::from(records_per_snapshot.get())) + }) + .collect(); + + for (partition_id, status) in snapshot_partitions { + debug!( + %partition_id, + last_archived_lsn = %status.last_archived_log_lsn.unwrap_or(SequenceNumber::OLDEST), + last_applied_lsn = %status.last_applied_log_lsn.unwrap_or(SequenceNumber::INVALID), + "Requesting partition snapshot", + ); + self.spawn_create_snapshot_task(partition_id, None); } } + /// Spawn a task to create a snapshot of the given partition. Optionally, a sender will be + /// notified of the result on completion. fn spawn_create_snapshot_task( &mut self, partition_id: PartitionId, - sender: oneshot::Sender, + sender: Option>, ) { if let Entry::Vacant(entry) = self.pending_snapshots.entry(partition_id) { let config = self.updateable_config.live_load(); let snapshot_base_path = config.worker.snapshots.snapshots_dir(partition_id); - let snapshot_id = SnapshotId::new(); - let (snapshot_metadata_tx, snapshot_metadata_rx) = oneshot::channel(); let create_snapshot_task = SnapshotPartitionTask { snapshot_id, partition_id, - partition_store_manager: self.partition_store_manager.clone(), snapshot_base_path, - result_sender: snapshot_metadata_tx, + partition_store_manager: self.partition_store_manager.clone(), cluster_name: config.common.cluster_name().into(), node_name: config.common.node_name().into(), }; @@ -809,19 +795,21 @@ impl PartitionProcessorManager { match spawn_task_result { Ok(handle) => { - self.snapshot_export_tasks.push(snapshot_metadata_rx); - entry.insert(PendingSnapshotTask { handle, sender }); + self.snapshot_export_tasks.push(handle); + if let Some(sender) = sender { + entry.insert(sender); + } } - Err(_) => { - sender - .send(Err(SnapshotError::InvalidState(partition_id))) - .ok(); + Err(_shutdown) => { + if let Some(sender) = sender { + let _ = sender.send(Err(SnapshotError::InvalidState(partition_id))); + } } } + } else if let Some(sender) = sender { + let _ = sender.send(Err(SnapshotError::SnapshotInProgress(partition_id))); } else { - sender - .send(Err(SnapshotError::SnapshotInProgress(partition_id))) - .ok(); + warn!(%partition_id, "Snapshot task not started: another snapshot is already in progress") } } diff --git a/crates/worker/src/partition_processor_manager/processor_state.rs b/crates/worker/src/partition_processor_manager/processor_state.rs index 55c0878ef..764abb77c 100644 --- a/crates/worker/src/partition_processor_manager/processor_state.rs +++ b/crates/worker/src/partition_processor_manager/processor_state.rs @@ -353,8 +353,8 @@ impl ProcessorState { } /// The Partition Processor is in a state in which it is acceptable to create and publish - /// snapshots. Since we don't want newer snapshots to move backwards in applied LSN, the current - /// implementation checks whether the processor is fully caught up with the log. + /// snapshots. Since we generally don't want newer snapshots to move backwards in applied LSN, + /// the current implementation checks whether the processor is fully caught up with the log. pub fn should_publish_snapshots(&self) -> bool { match self { ProcessorState::Started { diff --git a/crates/worker/src/partition_processor_manager/snapshot_task.rs b/crates/worker/src/partition_processor_manager/snapshot_task.rs index 9ff7d2250..3e8593ce6 100644 --- a/crates/worker/src/partition_processor_manager/snapshot_task.rs +++ b/crates/worker/src/partition_processor_manager/snapshot_task.rs @@ -11,7 +11,6 @@ use std::path::PathBuf; use std::time::SystemTime; -use tokio::sync::oneshot; use tracing::{debug, instrument, warn}; use restate_core::worker_api::SnapshotError; @@ -27,14 +26,13 @@ pub struct SnapshotPartitionTask { pub partition_id: PartitionId, pub snapshot_base_path: PathBuf, pub partition_store_manager: PartitionStoreManager, - pub result_sender: oneshot::Sender>, pub cluster_name: String, pub node_name: String, } impl SnapshotPartitionTask { #[instrument(level = "info", skip_all, fields(snapshot_id = %self.snapshot_id, partition_id = %self.partition_id))] - pub async fn run(self) { + pub async fn run(self) -> Result { debug!("Creating partition snapshot"); let result = create_snapshot_inner( @@ -47,19 +45,16 @@ impl SnapshotPartitionTask { ) .await; - let _ = self.result_sender.send(match result { - Ok(metadata) => { + result + .inspect(|metadata| { debug!( archived_lsn = %metadata.min_applied_lsn, "Partition snapshot created" ); - Ok(metadata) - } - Err(err) => { + }) + .inspect_err(|err| { warn!("Failed to create partition snapshot: {}", err); - Err(err) - } - }); + }) } }