Skip to content

Commit

Permalink
Validate object store uploads
Browse files Browse the repository at this point in the history
  • Loading branch information
pcholakov committed Dec 5, 2024
1 parent 9889aa5 commit aee1a12
Showing 1 changed file with 130 additions and 62 deletions.
192 changes: 130 additions & 62 deletions crates/worker/src/partition/snapshots/repository.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ pub struct SnapshotRepository {
}

#[serde_as]
#[derive(Clone, Debug, Serialize, Deserialize)]
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct LatestSnapshot {
pub version: SnapshotFormatVersion,

Expand All @@ -80,6 +80,21 @@ pub struct LatestSnapshot {
pub path: String,
}

impl LatestSnapshot {
pub fn from_snapshot(snapshot: &PartitionSnapshotMetadata, path: String) -> Self {
LatestSnapshot {
version: snapshot.version,
cluster_name: snapshot.cluster_name.clone(),
node_name: snapshot.node_name.clone(),
partition_id: snapshot.partition_id,
snapshot_id: snapshot.snapshot_id,
created_at: snapshot.created_at.clone(),
min_applied_lsn: snapshot.min_applied_lsn,
path,
}
}
}

impl SnapshotRepository {
/// Creates an instance of the repository if a snapshots destination is configured.
pub async fn create_if_configured(
Expand All @@ -96,42 +111,7 @@ impl SnapshotRepository {
.inspect(|params| info!("Snapshot destination parameters ignored: {params}"));
destination.set_query(None);

// We use the AWS SDK configuration and credentials provider so that the conventional AWS
// environment variables and config files work as expected. The object_store crate has its
// own configuration mechanism which doesn't support many of the AWS conventions. This
// differs quite a lot from the Lambda invoker which uses the AWS SDK, and that would be a
// very surprising inconsistency for customers. This mechanism allows us to infer the region
// and securely obtain session credentials without any hard-coded configuration.
let object_store: Arc<dyn ObjectStore> = if destination.scheme() == "s3" {
debug!("Using AWS SDK credentials provider");
let aws_region = aws_config::load_defaults(BehaviorVersion::v2024_03_28())
.await
.region()
.context("Unable to determine AWS region to use with S3")?
.clone();

let store = AmazonS3Builder::new()
.with_url(destination.clone())
.with_region(aws_region.to_string())
.with_conditional_put(S3ConditionalPut::ETagMatch)
.with_credentials(Arc::new(AwsSdkCredentialsProvider {
credentials_provider: DefaultCredentialsChain::builder().build().await,
}))
.with_retry(object_store::RetryConfig {
max_retries: 8,
retry_timeout: Duration::from_secs(60),
backoff: object_store::BackoffConfig {
init_backoff: Duration::from_millis(100),
max_backoff: Duration::from_secs(5),
base: 2.,
},
})
.build()?;

Arc::new(store)
} else {
object_store::parse_url(&destination)?.0.into()
};
let object_store = create_object_store_client(destination.clone()).await?;

// The prefix must be stripped of any leading slash and, unless it is empty, must end in a
// single "/" character.
Expand Down Expand Up @@ -207,13 +187,9 @@ impl SnapshotRepository {
) -> Result<(), PutSnapshotError> {
// A unique snapshot path within the partition prefix. We pad the LSN to ensure correct
// lexicographic sorting.
let relative_snapshot_path = format!(
"lsn_{lsn:020}-{snapshot_id}",
lsn = snapshot.min_applied_lsn,
snapshot_id = snapshot.snapshot_id
);
let snapshot_prefix = Self::get_snapshot_prefix(snapshot);
let full_snapshot_path = format!(
"{prefix}{partition_id}/{relative_snapshot_path}",
"{prefix}{partition_id}/{snapshot_prefix}",
prefix = self.prefix,
partition_id = snapshot.partition_id,
);
Expand Down Expand Up @@ -294,18 +270,8 @@ impl SnapshotRepository {
return Ok(());
}

// Construct the new "latest snapshot" pointer
let latest = LatestSnapshot {
version: snapshot.version,
cluster_name: snapshot.cluster_name.clone(),
node_name: snapshot.node_name.clone(),
partition_id: snapshot.partition_id,
snapshot_id: snapshot.snapshot_id,
created_at: snapshot.created_at.clone(),
min_applied_lsn: snapshot.min_applied_lsn,
path: relative_snapshot_path,
};
let latest_json = PutPayload::from(
let latest = LatestSnapshot::from_snapshot(snapshot, snapshot_prefix);
let latest = PutPayload::from(
serde_json::to_string_pretty(&latest)
.map_err(|e| PutSnapshotError::from(e, progress.clone()))?,
);
Expand All @@ -327,7 +293,7 @@ impl SnapshotRepository {
// retrying the entire put_snapshot attempt on object_store::Error::Precondition.
let put_result = self
.object_store
.put_opts(&latest_path, latest_json, conditions)
.put_opts(&latest_path, latest, conditions)
.await
.map_err(|e| PutSnapshotError::from(e, progress.clone()))?;

Expand All @@ -340,6 +306,14 @@ impl SnapshotRepository {
Ok(())
}

/// Returns the snapshot's unique path component within its partition prefix.
///
/// The result has the form `lsn_<min_applied_lsn>-<snapshot_id>`, where the LSN is formatted
/// with a zero-padded width of 20 so that paths sort lexicographically by LSN.
fn get_snapshot_prefix(snapshot: &PartitionSnapshotMetadata) -> String {
    let lsn = &snapshot.min_applied_lsn;
    let snapshot_id = &snapshot.snapshot_id;
    format!("lsn_{lsn:020}-{snapshot_id}")
}

async fn get_latest_snapshot_metadata_for_update(
&self,
snapshot: &PartitionSnapshotMetadata,
Expand Down Expand Up @@ -384,6 +358,46 @@ impl SnapshotRepository {
}
}

/// Creates the object store client for the given snapshot destination URL.
///
/// Destinations with the `s3` scheme get an S3 client whose region and credentials come from
/// the AWS SDK; every other scheme is handed to `object_store::parse_url`.
///
/// # Errors
///
/// Fails if no AWS region can be determined for an `s3` destination, if the S3 client cannot
/// be built, or if `object_store::parse_url` rejects the URL.
async fn create_object_store_client(destination: Url) -> anyhow::Result<Arc<dyn ObjectStore>> {
    // Anything that is not S3 is resolved by the object_store crate's own URL handling.
    if destination.scheme() != "s3" {
        let (store, _path) = object_store::parse_url(&destination)?;
        return Ok(store.into());
    }

    // We use the AWS SDK configuration and credentials provider so that the conventional AWS
    // environment variables and config files work as expected. The object_store crate has its
    // own configuration mechanism which doesn't support many of the AWS conventions. This
    // differs quite a lot from the Lambda invoker which uses the AWS SDK, and that would be a
    // very surprising inconsistency for customers. This mechanism allows us to infer the region
    // and securely obtain session credentials without any hard-coded configuration.
    debug!("Using AWS SDK credentials provider");
    let aws_region = aws_config::load_defaults(BehaviorVersion::v2024_03_28())
        .await
        .region()
        .context("Unable to determine AWS region to use with S3")?
        .clone();

    let credentials = Arc::new(AwsSdkCredentialsProvider {
        credentials_provider: DefaultCredentialsChain::builder().build().await,
    });

    // Exponential backoff: 100ms initial delay doubling up to 5s, at most 8 retries within 60s.
    let retry_config = object_store::RetryConfig {
        max_retries: 8,
        retry_timeout: Duration::from_secs(60),
        backoff: object_store::BackoffConfig {
            init_backoff: Duration::from_millis(100),
            max_backoff: Duration::from_secs(5),
            base: 2.,
        },
    };

    let s3_store = AmazonS3Builder::new()
        .with_url(destination)
        .with_region(aws_region.to_string())
        .with_conditional_put(S3ConditionalPut::ETagMatch)
        .with_credentials(credentials)
        .with_retry(retry_config)
        .build()?;

    let client: Arc<dyn ObjectStore> = Arc::new(s3_store);
    Ok(client)
}

#[derive(Clone, Debug)]
struct SnapshotUploadProgress {
pub full_snapshot_path: String,
Expand Down Expand Up @@ -516,10 +530,9 @@ impl object_store::CredentialProvider for AwsSdkCredentialsProvider {

#[cfg(test)]
mod tests {
use restate_partition_store::snapshots::{PartitionSnapshotMetadata, SnapshotFormatVersion};
use restate_types::config::SnapshotsOptions;
use restate_types::identifiers::{PartitionId, PartitionKey, SnapshotId};
use restate_types::logs::{Lsn, SequenceNumber};
use bytes::Bytes;
use object_store::path::Path;
use object_store::ObjectStore;
use std::time::SystemTime;
use tempfile::TempDir;
use tokio::io::AsyncWriteExt;
Expand All @@ -529,7 +542,11 @@ mod tests {
use tracing_subscriber::{fmt, EnvFilter};
use url::Url;

use super::SnapshotRepository;
use super::{LatestSnapshot, SnapshotRepository};
use restate_partition_store::snapshots::{PartitionSnapshotMetadata, SnapshotFormatVersion};
use restate_types::config::SnapshotsOptions;
use restate_types::identifiers::{PartitionId, PartitionKey, SnapshotId};
use restate_types::logs::{Lsn, SequenceNumber};

#[tokio::test]
async fn test_repository_local() -> anyhow::Result<()> {
Expand Down Expand Up @@ -655,16 +672,52 @@ mod tests {
);

let opts = SnapshotsOptions {
destination: Some(destination.to_string()),
destination: Some(destination.clone()),
..SnapshotsOptions::default()
};

eprintln!("Destination: {}", destination);

let destination = Url::parse(destination.as_str())?;
let path = destination.path().to_string();
let object_store = super::create_object_store_client(destination).await?;

let repository = SnapshotRepository::create_if_configured(&opts)
.await?
.unwrap();

repository.put(&snapshot1, source_dir.clone()).await?;

let snapshot_prefix = SnapshotRepository::get_snapshot_prefix(&snapshot1);
let data = object_store
.get(&Path::from(format!(
"{}/{}/{}/data.sst",
path, snapshot1.partition_id, snapshot_prefix,
)))
.await?;
assert_eq!(data.bytes().await?, Bytes::from_static(b"snapshot-data"));

let metadata = object_store
.get(&Path::from(format!(
"{}/{}/{}/metadata.json",
path, snapshot1.partition_id, snapshot_prefix,
)))
.await?;
let metadata: PartitionSnapshotMetadata = serde_json::from_slice(&metadata.bytes().await?)?;
assert_eq!(snapshot1.snapshot_id, metadata.snapshot_id);

let latest = object_store
.get(&Path::from(format!(
"{}/{}/latest.json",
path, snapshot1.partition_id,
)))
.await?;
let latest: LatestSnapshot = serde_json::from_slice(&latest.bytes().await?)?;
assert_eq!(
LatestSnapshot::from_snapshot(&snapshot1, snapshot_prefix),
latest
);

let snapshot_source = TempDir::new()?;
let source_dir = snapshot_source.path().to_path_buf();

Expand All @@ -679,6 +732,21 @@ mod tests {

repository.put(&snapshot2, source_dir).await?;

let latest = object_store
.get(&Path::from(format!(
"{}/{}/latest.json",
path, snapshot2.partition_id,
)))
.await?;
let latest: LatestSnapshot = serde_json::from_slice(&latest.bytes().await?)?;
assert_eq!(
LatestSnapshot::from_snapshot(
&snapshot2,
SnapshotRepository::get_snapshot_prefix(&snapshot2)
),
latest
);

Ok(())
}

Expand Down

0 comments on commit aee1a12

Please sign in to comment.