From 52296eb5e1be88a241903488c133214cb5a9a363 Mon Sep 17 00:00:00 2001 From: Christian Date: Mon, 11 Nov 2024 12:03:34 +0000 Subject: [PATCH] feat: Implement TableRequirement checks (#689) * Implement TableRequirement check * Address comments --- crates/iceberg/src/catalog/mod.rs | 309 +++++++++++++++++++++++++++++- crates/iceberg/src/transaction.rs | 2 +- 2 files changed, 303 insertions(+), 8 deletions(-) diff --git a/crates/iceberg/src/catalog/mod.rs b/crates/iceberg/src/catalog/mod.rs index 854c1269c..536726fd3 100644 --- a/crates/iceberg/src/catalog/mod.rs +++ b/crates/iceberg/src/catalog/mod.rs @@ -29,8 +29,8 @@ use typed_builder::TypedBuilder; use uuid::Uuid; use crate::spec::{ - FormatVersion, Schema, Snapshot, SnapshotReference, SortOrder, TableMetadataBuilder, - UnboundPartitionSpec, ViewRepresentations, + FormatVersion, Schema, SchemaId, Snapshot, SnapshotReference, SortOrder, TableMetadata, + TableMetadataBuilder, UnboundPartitionSpec, ViewRepresentations, }; use crate::table::Table; use crate::{Error, ErrorKind, Result}; @@ -312,14 +312,14 @@ pub enum TableRequirement { LastAssignedFieldIdMatch { /// The last assigned field id of the table to assert. #[serde(rename = "last-assigned-field-id")] - last_assigned_field_id: i64, + last_assigned_field_id: i32, }, /// The table's current schema id must match the requirement. #[serde(rename = "assert-current-schema-id")] CurrentSchemaIdMatch { /// Current schema id of the table to assert. #[serde(rename = "current-schema-id")] - current_schema_id: i64, + current_schema_id: SchemaId, }, /// The table's last assigned partition id must match the /// requirement. @@ -327,14 +327,14 @@ pub enum TableRequirement { LastAssignedPartitionIdMatch { /// Last assigned partition id of the table to assert. #[serde(rename = "last-assigned-partition-id")] - last_assigned_partition_id: i64, + last_assigned_partition_id: i32, }, /// The table's default spec id must match the requirement. 
#[serde(rename = "assert-default-spec-id")] DefaultSpecIdMatch { /// Default spec id of the table to assert. #[serde(rename = "default-spec-id")] - default_spec_id: i64, + default_spec_id: i32, }, /// The table's default sort order id must match the requirement. #[serde(rename = "assert-default-sort-order-id")] @@ -453,6 +453,140 @@ impl TableUpdate { } } +impl TableRequirement { + /// Check that the requirement is met by the table metadata. + /// If the requirement is not met, an appropriate error is returned. + /// + /// Provide metadata as `None` if the table does not exist. + pub fn check(&self, metadata: Option<&TableMetadata>) -> Result<()> { + if let Some(metadata) = metadata { + match self { + TableRequirement::NotExist => { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Requirement failed: Table with id {} already exists", + metadata.uuid() + ), + )); + } + TableRequirement::UuidMatch { uuid } => { + if &metadata.uuid() != uuid { + return Err(Error::new( + ErrorKind::DataInvalid, + "Requirement failed: Table UUID does not match", + ) + .with_context("expected", *uuid) + .with_context("found", metadata.uuid())); + } + } + TableRequirement::CurrentSchemaIdMatch { current_schema_id } => { + // ToDo: Harmonize the types of current_schema_id + if metadata.current_schema_id != *current_schema_id { + return Err(Error::new( + ErrorKind::DataInvalid, + "Requirement failed: Current schema id does not match", + ) + .with_context("expected", current_schema_id.to_string()) + .with_context("found", metadata.current_schema_id.to_string())); + } + } + TableRequirement::DefaultSortOrderIdMatch { + default_sort_order_id, + } => { + if metadata.default_sort_order().order_id != *default_sort_order_id { + return Err(Error::new( + ErrorKind::DataInvalid, + "Requirement failed: Default sort order id does not match", + ) + .with_context("expected", default_sort_order_id.to_string()) + .with_context( + "found", + 
metadata.default_sort_order().order_id.to_string(), + )); + } + } + TableRequirement::RefSnapshotIdMatch { r#ref, snapshot_id } => { + let snapshot_ref = metadata.snapshot_for_ref(r#ref); + if let Some(snapshot_id) = snapshot_id { + let snapshot_ref = snapshot_ref.ok_or(Error::new( + ErrorKind::DataInvalid, + format!("Requirement failed: Branch or tag `{}` not found", r#ref), + ))?; + if snapshot_ref.snapshot_id() != *snapshot_id { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Requirement failed: Branch or tag `{}`'s snapshot has changed", + r#ref + ), + ) + .with_context("expected", snapshot_id.to_string()) + .with_context("found", snapshot_ref.snapshot_id().to_string())); + } + } else if snapshot_ref.is_some() { + // a null snapshot ID means the ref should not exist already + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Requirement failed: Branch or tag `{}` already exists", + r#ref + ), + )); + } + } + TableRequirement::DefaultSpecIdMatch { default_spec_id } => { + // ToDo: Harmonize the types of default_spec_id + if metadata.default_partition_spec_id() != *default_spec_id { + return Err(Error::new( + ErrorKind::DataInvalid, + "Requirement failed: Default partition spec id does not match", + ) + .with_context("expected", default_spec_id.to_string()) + .with_context("found", metadata.default_partition_spec_id().to_string())); + } + } + TableRequirement::LastAssignedPartitionIdMatch { + last_assigned_partition_id, + } => { + if metadata.last_partition_id != *last_assigned_partition_id { + return Err(Error::new( + ErrorKind::DataInvalid, + "Requirement failed: Last assigned partition id does not match", + ) + .with_context("expected", last_assigned_partition_id.to_string()) + .with_context("found", metadata.last_partition_id.to_string())); + } + } + TableRequirement::LastAssignedFieldIdMatch { + last_assigned_field_id, + } => { + if &metadata.last_column_id != last_assigned_field_id { + return Err(Error::new( + 
ErrorKind::DataInvalid, + "Requirement failed: Last assigned field id does not match", + ) + .with_context("expected", last_assigned_field_id.to_string()) + .with_context("found", metadata.last_column_id.to_string())); + } + } + }; + } else { + match self { + TableRequirement::NotExist => {} + _ => { + return Err(Error::new( + ErrorKind::DataInvalid, + "Requirement failed: Table does not exist", + )); + } + } + } + + Ok(()) + } +} + pub(super) mod _serde { use serde::{Deserialize as _, Deserializer}; @@ -549,7 +683,7 @@ mod tests { use crate::spec::{ FormatVersion, NestedField, NullOrder, Operation, PrimitiveType, Schema, Snapshot, SnapshotReference, SnapshotRetention, SortDirection, SortField, SortOrder, Summary, - TableMetadataBuilder, Transform, Type, UnboundPartitionSpec, + TableMetadata, TableMetadataBuilder, Transform, Type, UnboundPartitionSpec, }; use crate::{NamespaceIdent, TableCreation, TableIdent, TableRequirement, TableUpdate}; @@ -593,6 +727,167 @@ mod tests { ); } + fn metadata() -> TableMetadata { + let tbl_creation = TableCreation::builder() + .name("table".to_string()) + .location("/path/to/table".to_string()) + .schema(Schema::builder().build().unwrap()) + .build(); + + TableMetadataBuilder::from_table_creation(tbl_creation) + .unwrap() + .assign_uuid(uuid::Uuid::nil()) + .unwrap() + .build() + .unwrap() + } + + #[test] + fn test_check_requirement_not_exist() { + let metadata = metadata(); + let requirement = TableRequirement::NotExist; + + assert!(requirement.check(Some(&metadata)).is_err()); + assert!(requirement.check(None).is_ok()); + } + + #[test] + fn test_check_table_uuid() { + let metadata = metadata(); + + let requirement = TableRequirement::UuidMatch { + uuid: uuid::Uuid::now_v7(), + }; + assert!(requirement.check(Some(&metadata)).is_err()); + + let requirement = TableRequirement::UuidMatch { + uuid: uuid::Uuid::nil(), + }; + assert!(requirement.check(Some(&metadata)).is_ok()); + } + + #[test] + fn test_check_ref_snapshot_id() { + let 
metadata = metadata(); + + // Ref does not exist but should + let requirement = TableRequirement::RefSnapshotIdMatch { + r#ref: "my_branch".to_string(), + snapshot_id: Some(1), + }; + assert!(requirement.check(Some(&metadata)).is_err()); + + // Ref does not exist and should not + let requirement = TableRequirement::RefSnapshotIdMatch { + r#ref: "my_branch".to_string(), + snapshot_id: None, + }; + assert!(requirement.check(Some(&metadata)).is_ok()); + + // Add snapshot + let record = r#" + { + "snapshot-id": 3051729675574597004, + "sequence-number": 10, + "timestamp-ms": 1515100955770, + "summary": { + "operation": "append" + }, + "manifest-list": "s3://b/wh/.../s1.avro", + "schema-id": 0 + } + "#; + + let snapshot = serde_json::from_str::<Snapshot>(record).unwrap(); + let mut metadata = metadata; + metadata.append_snapshot(snapshot); + + // Ref exists and should match + let requirement = TableRequirement::RefSnapshotIdMatch { + r#ref: "main".to_string(), + snapshot_id: Some(3051729675574597004), + }; + assert!(requirement.check(Some(&metadata)).is_ok()); + + // Ref exists but does not match + let requirement = TableRequirement::RefSnapshotIdMatch { + r#ref: "main".to_string(), + snapshot_id: Some(1), + }; + assert!(requirement.check(Some(&metadata)).is_err()); + } + + #[test] + fn test_check_last_assigned_field_id() { + let metadata = metadata(); + + let requirement = TableRequirement::LastAssignedFieldIdMatch { + last_assigned_field_id: 1, + }; + assert!(requirement.check(Some(&metadata)).is_err()); + + let requirement = TableRequirement::LastAssignedFieldIdMatch { + last_assigned_field_id: 0, + }; + assert!(requirement.check(Some(&metadata)).is_ok()); + } + + #[test] + fn test_check_current_schema_id() { + let metadata = metadata(); + + let requirement = TableRequirement::CurrentSchemaIdMatch { + current_schema_id: 1, + }; + assert!(requirement.check(Some(&metadata)).is_err()); + + let requirement = TableRequirement::CurrentSchemaIdMatch { + current_schema_id: 0, + }; + 
assert!(requirement.check(Some(&metadata)).is_ok()); + } + + #[test] + fn test_check_last_assigned_partition_id() { + let metadata = metadata(); + + let requirement = TableRequirement::LastAssignedPartitionIdMatch { + last_assigned_partition_id: 1, + }; + assert!(requirement.check(Some(&metadata)).is_err()); + + let requirement = TableRequirement::LastAssignedPartitionIdMatch { + last_assigned_partition_id: 0, + }; + assert!(requirement.check(Some(&metadata)).is_ok()); + } + + #[test] + fn test_check_default_spec_id() { + let metadata = metadata(); + + let requirement = TableRequirement::DefaultSpecIdMatch { default_spec_id: 1 }; + assert!(requirement.check(Some(&metadata)).is_err()); + + let requirement = TableRequirement::DefaultSpecIdMatch { default_spec_id: 0 }; + assert!(requirement.check(Some(&metadata)).is_ok()); + } + + #[test] + fn test_check_default_sort_order_id() { + let metadata = metadata(); + + let requirement = TableRequirement::DefaultSortOrderIdMatch { + default_sort_order_id: 1, + }; + assert!(requirement.check(Some(&metadata)).is_err()); + + let requirement = TableRequirement::DefaultSortOrderIdMatch { + default_sort_order_id: 0, + }; + assert!(requirement.check(Some(&metadata)).is_ok()); + } + #[test] fn test_table_uuid() { test_serde_json( diff --git a/crates/iceberg/src/transaction.rs b/crates/iceberg/src/transaction.rs index db7c3f28f..f29cf5122 100644 --- a/crates/iceberg/src/transaction.rs +++ b/crates/iceberg/src/transaction.rs @@ -154,7 +154,7 @@ impl<'a> ReplaceSortOrderAction<'a> { let requirements = vec![ TableRequirement::CurrentSchemaIdMatch { - current_schema_id: self.tx.table.metadata().current_schema().schema_id() as i64, + current_schema_id: self.tx.table.metadata().current_schema().schema_id(), }, TableRequirement::DefaultSortOrderIdMatch { default_sort_order_id: self.tx.table.metadata().default_sort_order().order_id,