Introduce SnapshotRepository find_latest and wire up partition restore

With this change, Partition Processor startup now checks the snapshot repository for a partition snapshot before creating a blank store database. If a recent snapshot is available, we will restore that instead of replaying the log from the beginning.
pcholakov committed Nov 22, 2024
1 parent 38268d6 commit 40e74fd

Showing 5 changed files with 162 additions and 16 deletions.
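In outline, the new bootstrap path reduces to one decision. The sketch below is condensed from the spawn-task diff further down, not the verbatim code: error handling and configuration plumbing are elided, and `rocksdb_opts` stands in for `&options.storage.rocksdb`.

    // Condensed sketch of the startup decision introduced by this commit.
    let store = if !partition_store_manager.has_partition_store(partition_id).await {
        // Nothing on disk yet: prefer bootstrapping from the snapshot repository.
        match snapshot_repository.get_latest(partition_id).await? {
            Some(snapshot) => {
                partition_store_manager
                    .open_partition_store_from_snapshot(partition_id, key_range, snapshot, rocksdb_opts)
                    .await?
            }
            None => {
                // No snapshot published: create a blank store and replay the log from the start.
                partition_store_manager
                    .open_partition_store(partition_id, key_range, OpenMode::CreateIfMissing, rocksdb_opts)
                    .await?
            }
        }
    } else {
        // Local state already exists: open it as-is; snapshots never overwrite it.
        partition_store_manager
            .open_partition_store(partition_id, key_range, OpenMode::OpenExisting, rocksdb_opts)
            .await?
    };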
13 changes: 8 additions & 5 deletions crates/partition-store/src/partition_store_manager.rs
@@ -83,9 +83,11 @@ impl PartitionStoreManager {
         })
     }
 
-    pub async fn has_partition(&self, partition_id: PartitionId) -> bool {
-        let guard = self.lookup.lock().await;
-        guard.live.contains_key(&partition_id)
+    /// Check whether we have a partition store for the given partition id, irrespective of whether
+    /// the store is open or not.
+    pub async fn has_partition_store(&self, partition_id: PartitionId) -> bool {
+        let cf_name = cf_for_partition(partition_id);
+        self.rocksdb.inner().cf_handle(&cf_name).is_some()
     }
 
     pub async fn get_partition_store(&self, partition_id: PartitionId) -> Option<PartitionStore> {
@@ -169,8 +171,9 @@ impl PartitionStoreManager
 
         info!(
             ?partition_id,
-            min_applied_lsn = ?snapshot.min_applied_lsn,
-            "Initializing partition store from snapshot"
+            lsn = ?snapshot.min_applied_lsn,
+            path = ?snapshot.base_dir,
+            "Importing partition store snapshot"
         );
 
         if let Err(e) = self
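The `has_partition_store` rework changes the question from "is the store currently open?" (the old in-memory `lookup` map) to "does the partition's data exist on disk?", which is what the bootstrap logic needs. A minimal standalone sketch of the same idea against the rust-rocksdb API; the `data-{id}` column-family naming is illustrative, not the crate's actual `cf_for_partition` scheme:

    use rocksdb::{Options, DB};

    // Existence is derived from the column family handle, not from any in-memory
    // registry, so it holds even before a PartitionStore object has been constructed.
    fn partition_exists(db: &DB, partition_id: u16) -> bool {
        let cf_name = format!("data-{partition_id}"); // hypothetical naming scheme
        db.cf_handle(&cf_name).is_some()
    }

    fn main() -> Result<(), rocksdb::Error> {
        let mut opts = Options::default();
        opts.create_if_missing(true);
        let db = DB::open(&opts, "/tmp/example-db")?;
        assert!(!partition_exists(&db, 0)); // no column family created yet
        Ok(())
    }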
1 change: 1 addition & 0 deletions crates/worker/Cargo.toml
@@ -67,6 +67,7 @@ serde = { workspace = true }
 serde_json = { workspace = true }
 serde_with = { workspace = true }
 strum = { workspace = true }
+tempfile = { workspace = true }
 thiserror = { workspace = true }
 tokio = { workspace = true }
 tokio-stream = { workspace = true }
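The new `tempfile` dependency backs snapshot staging: `repository.rs` below downloads data files into a uniquely named directory created inside the store's base dir, so the files never cross a filesystem boundary on import. A minimal sketch of the pattern using only the `tempfile` crate (the prefix, path, and file name are illustrative):

    use tempfile::TempDir;

    fn main() -> std::io::Result<()> {
        // Stage in a unique directory; with_prefix_in keeps it on the target filesystem.
        let staging = TempDir::with_prefix_in("snap_123-", std::env::temp_dir())?;
        std::fs::write(staging.path().join("000001.sst"), b"downloaded bytes")?;

        // into_path() persists the directory: it will NOT be removed on drop,
        // so whoever consumes the staged snapshot owns the cleanup.
        let persisted = staging.into_path();
        println!("staged at {}", persisted.display());
        Ok(())
    }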
101 changes: 99 additions & 2 deletions crates/worker/src/partition/snapshots/repository.rs
@@ -20,11 +20,14 @@ use object_store::aws::AmazonS3Builder;
 use object_store::{MultipartUpload, ObjectStore, PutPayload};
 use serde::{Deserialize, Serialize};
 use serde_with::serde_as;
+use tempfile::TempDir;
 use tokio::io::AsyncReadExt;
-use tracing::{debug, trace};
+use tracing::{debug, info, trace};
 use url::Url;
 
-use restate_partition_store::snapshots::{PartitionSnapshotMetadata, SnapshotFormatVersion};
+use restate_partition_store::snapshots::{
+    LocalPartitionSnapshot, PartitionSnapshotMetadata, SnapshotFormatVersion,
+};
 use restate_types::config::SnapshotsOptions;
 use restate_types::identifiers::{PartitionId, SnapshotId};
 use restate_types::logs::Lsn;
@@ -48,6 +51,7 @@ pub struct SnapshotRepository {
     object_store: Arc<dyn ObjectStore>,
     destination: Url,
     prefix: String,
+    base_dir: PathBuf,
 }
 
 #[serde_as]
@@ -138,6 +142,7 @@ impl SnapshotRepository {
             object_store,
             destination,
             prefix,
+            base_dir,
         })
     }
 
@@ -237,6 +242,98 @@ impl SnapshotRepository {
 
         Ok(())
     }
+
+    /// Discover and download the latest snapshot available. The returned snapshot's staging
+    /// directory is persisted via `TempDir::into_path`; the caller is responsible for cleanup.
+    pub(crate) async fn get_latest(
+        &self,
+        partition_id: PartitionId,
+    ) -> anyhow::Result<Option<LocalPartitionSnapshot>> {
+        let latest_path = object_store::path::Path::from(format!(
+            "{prefix}{partition_id}/latest.json",
+            prefix = self.prefix,
+            partition_id = partition_id,
+        ));
+
+        let latest = self.object_store.get(&latest_path).await;
+
+        let latest = match latest {
+            Ok(result) => result,
+            Err(object_store::Error::NotFound { .. }) => {
+                debug!("Latest snapshot data not found in repository");
+                return Ok(None);
+            }
+            Err(e) => return Err(e.into()),
+        };
+
+        let latest: LatestSnapshot =
+            serde_json::from_slice(latest.bytes().await?.iter().as_slice())?;
+        trace!("Latest snapshot metadata: {:?}", latest);
+
+        let snapshot_metadata_path = object_store::path::Path::from(format!(
+            "{prefix}{partition_id}/{path}/metadata.json",
+            prefix = self.prefix,
+            partition_id = partition_id,
+            path = latest.path,
+        ));
+        let snapshot_metadata = self.object_store.get(&snapshot_metadata_path).await;
+
+        let snapshot_metadata = match snapshot_metadata {
+            Ok(result) => result,
+            Err(object_store::Error::NotFound { .. }) => {
+                info!("Latest snapshot points to a snapshot that was not found in the repository!");
+                return Ok(None); // arguably this could also be an error
+            }
+            Err(e) => return Err(e.into()),
+        };
+
+        let mut snapshot_metadata: PartitionSnapshotMetadata =
+            serde_json::from_slice(snapshot_metadata.bytes().await?.iter().as_slice())?;
+        if snapshot_metadata.version != SnapshotFormatVersion::V1 {
+            return Err(anyhow!(
+                "Unsupported snapshot format version: {:?}",
+                snapshot_metadata.version
+            ));
+        }
+
+        // The snapshot ingest directory should be on the same filesystem as the partition store
+        // to minimize IO and disk space usage during import.
+        let snapshot_dir = TempDir::with_prefix_in(
+            format!("{}-", snapshot_metadata.snapshot_id),
+            &self.base_dir,
+        )?;
+        debug!(
+            snapshot_id = %snapshot_metadata.snapshot_id,
+            path = ?snapshot_dir.path(),
+            "Getting snapshot data",
+        );
+
+        // todo(pavel): stream the data from the object store
+        for file in &mut snapshot_metadata.files {
+            let filename = file.name.trim_start_matches("/");
+            let key = object_store::path::Path::from(format!(
+                "{prefix}{partition_id}/{path}/{filename}",
+                prefix = self.prefix,
+                partition_id = partition_id,
+                path = latest.path,
+                filename = filename,
+            ));
+            let file_path = snapshot_dir.path().join(filename);
+            let file_data = self.object_store.get(&key).await?;
+            tokio::fs::write(&file_path, file_data.bytes().await?).await?;
+            trace!(%key, "Downloaded snapshot data file to {:?}", file_path);
+            // Patch paths to point to the local staging directory
+            file.directory = snapshot_dir.path().to_string_lossy().to_string();
+        }
+
+        Ok(Some(LocalPartitionSnapshot {
+            base_dir: snapshot_dir.into_path(),
+            min_applied_lsn: snapshot_metadata.min_applied_lsn,
+            db_comparator_name: snapshot_metadata.db_comparator_name,
+            files: snapshot_metadata.files,
+            key_range: snapshot_metadata.key_range.clone(),
+        }))
+    }
 }
 
 /// S3 and other stores require a certain minimum size for the parts of a multipart upload. It is an
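For orientation, `get_latest` resolves three kinds of object-store keys, mirroring the `format!` strings above: a `latest.json` pointer per partition, the metadata document of the snapshot that pointer names, and one object per data file listed in that metadata. A small sketch; the concrete `path` and file names are illustrative, and `prefix` is the configured repository prefix:

    /// Keys consulted during restore, mirroring the format! strings in get_latest.
    fn snapshot_keys(prefix: &str, partition_id: u64, path: &str, file: &str) -> [String; 3] {
        [
            format!("{prefix}{partition_id}/latest.json"),          // LatestSnapshot pointer
            format!("{prefix}{partition_id}/{path}/metadata.json"), // PartitionSnapshotMetadata
            format!("{prefix}{partition_id}/{path}/{file}"),        // one object per data file
        ]
    }

    fn main() {
        for key in snapshot_keys("snapshots/", 0, "lsn_100-snap_1", "000001.sst") {
            println!("{key}");
        }
    }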
1 change: 1 addition & 0 deletions crates/worker/src/partition_processor_manager/mod.rs
@@ -870,6 +870,7 @@ impl PartitionProcessorManager {
             self.metadata.clone(),
             self.bifrost.clone(),
             self.partition_store_manager.clone(),
+            self.snapshot_repository.clone(),
         )
     }
 
@@ -11,10 +11,11 @@
 use std::ops::RangeInclusive;
 
 use tokio::sync::{mpsc, watch};
-use tracing::instrument;
+use tracing::{info, instrument, trace};
 
 use crate::invoker_integration::EntryEnricher;
 use crate::partition::invoker_storage_reader::InvokerStorageReader;
+use crate::partition::snapshots::SnapshotRepository;
 use crate::partition_processor_manager::processor_state::StartedProcessor;
 use crate::PartitionProcessorBuilder;
 use restate_bifrost::Bifrost;
@@ -38,6 +39,7 @@ pub struct SpawnPartitionProcessorTask {
     metadata: Metadata,
     bifrost: Bifrost,
     partition_store_manager: PartitionStoreManager,
+    snapshot_repository: SnapshotRepository,
 }
 
 impl SpawnPartitionProcessorTask {
@@ -51,6 +53,7 @@ impl SpawnPartitionProcessorTask {
         metadata: Metadata,
         bifrost: Bifrost,
         partition_store_manager: PartitionStoreManager,
+        snapshot_repository: SnapshotRepository,
     ) -> Self {
         Self {
             task_name,
@@ -61,6 +64,7 @@
             metadata,
             bifrost,
             partition_store_manager,
+            snapshot_repository,
         }
     }
 
@@ -82,6 +86,7 @@ impl SpawnPartitionProcessorTask {
             metadata,
             bifrost,
             partition_store_manager,
+            snapshot_repository,
         } = self;
 
         let config = configuration.pinned();
@@ -129,14 +134,53 @@
         {
             let options = options.clone();
             let key_range = key_range.clone();
-            let partition_store = partition_store_manager
-                .open_partition_store(
-                    partition_id,
-                    key_range,
-                    OpenMode::CreateIfMissing,
-                    &options.storage.rocksdb,
-                )
-                .await?;
+
+            let partition_store = if !partition_store_manager
+                .has_partition_store(pp_builder.partition_id)
+                .await
+            {
+                trace!(
+                    partition_id = %partition_id,
+                    "Looking for partition snapshot from which to bootstrap partition store",
+                );
+                let snapshot = snapshot_repository.get_latest(partition_id).await?;
+                if let Some(snapshot) = snapshot {
+                    info!(
+                        partition_id = %partition_id,
+                        "Found snapshot to bootstrap partition, restoring it",
+                    );
+                    partition_store_manager
+                        .open_partition_store_from_snapshot(
+                            partition_id,
+                            key_range.clone(),
+                            snapshot,
+                            &options.storage.rocksdb,
+                        )
+                        .await?
+                } else {
+                    info!(
+                        partition_id = %partition_id,
+                        "No snapshot found to bootstrap partition, creating new store",
+                    );
+                    partition_store_manager
+                        .open_partition_store(
+                            partition_id,
+                            key_range,
+                            OpenMode::CreateIfMissing,
+                            &options.storage.rocksdb,
+                        )
+                        .await?
+                }
+            } else {
+                partition_store_manager
+                    .open_partition_store(
+                        partition_id,
+                        key_range,
+                        OpenMode::OpenExisting,
+                        &options.storage.rocksdb,
+                    )
+                    .await?
+            };
 
             move || async move {
                 tc.spawn_child(
