diff --git a/crates/partition-store/src/tests/snapshots_test/mod.rs b/crates/partition-store/src/tests/snapshots_test/mod.rs index d0c40be3c4..ed82d2170e 100644 --- a/crates/partition-store/src/tests/snapshots_test/mod.rs +++ b/crates/partition-store/src/tests/snapshots_test/mod.rs @@ -29,7 +29,7 @@ pub(crate) async fn run_tests(manager: PartitionStoreManager, mut partition_stor partition_id, node_name: "node".to_string(), created_at: humantime::Timestamp::from(SystemTime::from(MillisSinceEpoch::new(0))), - snapshot_id: SnapshotId::from_parts(0, 0), + snapshot_id: SnapshotId::from(0u128), key_range: key_range.clone(), min_applied_lsn: snapshot.min_applied_lsn, db_comparator_name: snapshot.db_comparator_name.clone(), diff --git a/crates/worker/src/partition/snapshots/repository.rs b/crates/worker/src/partition/snapshots/repository.rs index f59a7cc08f..5afe7fccb3 100644 --- a/crates/worker/src/partition/snapshots/repository.rs +++ b/crates/worker/src/partition/snapshots/repository.rs @@ -26,6 +26,15 @@ use restate_partition_store::snapshots::PartitionSnapshotMetadata; use restate_types::config::SnapshotsOptions; /// Provides read and write access to the long-term partition snapshot storage destination. +/// +/// The repository wraps access to an object store "bucket" that contains snapshot metadata and data +/// optimised for efficient retrieval. The bucket layout is split into two top-level prefixes for +/// snapshot metadata and data respectively. While full snapshot archives contain all relevant +/// metadata, this split layout allows for efficient retrieval of only the metadata upfront. It also +/// enables us to evolve the data storage layout independently in the future. +/// +/// - `[/]metadata//--{lsn}.json` +/// - `[/]snapshot//--{lsn}.tar` #[derive(Clone)] pub struct SnapshotRepository { object_store: Arc, @@ -34,23 +43,21 @@ pub struct SnapshotRepository { staging_path: PathBuf, } -// todo(pavel): finalize repository object layout impl SnapshotRepository { pub async fn create( base_dir: PathBuf, snapshots_options: &SnapshotsOptions, ) -> anyhow::Result { - let destination = - if let Some(ref destination) = snapshots_options.destination { - destination.clone() - } else { - base_dir - .join("pp-snapshots") - .into_os_string() - .into_string() - .map(|path| format!("file://{path}")) - .map_err(|e| anyhow!("Unable to convert path to string: {:?}", e))? - }; + let destination = if let Some(ref destination) = snapshots_options.destination { + destination.clone() + } else { + base_dir + .join("pp-snapshots") + .into_os_string() + .into_string() + .map(|path| format!("file://{path}")) + .map_err(|e| anyhow!("Unable to convert path to string: {:?}", e))? + }; let destination = Url::parse(&destination).context("Failed parsing snapshot repository URL")?; @@ -84,12 +91,21 @@ impl SnapshotRepository { .into() }; - let prefix = destination.path().into(); + let staging_path = base_dir.join("snapshot-staging"); + tokio::fs::create_dir_all(&staging_path).await?; + + // prefix must be stripped of any leading slash and, unless zero-length, end in a single "/" character + let prefix: String = destination.path().into(); + let prefix = match prefix.as_str() { + "" | "/" => "".to_string(), + prefix => format!("{}/", prefix.trim_start_matches('/').trim_end_matches('/')), + }; + Ok(SnapshotRepository { object_store, destination, prefix, - staging_path: base_dir.clone().join("snapshot-staging"), + staging_path, }) } @@ -114,53 +130,65 @@ impl SnapshotRepository { // the latest snapshot is always first. let inverted_sort_key = format!("{:016x}", u64::MAX - lsn.as_u64()); - // The snapshot data key format is: [/]/__.tar - let snapshot_key = match self.prefix.as_str() { - "" | "/" => format!( - "{partition_id}/{sk}_{lsn}_{snapshot_id}.tar", - sk = inverted_sort_key, - lsn = metadata.min_applied_lsn, - ), - prefix => format!( - "{trimmed_prefix}/{partition_id}/{sk}_{lsn}_{snapshot_id}.tar", - trimmed_prefix = prefix.trim_start_matches('/').trim_end_matches('/'), - sk = inverted_sort_key, - ), - }; - - let staging_path = self.staging_path.clone(); - tokio::fs::create_dir_all(&staging_path).await?; + let metadata_json_path = local_snapshot_path.join("metadata.json"); + let metadata_key = format!( + "{prefix}metadata/{partition_id}/{sk}_{snapshot_id}-lsn_{lsn}.json", + prefix = self.prefix, + sk = inverted_sort_key, + lsn = metadata.min_applied_lsn, + ); - let tarball_path = staging_path.join(format!("{snapshot_id}.tar")); - let snapshot_tarball = tokio::fs::File::create_new(tarball_path.clone()).await?; + let snapshot_archive_path = self.staging_path.join(format!("{snapshot_id}.tar")); + let snapshot_tarball = tokio::fs::File::create_new(snapshot_archive_path.as_path()).await?; + let snapshot_key = format!( + "{prefix}snapshot/{partition_id}/{sk}_{snapshot_id}-lsn_{lsn}.tar", + prefix = self.prefix, + sk = inverted_sort_key, + lsn = metadata.min_applied_lsn, + ); - let local_files_path = local_snapshot_path.clone(); let mut tar = tokio_tar::Builder::new(snapshot_tarball); debug!( "Creating snapshot tarball from {:?} as {:?}", - local_files_path, + local_snapshot_path.as_path(), tar.get_ref() ); - tar.append_dir_all(".", local_files_path).await?; + tar.append_dir_all(".", local_snapshot_path.as_path()) + .await?; tar.finish().await?; - let key = object_store::path::Path::from(snapshot_key.clone()); + let key = object_store::path::Path::from(snapshot_key.as_str()); let put_result = - put_snapshot_object(tarball_path.as_path(), &key, self.object_store.clone()).await?; - + put_snapshot_object(snapshot_archive_path.as_path(), &key, &self.object_store).await?; debug!( etag = put_result.e_tag.unwrap_or_default(), "Successfully published snapshot to: {snapshot_key}", ); - tokio::fs::remove_dir_all(local_snapshot_path.clone()).await?; + let metadata_json_payload = PutPayload::from(tokio::fs::read(metadata_json_path).await?); + let put_result = self + .object_store + .put( + &object_store::path::Path::from(metadata_key.as_str()), + metadata_json_payload, + ) + .await?; + debug!( + etag = put_result.e_tag.unwrap_or_default(), + "Successfully published snapshot metadata to: {metadata_key}", + ); + + tokio::fs::remove_dir_all(local_snapshot_path.as_path()).await?; trace!( "Removed local snapshot files: {}", local_snapshot_path.display() ); - tokio::fs::remove_file(tarball_path.clone()).await?; - trace!("Removed local snapshot tarball: {}", tarball_path.display()); + tokio::fs::remove_file(snapshot_archive_path.as_path()).await?; + trace!( + "Removed local snapshot tarball: {}", + snapshot_archive_path.display() + ); Ok(()) } @@ -173,7 +201,7 @@ const MULTIPART_UPLOAD_THRESHOLD_BYTES: usize = 5 * 1024 * 1024; async fn put_snapshot_object( snapshot_path: &Path, key: &object_store::path::Path, - object_store: Arc, + object_store: &Arc, ) -> anyhow::Result { let mut snapshot = tokio::fs::File::open(snapshot_path).await?;