From 45795e761c6e66f28476017d62d13ceb93154cd1 Mon Sep 17 00:00:00 2001 From: Kim Altintop Date: Fri, 2 May 2025 16:23:20 +0200 Subject: [PATCH 1/4] snapshot: Provide streaming snapshot verification. Repurposes the remote `SnapshotFetcher` for verification of the snapshot's objects. This can be useful when memory usage is of concern. --- crates/snapshot/src/lib.rs | 58 +++++++++ crates/snapshot/src/remote.rs | 213 +++++++++++++++++++++++++------- crates/snapshot/tests/remote.rs | 8 +- 3 files changed, 230 insertions(+), 49 deletions(-) diff --git a/crates/snapshot/src/lib.rs b/crates/snapshot/src/lib.rs index cc549f6872e..9b384262ae3 100644 --- a/crates/snapshot/src/lib.rs +++ b/crates/snapshot/src/lib.rs @@ -48,8 +48,10 @@ use std::{ ops::{Add, AddAssign}, path::PathBuf, }; +use tokio::task::spawn_blocking; pub mod remote; +use remote::verify_snapshot; #[derive(Debug, Copy, Clone)] /// An object which may be associated with an error during snapshotting. @@ -543,6 +545,7 @@ impl fmt::Debug for SnapshotSize { } /// A repository of snapshots of a particular database instance. +#[derive(Clone)] pub struct SnapshotRepository { /// The directory which contains all the snapshots. root: SnapshotsPath, @@ -752,6 +755,7 @@ impl SnapshotRepository { }); } + let snapshot_dir = self.snapshot_dir_path(tx_offset); let object_repo = Self::object_repo(&snapshot_dir)?; let blob_store = snapshot.reconstruct_blob_store(&object_repo)?; @@ -769,6 +773,60 @@ impl SnapshotRepository { }) } + /// Read the [`Snapshot`] metadata at `tx_offset` and verify the integrity + /// of all objects it refers to. + /// + /// Fails if: + /// + /// - No snapshot exists in `self` for `tx_offset` + /// - The snapshot is incomplete, as detected by its lockfile still existing. + /// - The snapshot file's magic number does not match [`MAGIC`]. + /// - Any object file (page or large blob) referenced by the snapshot file + /// is missing or corrupted. + /// + /// The following conditions are not detected or considered as errors: + /// + /// - The snapshot file's version does not match [`CURRENT_SNAPSHOT_VERSION`]. + /// - The snapshot file's database identity or instance ID do not match + /// those in `self`. + /// - The snapshot file's module ABI version does not match + /// [`CURRENT_MODULE_ABI_VERSION`]. + /// - The snapshot file's recorded transaction offset does not match + /// `tx_offset`. + /// + /// Callers may want to inspect the returned [`Snapshot`] and ensure its + /// contents match their expectations. + pub async fn verify_snapshot(&self, tx_offset: TxOffset) -> Result { + let snapshot_dir = self.snapshot_dir_path(tx_offset); + let snapshot = spawn_blocking({ + let snapshot_dir = snapshot_dir.clone(); + move || { + let lockfile = Lockfile::lock_path(&snapshot_dir); + if lockfile.try_exists()? { + return Err(SnapshotError::Incomplete { tx_offset, lockfile }); + } + + let snapshot_file_path = snapshot_dir.snapshot_file(tx_offset); + let (snapshot, _compress_type) = Snapshot::read_from_file(&snapshot_file_path)?; + + if snapshot.magic != MAGIC { + return Err(SnapshotError::BadMagic { + tx_offset, + magic: snapshot.magic, + }); + } + Ok(snapshot) + } + }) + .await + .unwrap()?; + let object_repo = Self::object_repo(&snapshot_dir)?; + verify_snapshot(object_repo, self.root.clone(), snapshot.clone()) + .await + .map(drop)?; + Ok(snapshot) + } + /// Open a repository at `root`, failing if the `root` doesn't exist or isn't a directory. /// /// Calls [`Path::is_dir`] and requires that the result is `true`. diff --git a/crates/snapshot/src/remote.rs b/crates/snapshot/src/remote.rs index 8fcef3dd0e9..d5168e1af40 100644 --- a/crates/snapshot/src/remote.rs +++ b/crates/snapshot/src/remote.rs @@ -126,11 +126,33 @@ pub async fn synchronize_snapshot( provider: impl BlobProvider + 'static, snapshots_dir: SnapshotsPath, snapshot: Snapshot, +) -> Result { + run_fetcher(provider, snapshots_dir, snapshot, false).await +} + +/// Verifies the integrity of the objects referenced from [`Snapshot`], +/// in constant memory. +/// +/// Like [`synchronize_snapshot`], but doesn't modify the local storage. +/// Usually, a local [`BlobProvider`] like [`DirTrie`] should be provided. +pub async fn verify_snapshot( + provider: impl BlobProvider + 'static, + snapshots_dir: SnapshotsPath, + snapshot: Snapshot, +) -> Result<()> { + run_fetcher(provider, snapshots_dir, snapshot, true).await.map(drop) +} + +async fn run_fetcher( + provider: impl BlobProvider + 'static, + snapshots_dir: SnapshotsPath, + snapshot: Snapshot, + dry_run: bool, ) -> Result { spawn_blocking(|| SnapshotFetcher::create(provider, snapshots_dir, snapshot)) .await .unwrap()? - .run() + .run(dry_run) .await } @@ -165,6 +187,7 @@ struct SnapshotFetcher

{ object_repo: Arc, parent_repo: Option>, provider: P, + dry_run: bool, stats: StatsInner, @@ -202,12 +225,15 @@ impl SnapshotFetcher

{ object_repo: Arc::new(object_repo), parent_repo: parent_repo.map(Arc::new), provider, + dry_run: false, stats: <_>::default(), lock, }) } - async fn run(self) -> Result { + async fn run(mut self, dry_run: bool) -> Result { + self.dry_run = dry_run; + let snapshot_bsatn = serialize_bsatn(ObjectType::Snapshot, &self.snapshot)?; let snapshot_hash = blake3::hash(&snapshot_bsatn); let snapshot_file_path = self.dir.snapshot_file(self.snapshot.tx_offset); @@ -237,14 +263,13 @@ impl SnapshotFetcher

{ tokio::try_join!(self.fetch_blobs(), self.fetch_pages())?; // Success. Write out the snapshot file. - atomically(snapshot_file_path.0, |out| async { + atomically((!self.dry_run).then_some(snapshot_file_path.0), |out| async { let mut out = BufWriter::new(out); out.write_all(snapshot_hash.as_bytes()).await?; out.write_all(&snapshot_bsatn).await?; out.flush().await?; - out.into_inner().sync_all().await?; - Ok(()) + Ok(out.into_inner()) }) .await?; @@ -281,10 +306,13 @@ impl SnapshotFetcher

{ } async fn fetch_blob(&self, hash: blake3::Hash) -> Result<()> { - let Some(dst_path) = self.object_file_path(hash).await? else { + let Some(dst_path) = self + .object_file_path(ObjectType::Blob(BlobHash { data: *hash.as_bytes() })) + .await? + else { return Ok(()); }; - atomically(dst_path, |out| async move { + atomically((!self.dry_run).then_some(dst_path), |out| async move { let mut out = BufWriter::new(out); let mut src = self.provider.blob_reader(hash).await?; let compressed = src.fill_buf().await?.starts_with(&ZSTD_MAGIC_BYTES); @@ -295,12 +323,11 @@ impl SnapshotFetcher

{ let mut hasher = blake3::Hasher::new(); let computed_hash = if !compressed { // If the input is uncompressed, just update the hasher as we go. - let mut out = InspectWriter::new(out, |chunk| { + let mut writer = InspectWriter::new(&mut out, |chunk| { hasher.update(chunk); }); - tokio::io::copy_buf(&mut src, &mut out).await?; - out.flush().await?; - out.into_inner().into_inner().sync_all().await?; + tokio::io::copy_buf(&mut src, &mut writer).await?; + writer.flush().await?; hasher.finalize() } else { @@ -321,7 +348,6 @@ impl SnapshotFetcher

{ }); tokio::io::copy(&mut src, &mut out).await?; out.flush().await?; - out.into_inner().sync_all().await?; drop(tx); decompressor.await.unwrap()? @@ -335,7 +361,7 @@ impl SnapshotFetcher

{ }); } - Ok(()) + Ok(out.into_inner()) }) .await .inspect(|()| { @@ -344,10 +370,11 @@ impl SnapshotFetcher

{ } async fn fetch_page(&self, hash: blake3::Hash) -> Result<()> { - let Some(dst_path) = self.object_file_path(hash).await? else { + let Some(dst_path) = self.object_file_path(ObjectType::Page(hash)).await? else { return Ok(()); }; - atomically(dst_path, |out| async { + atomically((!self.dry_run).then_some(dst_path), |out| async { + let mut out = BufWriter::new(out); let mut src = self.provider.blob_reader(hash).await?; let compressed = src.fill_buf().await?.starts_with(&ZSTD_MAGIC_BYTES); @@ -358,12 +385,11 @@ impl SnapshotFetcher

{ let page_buf = if !compressed { // If the input is uncompressed, just copy all bytes to a buffer. let mut page_buf = Vec::with_capacity(u16::MAX as usize + 1); - let mut out = InspectWriter::new(BufWriter::new(out), |chunk| { + let mut writer = InspectWriter::new(&mut out, |chunk| { page_buf.extend_from_slice(chunk); }); - tokio::io::copy_buf(&mut src, &mut out).await?; - out.flush().await?; - out.into_inner().into_inner().sync_all().await?; + tokio::io::copy_buf(&mut src, &mut writer).await?; + writer.flush().await?; page_buf } else { @@ -377,19 +403,20 @@ impl SnapshotFetcher

{ Ok::<_, io::Error>(page_buf) }); - let mut out = InspectWriter::new(BufWriter::new(out), |chunk| { + let mut writer = InspectWriter::new(&mut out, |chunk| { let bytes = Bytes::copy_from_slice(chunk); tx.send(Ok(bytes)).ok(); }); - tokio::io::copy_buf(&mut src, &mut out).await?; - out.flush().await?; - out.into_inner().into_inner().sync_all().await?; + tokio::io::copy_buf(&mut src, &mut writer).await?; + writer.flush().await?; drop(tx); decompressor.await.unwrap()? }; - self.verify_page(hash, &page_buf) + self.verify_page(hash, &page_buf)?; + + Ok(out.into_inner()) }) .await .inspect(|()| { @@ -404,18 +431,44 @@ impl SnapshotFetcher

{ /// /// In the latter case, a hardlink will be created. /// `self.stats` is updated in either case. - async fn object_file_path(&self, hash: blake3::Hash) -> Result> { + /// + /// In dry-run mode, `Some(path)` is returned + /// if the file exists in either the target or the parent repo, + /// in order to force hash verification. + /// If it does not exist, an error is returned. + async fn object_file_path(&self, ty: ObjectType) -> Result> { + let hash = match ty { + ObjectType::Blob(hash) => blake3::Hash::from_bytes(hash.data), + ObjectType::Page(hash) => hash, + ObjectType::Snapshot => unreachable!("invalid argument"), + }; let path = self.object_repo.file_path(hash.as_bytes()); if fs::try_exists(&path).await? { + if self.dry_run { + return Ok(Some(path)); + } + self.stats.skipped_object(); return Ok(None); } if self.try_hardlink(hash).await? { + if self.dry_run { + return Ok(Some(path)); + } + self.stats.hardlinked_object(); return Ok(None); } + if self.dry_run { + return Err(SnapshotError::ReadObject { + ty, + source_repo: self.object_repo.root().to_owned(), + cause: io::Error::new(io::ErrorKind::NotFound, format!("missing object {}", path.display())), + }); + } + Ok(Some(path)) } @@ -426,10 +479,16 @@ impl SnapshotFetcher

{ let object_repo = Arc::clone(&self.object_repo); let parent_repo = Arc::clone(parent); - spawn_blocking(move || object_repo.try_hardlink_from(&parent_repo, hash.as_bytes())) - .await - .unwrap() - .map_err(Into::into) + if !self.dry_run { + spawn_blocking(move || object_repo.try_hardlink_from(&parent_repo, hash.as_bytes())) + .await + .unwrap() + .map_err(Into::into) + } else { + let src_file = parent_repo.file_path(hash.as_bytes()); + let meta = tokio::fs::metadata(src_file).await?; + Ok(meta.is_file()) + } } fn verify_page(&self, expected_hash: blake3::Hash, buf: &[u8]) -> Result<()> { @@ -496,27 +555,85 @@ impl AsyncWrite for AsyncHasher { } } -async fn atomically(file_path: PathBuf, f: F) -> Result<()> +/// The [`AsyncWrite`] created by [`atomically`]. +/// +/// Either a temporary file that is being renamed atomically if and when the +/// closure returns successfully, +/// or a [`tokio::io::Sink`] that discards all data written to it (used for +/// [`verify_snapshot`]). +enum AtomicWriter { + File(fs::File), + Null(tokio::io::Sink), +} + +impl AtomicWriter { + async fn sync_all(&self) -> io::Result<()> { + if let Self::File(file) = self { + file.sync_all().await?; + } + + Ok(()) + } +} + +impl AsyncWrite for AtomicWriter { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + match self.get_mut() { + Self::File(file) => Pin::new(file).poll_write(cx, buf), + Self::Null(sink) => Pin::new(sink).poll_write(cx, buf), + } + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + match self.get_mut() { + Self::File(file) => Pin::new(file).poll_flush(cx), + Self::Null(sink) => Pin::new(sink).poll_flush(cx), + } + } + + fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + match self.get_mut() { + Self::File(file) => Pin::new(file).poll_shutdown(cx), + Self::Null(sink) => Pin::new(sink).poll_shutdown(cx), + } + } +} + +async fn atomically(file_path: Option, f: F) -> Result<()> where - F: FnOnce(fs::File) -> Fut, - Fut: Future>, + F: FnOnce(AtomicWriter) -> Fut, + Fut: Future>, { - let dir = file_path.parent().expect("file not in a directory").to_owned(); - fs::create_dir_all(&dir).await?; - let (tmp_file, tmp_out) = spawn_blocking(move || { - let tmp = NamedTempFile::new_in(dir)?; - let out = tmp.reopen()?; - Ok::<_, io::Error>((tmp, out)) - }) - .await - .unwrap()?; - - f(fs::File::from_std(tmp_out)).await?; - - spawn_blocking(|| tmp_file.persist(file_path)) - .await - .unwrap() - .map_err(|e| e.error)?; + match file_path { + Some(file_path) => { + let dir = file_path.parent().expect("file not in a directory").to_owned(); + fs::create_dir_all(&dir).await?; + let (tmp_file, tmp_out) = spawn_blocking(move || { + let tmp = NamedTempFile::new_in(dir)?; + let out = tmp.reopen()?; + Ok::<_, io::Error>((tmp, out)) + }) + .await + .unwrap()?; + + let mut file = AtomicWriter::File(fs::File::from_std(tmp_out)); + file = f(file).await?; + file.sync_all().await?; + + spawn_blocking(|| tmp_file.persist(file_path)) + .await + .unwrap() + .map_err(|e| e.error)?; + } + + None => { + f(AtomicWriter::Null(tokio::io::sink())).await?; + } + } Ok(()) } diff --git a/crates/snapshot/tests/remote.rs b/crates/snapshot/tests/remote.rs index 4cee594102b..38b9f5aefdd 100644 --- a/crates/snapshot/tests/remote.rs +++ b/crates/snapshot/tests/remote.rs @@ -27,7 +27,10 @@ use spacetimedb_schema::{ def::ModuleDef, schema::{Schema as _, TableSchema}, }; -use spacetimedb_snapshot::{remote::synchronize_snapshot, Snapshot, SnapshotError, SnapshotRepository}; +use spacetimedb_snapshot::{ + remote::{synchronize_snapshot, verify_snapshot}, + Snapshot, SnapshotError, SnapshotRepository, +}; use spacetimedb_table::page_pool::PagePool; use tempfile::tempdir; use tokio::task::spawn_blocking; @@ -63,6 +66,9 @@ async fn can_sync_a_snapshot() -> anyhow::Result<()> { let dst_snapshot_full = dst_repo.read_snapshot(snapshot_offset, &pool)?; Locking::restore_from_snapshot(dst_snapshot_full, pool)?; + // Check that `verify_snapshot` agrees. + verify_snapshot(blob_provider.clone(), dst_path.clone(), src_snapshot.clone()).await?; + // Let's also check that running `synchronize_snapshot` again does nothing. let stats = synchronize_snapshot(blob_provider.clone(), dst_path.clone(), src_snapshot.clone()).await?; assert_eq!(stats.objects_skipped, total_objects); From d4b5e70d08de0de0af6e9ae88cf81afd62d20af0 Mon Sep 17 00:00:00 2001 From: Kim Altintop Date: Mon, 5 May 2025 10:32:41 +0200 Subject: [PATCH 2/4] Avoid creating the test snapshot repeatedly --- crates/snapshot/tests/remote.rs | 105 ++++++++++++++++++++++++-------- 1 file changed, 78 insertions(+), 27 deletions(-) diff --git a/crates/snapshot/tests/remote.rs b/crates/snapshot/tests/remote.rs index 38b9f5aefdd..3c64a4874cd 100644 --- a/crates/snapshot/tests/remote.rs +++ b/crates/snapshot/tests/remote.rs @@ -1,6 +1,7 @@ -use std::sync::Arc; +use std::{sync::Arc, time::Instant}; use env_logger::Env; +use log::info; use pretty_assertions::assert_matches; use spacetimedb::{ db::{ @@ -15,6 +16,7 @@ use spacetimedb::{ Identity, }; use spacetimedb_durability::{EmptyHistory, TxOffset}; +use spacetimedb_fs_utils::dir_trie::DirTrie; use spacetimedb_lib::{ bsatn, db::raw_def::v9::{RawModuleDefV9Builder, RawTableDefBuilder}, @@ -32,30 +34,25 @@ use spacetimedb_snapshot::{ Snapshot, SnapshotError, SnapshotRepository, }; use spacetimedb_table::page_pool::PagePool; -use tempfile::tempdir; -use tokio::task::spawn_blocking; +use tempfile::{tempdir, TempDir}; +use tokio::{sync::OnceCell, task::spawn_blocking}; // TODO: Happy path for compressed snapshot, pending #2034 #[tokio::test] async fn can_sync_a_snapshot() -> anyhow::Result<()> { enable_logging(); let tmp = tempdir()?; + let src = SourceSnapshot::get_or_create().await?; - let src_path = SnapshotsPath::from_path_unchecked(tmp.path().join("src")); - let dst_path = SnapshotsPath::from_path_unchecked(tmp.path().join("dst")); - - src_path.create()?; + let dst_path = SnapshotsPath::from_path_unchecked(tmp.path()); dst_path.create()?; - let src_repo = SnapshotRepository::open(src_path, Identity::ZERO, 0).map(Arc::new)?; let dst_repo = SnapshotRepository::open(dst_path.clone(), Identity::ZERO, 0).map(Arc::new)?; - let snapshot_offset = create_snapshot(src_repo.clone()).await?; - let src_snapshot_path = src_repo.snapshot_dir_path(snapshot_offset); - let (mut src_snapshot, _) = Snapshot::read_from_file(&src_snapshot_path.snapshot_file(snapshot_offset))?; + let mut src_snapshot = src.meta.clone(); let total_objects = src_snapshot.total_objects() as u64; - let blob_provider = SnapshotRepository::object_repo(&src_snapshot_path).map(Arc::new)?; + let blob_provider = src.objects.clone(); // This is the first snapshot in `dst_repo`, so all objects should be written. let stats = synchronize_snapshot(blob_provider.clone(), dst_path.clone(), src_snapshot.clone()).await?; @@ -63,7 +60,7 @@ async fn can_sync_a_snapshot() -> anyhow::Result<()> { // Assert that the copied snapshot is valid. let pool = PagePool::default(); - let dst_snapshot_full = dst_repo.read_snapshot(snapshot_offset, &pool)?; + let dst_snapshot_full = dst_repo.read_snapshot(src.offset, &pool)?; Locking::restore_from_snapshot(dst_snapshot_full, pool)?; // Check that `verify_snapshot` agrees. @@ -88,30 +85,26 @@ async fn can_sync_a_snapshot() -> anyhow::Result<()> { #[tokio::test] async fn rejects_overwrite() -> anyhow::Result<()> { + enable_logging(); let tmp = tempdir()?; + let src = SourceSnapshot::get_or_create().await?; - let src_path = SnapshotsPath::from_path_unchecked(tmp.path().join("src")); - let dst_path = SnapshotsPath::from_path_unchecked(tmp.path().join("dst")); - - src_path.create()?; + let dst_path = SnapshotsPath::from_path_unchecked(tmp.path()); dst_path.create()?; - let src_repo = SnapshotRepository::open(src_path, Identity::ZERO, 0).map(Arc::new)?; - - let snapshot_offset = create_snapshot(src_repo.clone()).await?; - let src_snapshot_path = src_repo.snapshot_dir_path(snapshot_offset); - let (src_snapshot, _) = Snapshot::read_from_file(&src_snapshot_path.snapshot_file(snapshot_offset))?; - - let blob_provider = SnapshotRepository::object_repo(&src_snapshot_path).map(Arc::new)?; + let src_snapshot = src.meta.clone(); + let blob_provider = src.objects.clone(); synchronize_snapshot(blob_provider.clone(), dst_path.clone(), src_snapshot.clone()).await?; // Try to overwrite with the previous snapshot. - let prev_offset = src_repo.latest_snapshot_older_than(snapshot_offset - 1)?.unwrap(); - let src_snapshot_path = src_repo.snapshot_dir_path(prev_offset); + // A previous snapshot exists because one is created immediately after + // database initialization. + let prev_offset = src.repo.latest_snapshot_older_than(src.offset - 1)?.unwrap(); + let src_snapshot_path = src.repo.snapshot_dir_path(prev_offset); let (mut src_snapshot, _) = Snapshot::read_from_file(&src_snapshot_path.snapshot_file(prev_offset))?; // Pretend it's the current snapshot, thereby altering the preimage. - src_snapshot.tx_offset = snapshot_offset; + src_snapshot.tx_offset = src.offset; let res = synchronize_snapshot(blob_provider, dst_path, src_snapshot).await; assert_matches!(res, Err(SnapshotError::HashMismatch { .. })); @@ -119,7 +112,61 @@ async fn rejects_overwrite() -> anyhow::Result<()> { Ok(()) } +/// Creating a snapshot takes a long time, because we need to commit +/// `SNAPSHOT_FREQUENCY` transactions to trigger one. +/// +/// Until the snapshot frequency becomes configurable, +/// avoid creating the source snapshot repeatedly. +struct SourceSnapshot { + offset: TxOffset, + meta: Snapshot, + objects: Arc, + repo: Arc, + + #[allow(unused)] + tmp: TempDir, +} + +impl SourceSnapshot { + async fn get_or_create() -> anyhow::Result<&'static Self> { + static SOURCE_SNAPSHOT: OnceCell = OnceCell::const_new(); + SOURCE_SNAPSHOT.get_or_try_init(Self::try_init).await + } + + async fn try_init() -> anyhow::Result { + let tmp = tempdir()?; + + let repo_path = SnapshotsPath::from_path_unchecked(tmp.path()); + let repo = spawn_blocking(move || { + repo_path.create()?; + SnapshotRepository::open(repo_path, Identity::ZERO, 0).map(Arc::new) + }) + .await + .unwrap()?; + let offset = create_snapshot(repo.clone()).await?; + + let dir_path = repo.snapshot_dir_path(offset); + let (meta, objects) = spawn_blocking(move || { + let meta = Snapshot::read_from_file(&dir_path.snapshot_file(offset)).map(|(file, _)| file)?; + let objects = SnapshotRepository::object_repo(&dir_path).map(Arc::new)?; + + Ok::<_, SnapshotError>((meta, objects)) + }) + .await + .unwrap()?; + + Ok(SourceSnapshot { + offset, + meta, + objects, + repo, + tmp, + }) + } +} + async fn create_snapshot(repo: Arc) -> anyhow::Result { + let start = Instant::now(); let mut watch = spawn_blocking(|| { let tmp = TempReplicaDir::new()?; let db = TestDB::open_db(&tmp, EmptyHistory::new(), None, Some(repo), 0)?; @@ -148,6 +195,10 @@ async fn create_snapshot(repo: Arc) -> anyhow::Result= SNAPSHOT_FREQUENCY); + info!( + "snapshot creation took {}s", + Instant::now().duration_since(start).as_secs_f32() + ); Ok(snapshot_offset) } From e7533bfadf8840b5280ee3657aefb5531a70b89f Mon Sep 17 00:00:00 2001 From: Kim Altintop Date: Mon, 5 May 2025 11:36:05 +0200 Subject: [PATCH 3/4] Test that verification does verify things. --- Cargo.lock | 1 + crates/snapshot/Cargo.toml | 1 + crates/snapshot/tests/remote.rs | 70 +++++++++++++++++++++++++++++++-- 3 files changed, 68 insertions(+), 4 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a1f399d2916..5dce66c115d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5839,6 +5839,7 @@ dependencies = [ "hex", "log", "pretty_assertions", + "rand 0.9.0", "spacetimedb-core", "spacetimedb-durability", "spacetimedb-fs-utils", diff --git a/crates/snapshot/Cargo.toml b/crates/snapshot/Cargo.toml index 4d3cccff2f3..3a141816eec 100644 --- a/crates/snapshot/Cargo.toml +++ b/crates/snapshot/Cargo.toml @@ -34,3 +34,4 @@ spacetimedb-schema = { path = "../schema" } anyhow.workspace = true env_logger.workspace = true pretty_assertions = { workspace = true, features = ["unstable"] } +rand.workspace = true diff --git a/crates/snapshot/tests/remote.rs b/crates/snapshot/tests/remote.rs index 3c64a4874cd..f8818524680 100644 --- a/crates/snapshot/tests/remote.rs +++ b/crates/snapshot/tests/remote.rs @@ -1,8 +1,9 @@ -use std::{sync::Arc, time::Instant}; +use std::{io, sync::Arc, time::Instant}; use env_logger::Env; use log::info; use pretty_assertions::assert_matches; +use rand::seq::IndexedRandom as _; use spacetimedb::{ db::{ datastore::locking_tx_datastore::datastore::Locking, @@ -63,9 +64,6 @@ async fn can_sync_a_snapshot() -> anyhow::Result<()> { let dst_snapshot_full = dst_repo.read_snapshot(src.offset, &pool)?; Locking::restore_from_snapshot(dst_snapshot_full, pool)?; - // Check that `verify_snapshot` agrees. - verify_snapshot(blob_provider.clone(), dst_path.clone(), src_snapshot.clone()).await?; - // Let's also check that running `synchronize_snapshot` again does nothing. let stats = synchronize_snapshot(blob_provider.clone(), dst_path.clone(), src_snapshot.clone()).await?; assert_eq!(stats.objects_skipped, total_objects); @@ -112,6 +110,70 @@ async fn rejects_overwrite() -> anyhow::Result<()> { Ok(()) } +#[tokio::test] +async fn verifies_objects() -> anyhow::Result<()> { + enable_logging(); + let tmp = tempdir()?; + let src = SourceSnapshot::get_or_create().await?; + + let dst_path = SnapshotsPath::from_path_unchecked(tmp.path()); + dst_path.create()?; + + let src_snapshot = src.meta.clone(); + + synchronize_snapshot(src.objects.clone(), dst_path.clone(), src_snapshot.clone()).await?; + + // Read objects for verification from the destination repo. + let blob_provider = spawn_blocking({ + let dst_path = dst_path.clone(); + let snapshot_offset = src_snapshot.tx_offset; + move || { + let repo = SnapshotRepository::open(dst_path, Identity::ZERO, 0)?; + let objects = SnapshotRepository::object_repo(&repo.snapshot_dir_path(snapshot_offset))?; + anyhow::Ok(Arc::new(objects)) + } + }) + .await + .unwrap()?; + // Initially, all should be good. + verify_snapshot(blob_provider.clone(), dst_path.clone(), src_snapshot.clone()).await?; + + // Pick a random object to mess with. + let random_object_path = { + let all_objects = src_snapshot.objects().collect::>(); + let random_object = all_objects.choose(&mut rand::rng()).copied().unwrap(); + blob_provider.file_path(random_object.as_bytes()) + }; + + // Truncate the object file and assert that verification fails. + tokio::fs::File::options() + .write(true) + .open(&random_object_path) + .await? + .set_len(1) + .await?; + info!("truncated object file {}", random_object_path.display()); + let err = verify_snapshot(blob_provider.clone(), dst_path.clone(), src_snapshot.clone()) + .await + .unwrap_err(); + assert_matches!( + err, + // If the object is a page, we'll get `Deserialize`, + // otherwise `HashMismatch`. + SnapshotError::HashMismatch { .. } | SnapshotError::Deserialize { .. } + ); + + // Delete the object file and assert that verification fails. + tokio::fs::remove_file(&random_object_path).await?; + info!("deleted object file {}", random_object_path.display()); + let err = verify_snapshot(blob_provider, dst_path, src_snapshot) + .await + .unwrap_err(); + assert_matches!(err, SnapshotError::ReadObject { cause, ..} if cause.kind() == io::ErrorKind::NotFound); + + Ok(()) +} + /// Creating a snapshot takes a long time, because we need to commit /// `SNAPSHOT_FREQUENCY` transactions to trigger one. /// From 1bf2a1613afb06e63602eeb0f731a1c52762884c Mon Sep 17 00:00:00 2001 From: Kim Altintop Date: Thu, 8 May 2025 17:13:12 +0200 Subject: [PATCH 4/4] Update Cargo.lock --- Cargo.lock | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.lock b/Cargo.lock index 5dce66c115d..c1543306821 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5839,7 +5839,7 @@ dependencies = [ "hex", "log", "pretty_assertions", - "rand 0.9.0", + "rand 0.9.1", "spacetimedb-core", "spacetimedb-durability", "spacetimedb-fs-utils",