diff --git a/Cargo.lock b/Cargo.lock index cf89eb512..4f5243f44 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3105,6 +3105,7 @@ version = "0.4.0" dependencies = [ "arrow-array", "arrow-schema", + "ctor", "datafusion", "futures", "iceberg", diff --git a/crates/catalog/glue/tests/glue_catalog_test.rs b/crates/catalog/glue/tests/glue_catalog_test.rs index ebfb1f432..634d7345d 100644 --- a/crates/catalog/glue/tests/glue_catalog_test.rs +++ b/crates/catalog/glue/tests/glue_catalog_test.rs @@ -44,7 +44,7 @@ fn before_all() { normalize_test_name(module_path!()), format!("{}/testdata/glue_catalog", env!("CARGO_MANIFEST_DIR")), ); - docker_compose.run(); + docker_compose.up(); guard.replace(docker_compose); } diff --git a/crates/catalog/hms/tests/hms_catalog_test.rs b/crates/catalog/hms/tests/hms_catalog_test.rs index 37a95941b..f70517ac0 100644 --- a/crates/catalog/hms/tests/hms_catalog_test.rs +++ b/crates/catalog/hms/tests/hms_catalog_test.rs @@ -43,7 +43,7 @@ fn before_all() { normalize_test_name(module_path!()), format!("{}/testdata/hms_catalog", env!("CARGO_MANIFEST_DIR")), ); - docker_compose.run(); + docker_compose.up(); guard.replace(docker_compose); } diff --git a/crates/catalog/rest/tests/rest_catalog_test.rs b/crates/catalog/rest/tests/rest_catalog_test.rs index eee58fc19..f70fc2f3d 100644 --- a/crates/catalog/rest/tests/rest_catalog_test.rs +++ b/crates/catalog/rest/tests/rest_catalog_test.rs @@ -41,7 +41,7 @@ fn before_all() { normalize_test_name(module_path!()), format!("{}/testdata/rest_catalog", env!("CARGO_MANIFEST_DIR")), ); - docker_compose.run(); + docker_compose.up(); guard.replace(docker_compose); } diff --git a/crates/iceberg/tests/file_io_gcs_test.rs b/crates/iceberg/tests/file_io_gcs_test.rs index f78465571..161285ae6 100644 --- a/crates/iceberg/tests/file_io_gcs_test.rs +++ b/crates/iceberg/tests/file_io_gcs_test.rs @@ -40,7 +40,7 @@ mod tests { normalize_test_name(module_path!()), format!("{}/testdata/file_io_gcs", env!("CARGO_MANIFEST_DIR")), ); - docker_compose.run(); + docker_compose.up(); guard.replace(docker_compose); } diff --git a/crates/iceberg/tests/file_io_s3_test.rs b/crates/iceberg/tests/file_io_s3_test.rs index 22b798fc0..eab6853b3 100644 --- a/crates/iceberg/tests/file_io_s3_test.rs +++ b/crates/iceberg/tests/file_io_s3_test.rs @@ -38,7 +38,7 @@ mod tests { normalize_test_name(module_path!()), format!("{}/testdata/file_io_s3", env!("CARGO_MANIFEST_DIR")), ); - docker_compose.run(); + docker_compose.up(); guard.replace(docker_compose); } diff --git a/crates/integration_tests/Cargo.toml b/crates/integration_tests/Cargo.toml index 1d1bfd9c2..26ad2d8b3 100644 --- a/crates/integration_tests/Cargo.toml +++ b/crates/integration_tests/Cargo.toml @@ -27,6 +27,7 @@ rust-version = { workspace = true } [dependencies] arrow-array = { workspace = true } arrow-schema = { workspace = true } +ctor = { workspace = true } datafusion = { workspace = true } futures = { workspace = true } iceberg = { workspace = true } diff --git a/crates/integration_tests/src/lib.rs b/crates/integration_tests/src/lib.rs index c9311c45b..1d2d5dc1e 100644 --- a/crates/integration_tests/src/lib.rs +++ b/crates/integration_tests/src/lib.rs @@ -18,7 +18,7 @@ use std::collections::HashMap; use iceberg::io::{S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY}; -use iceberg_catalog_rest::{RestCatalog, RestCatalogConfig}; +use iceberg_catalog_rest::RestCatalogConfig; use iceberg_test_utils::docker::DockerCompose; use iceberg_test_utils::{normalize_test_name, set_up}; @@ -26,23 +26,24 @@ const REST_CATALOG_PORT: u16 = 8181; pub struct TestFixture { pub _docker_compose: DockerCompose, - pub rest_catalog: RestCatalog, + pub catalog_config: RestCatalogConfig, } -pub async fn set_test_fixture(func: &str) -> TestFixture { +pub fn set_test_fixture(func: &str) -> TestFixture { set_up(); let docker_compose = DockerCompose::new( normalize_test_name(format!("{}_{func}", module_path!())), format!("{}/testdata", env!("CARGO_MANIFEST_DIR")), ); - // Start docker compose - docker_compose.run(); + // Stop any containers from previous runs and start new ones + docker_compose.down(); + docker_compose.up(); let rest_catalog_ip = docker_compose.get_container_ip("rest"); let minio_ip = docker_compose.get_container_ip("minio"); - let config = RestCatalogConfig::builder() + let catalog_config = RestCatalogConfig::builder() .uri(format!("http://{}:{}", rest_catalog_ip, REST_CATALOG_PORT)) .props(HashMap::from([ ( @@ -54,10 +55,9 @@ pub async fn set_test_fixture(func: &str) -> TestFixture { (S3_REGION.to_string(), "us-east-1".to_string()), ])) .build(); - let rest_catalog = RestCatalog::new(config); TestFixture { _docker_compose: docker_compose, - rest_catalog, + catalog_config, } } diff --git a/crates/integration_tests/tests/main.rs b/crates/integration_tests/tests/main.rs new file mode 100644 index 000000000..20bf6b9e3 --- /dev/null +++ b/crates/integration_tests/tests/main.rs @@ -0,0 +1,19 @@ +use std::sync::{Arc, OnceLock}; + +use ctor::dtor; +use iceberg_integration_tests::{set_test_fixture, TestFixture}; + +pub mod shared; + +static DOCKER_CONTAINERS: OnceLock> = OnceLock::new(); + +pub fn get_shared_containers() -> &'static Arc { + DOCKER_CONTAINERS.get_or_init(|| Arc::new(set_test_fixture("shared_tests"))) +} + +#[dtor] +fn shutdown() { + if let Some(fixture) = DOCKER_CONTAINERS.get() { + fixture._docker_compose.down() + } +} diff --git a/crates/integration_tests/tests/append_data_file_test.rs b/crates/integration_tests/tests/shared/append_data_file_test.rs similarity index 94% rename from crates/integration_tests/tests/append_data_file_test.rs rename to crates/integration_tests/tests/shared/append_data_file_test.rs index 60d4f04c6..7bee80760 100644 --- a/crates/integration_tests/tests/append_data_file_test.rs +++ b/crates/integration_tests/tests/shared/append_data_file_test.rs @@ -31,13 +31,16 @@ use iceberg::writer::file_writer::location_generator::{ use iceberg::writer::file_writer::ParquetWriterBuilder; use iceberg::writer::{IcebergWriter, IcebergWriterBuilder}; use iceberg::{Catalog, Namespace, NamespaceIdent, TableCreation}; -use iceberg_integration_tests::set_test_fixture; +use iceberg_catalog_rest::RestCatalog; use parquet::arrow::arrow_reader::ArrowReaderOptions; use parquet::file::properties::WriterProperties; +use crate::get_shared_containers; + #[tokio::test] async fn test_append_data_file() { - let fixture = set_test_fixture("test_create_table").await; + let fixture = get_shared_containers(); + let rest_catalog = RestCatalog::new(fixture.catalog_config.clone()); let ns = Namespace::with_properties( NamespaceIdent::from_strs(["apple", "ios"]).unwrap(), @@ -47,11 +50,9 @@ async fn test_append_data_file() { ]), ); - fixture - .rest_catalog + let _ = rest_catalog .create_namespace(ns.name(), ns.properties().clone()) - .await - .unwrap(); + .await; let schema = Schema::builder() .with_schema_id(1) @@ -69,8 +70,7 @@ async fn test_append_data_file() { .schema(schema.clone()) .build(); - let table = fixture - .rest_catalog + let table = rest_catalog .create_table(ns.name(), table_creation) .await .unwrap(); @@ -137,7 +137,7 @@ async fn test_append_data_file() { let mut append_action = tx.fast_append(None, vec![]).unwrap(); append_action.add_data_files(data_file.clone()).unwrap(); let tx = append_action.apply().await.unwrap(); - let table = tx.commit(&fixture.rest_catalog).await.unwrap(); + let table = tx.commit(&rest_catalog).await.unwrap(); // check result let batch_stream = table @@ -157,7 +157,7 @@ async fn test_append_data_file() { let mut append_action = tx.fast_append(None, vec![]).unwrap(); append_action.add_data_files(data_file.clone()).unwrap(); let tx = append_action.apply().await.unwrap(); - let table = tx.commit(&fixture.rest_catalog).await.unwrap(); + let table = tx.commit(&rest_catalog).await.unwrap(); // check result again let batch_stream = table diff --git a/crates/integration_tests/tests/append_partition_data_file_test.rs b/crates/integration_tests/tests/shared/append_partition_data_file_test.rs similarity index 95% rename from crates/integration_tests/tests/append_partition_data_file_test.rs rename to crates/integration_tests/tests/shared/append_partition_data_file_test.rs index 103021532..3736f954c 100644 --- a/crates/integration_tests/tests/append_partition_data_file_test.rs +++ b/crates/integration_tests/tests/shared/append_partition_data_file_test.rs @@ -35,12 +35,15 @@ use iceberg::writer::file_writer::location_generator::{ use iceberg::writer::file_writer::ParquetWriterBuilder; use iceberg::writer::{IcebergWriter, IcebergWriterBuilder}; use iceberg::{Catalog, Namespace, NamespaceIdent, TableCreation}; -use iceberg_integration_tests::set_test_fixture; +use iceberg_catalog_rest::RestCatalog; use parquet::file::properties::WriterProperties; +use crate::get_shared_containers; + #[tokio::test] async fn test_append_partition_data_file() { - let fixture = set_test_fixture("test_partition_data_file").await; + let fixture = get_shared_containers(); + let rest_catalog = RestCatalog::new(fixture.catalog_config.clone()); let ns = Namespace::with_properties( NamespaceIdent::from_strs(["iceberg", "rust"]).unwrap(), @@ -50,11 +53,9 @@ async fn test_append_partition_data_file() { ]), ); - fixture - .rest_catalog + let _ = rest_catalog .create_namespace(ns.name(), ns.properties().clone()) - .await - .unwrap(); + .await; let schema = Schema::builder() .with_schema_id(1) @@ -77,13 +78,12 @@ async fn test_append_partition_data_file() { .expect("could not bind to schema"); let table_creation = TableCreation::builder() - .name("t1".to_string()) + .name("t2".to_string()) .schema(schema.clone()) .partition_spec(partition_spec) .build(); - let table = fixture - .rest_catalog + let table = rest_catalog .create_table(ns.name(), table_creation) .await .unwrap(); @@ -148,7 +148,7 @@ async fn test_append_partition_data_file() { .add_data_files(data_file_valid.clone()) .unwrap(); let tx = append_action.apply().await.unwrap(); - let table = tx.commit(&fixture.rest_catalog).await.unwrap(); + let table = tx.commit(&rest_catalog).await.unwrap(); // check result let batch_stream = table diff --git a/crates/integration_tests/tests/conflict_commit_test.rs b/crates/integration_tests/tests/shared/conflict_commit_test.rs similarity index 92% rename from crates/integration_tests/tests/conflict_commit_test.rs rename to crates/integration_tests/tests/shared/conflict_commit_test.rs index 52575d1ce..12e149c4c 100644 --- a/crates/integration_tests/tests/conflict_commit_test.rs +++ b/crates/integration_tests/tests/shared/conflict_commit_test.rs @@ -31,12 +31,15 @@ use iceberg::writer::file_writer::location_generator::{ use iceberg::writer::file_writer::ParquetWriterBuilder; use iceberg::writer::{IcebergWriter, IcebergWriterBuilder}; use iceberg::{Catalog, Namespace, NamespaceIdent, TableCreation}; -use iceberg_integration_tests::set_test_fixture; +use iceberg_catalog_rest::RestCatalog; use parquet::file::properties::WriterProperties; +use crate::get_shared_containers; + #[tokio::test] async fn test_append_data_file_conflict() { - let fixture = set_test_fixture("test_create_table").await; + let fixture = get_shared_containers(); + let rest_catalog = RestCatalog::new(fixture.catalog_config.clone()); let ns = Namespace::with_properties( NamespaceIdent::from_strs(["apple", "ios"]).unwrap(), @@ -46,11 +49,9 @@ async fn test_append_data_file_conflict() { ]), ); - fixture - .rest_catalog + let _ = rest_catalog .create_namespace(ns.name(), ns.properties().clone()) - .await - .unwrap(); + .await; let schema = Schema::builder() .with_schema_id(1) @@ -64,12 +65,11 @@ async fn test_append_data_file_conflict() { .unwrap(); let table_creation = TableCreation::builder() - .name("t1".to_string()) + .name("t3".to_string()) .schema(schema.clone()) .build(); - let table = fixture - .rest_catalog + let table = rest_catalog .create_table(ns.name(), table_creation) .await .unwrap(); @@ -120,7 +120,7 @@ async fn test_append_data_file_conflict() { append_action.add_data_files(data_file.clone()).unwrap(); let tx2 = append_action.apply().await.unwrap(); let table = tx2 - .commit(&fixture.rest_catalog) + .commit(&rest_catalog) .await .expect("The first commit should not fail."); @@ -138,5 +138,5 @@ async fn test_append_data_file_conflict() { assert_eq!(batches[0], batch); // another commit should fail - assert!(tx1.commit(&fixture.rest_catalog).await.is_err()); + assert!(tx1.commit(&rest_catalog).await.is_err()); } diff --git a/crates/integration_tests/tests/datafusion.rs b/crates/integration_tests/tests/shared/datafusion.rs similarity index 96% rename from crates/integration_tests/tests/datafusion.rs rename to crates/integration_tests/tests/shared/datafusion.rs index 1586298ff..badb6496f 100644 --- a/crates/integration_tests/tests/datafusion.rs +++ b/crates/integration_tests/tests/shared/datafusion.rs @@ -25,17 +25,18 @@ use datafusion::catalog::TableProvider; use datafusion::error::DataFusionError; use datafusion::prelude::SessionContext; use iceberg::{Catalog, TableIdent}; +use iceberg_catalog_rest::RestCatalog; use iceberg_datafusion::IcebergTableProvider; -use iceberg_integration_tests::set_test_fixture; use parquet::arrow::PARQUET_FIELD_ID_META_KEY; +use crate::get_shared_containers; + #[tokio::test] async fn test_basic_queries() -> Result<(), DataFusionError> { - let fixture = set_test_fixture("datafusion_basic_read").await; - - let catalog = fixture.rest_catalog; + let fixture = get_shared_containers(); + let rest_catalog = RestCatalog::new(fixture.catalog_config.clone()); - let table = catalog + let table = rest_catalog .load_table(&TableIdent::from_strs(["default", "types_test"]).unwrap()) .await .unwrap(); diff --git a/crates/integration_tests/tests/shared/mod.rs b/crates/integration_tests/tests/shared/mod.rs new file mode 100644 index 000000000..f1f439bf7 --- /dev/null +++ b/crates/integration_tests/tests/shared/mod.rs @@ -0,0 +1,7 @@ +mod append_data_file_test; +mod append_partition_data_file_test; +mod conflict_commit_test; +mod datafusion; +mod read_evolved_schema; +mod read_positional_deletes; +mod scan_all_type; diff --git a/crates/integration_tests/tests/read_evolved_schema.rs b/crates/integration_tests/tests/shared/read_evolved_schema.rs similarity index 92% rename from crates/integration_tests/tests/read_evolved_schema.rs rename to crates/integration_tests/tests/shared/read_evolved_schema.rs index 64e29810c..bdf54f734 100644 --- a/crates/integration_tests/tests/read_evolved_schema.rs +++ b/crates/integration_tests/tests/shared/read_evolved_schema.rs @@ -22,15 +22,16 @@ use futures::TryStreamExt; use iceberg::expr::Reference; use iceberg::spec::Datum; use iceberg::{Catalog, TableIdent}; -use iceberg_integration_tests::set_test_fixture; +use iceberg_catalog_rest::RestCatalog; + +use crate::get_shared_containers; #[tokio::test] async fn test_evolved_schema() { - let fixture = set_test_fixture("read_evolved_schema").await; - - let catalog = fixture.rest_catalog; + let fixture = get_shared_containers(); + let rest_catalog = RestCatalog::new(fixture.catalog_config.clone()); - let table = catalog + let table = rest_catalog .load_table(&TableIdent::from_strs(["default", "test_rename_column"]).unwrap()) .await .unwrap(); @@ -64,7 +65,7 @@ async fn test_evolved_schema() { assert_eq!(actual, vec!["Java", "Python"]); // Evolve column - let table = catalog + let table = rest_catalog .load_table(&TableIdent::from_strs(["default", "test_promote_column"]).unwrap()) .await .unwrap(); diff --git a/crates/integration_tests/tests/read_positional_deletes.rs b/crates/integration_tests/tests/shared/read_positional_deletes.rs similarity index 89% rename from crates/integration_tests/tests/read_positional_deletes.rs rename to crates/integration_tests/tests/shared/read_positional_deletes.rs index 41ca057a6..dba02a537 100644 --- a/crates/integration_tests/tests/read_positional_deletes.rs +++ b/crates/integration_tests/tests/shared/read_positional_deletes.rs @@ -19,15 +19,16 @@ use iceberg::ErrorKind::FeatureUnsupported; use iceberg::{Catalog, TableIdent}; -use iceberg_integration_tests::set_test_fixture; +use iceberg_catalog_rest::RestCatalog; + +use crate::get_shared_containers; #[tokio::test] async fn test_read_table_with_positional_deletes() { - let fixture = set_test_fixture("read_table_with_positional_deletes").await; - - let catalog = fixture.rest_catalog; + let fixture = get_shared_containers(); + let rest_catalog = RestCatalog::new(fixture.catalog_config.clone()); - let table = catalog + let table = rest_catalog .load_table( &TableIdent::from_strs(["default", "test_positional_merge_on_read_double_deletes"]) .unwrap(), diff --git a/crates/integration_tests/tests/scan_all_type.rs b/crates/integration_tests/tests/shared/scan_all_type.rs similarity index 97% rename from crates/integration_tests/tests/scan_all_type.rs rename to crates/integration_tests/tests/shared/scan_all_type.rs index 517f6ea87..49c9462cf 100644 --- a/crates/integration_tests/tests/scan_all_type.rs +++ b/crates/integration_tests/tests/shared/scan_all_type.rs @@ -41,14 +41,17 @@ use iceberg::writer::file_writer::location_generator::{ use iceberg::writer::file_writer::ParquetWriterBuilder; use iceberg::writer::{IcebergWriter, IcebergWriterBuilder}; use iceberg::{Catalog, Namespace, NamespaceIdent, TableCreation}; -use iceberg_integration_tests::set_test_fixture; +use iceberg_catalog_rest::RestCatalog; use parquet::arrow::PARQUET_FIELD_ID_META_KEY; use parquet::file::properties::WriterProperties; use uuid::Uuid; +use crate::get_shared_containers; + #[tokio::test] async fn test_scan_all_type() { - let fixture = set_test_fixture("test_scan_all_type").await; + let fixture = get_shared_containers(); + let rest_catalog = RestCatalog::new(fixture.catalog_config.clone()); let ns = Namespace::with_properties( NamespaceIdent::from_strs(["apple", "ios"]).unwrap(), @@ -58,11 +61,9 @@ async fn test_scan_all_type() { ]), ); - fixture - .rest_catalog + let _ = rest_catalog .create_namespace(ns.name(), ns.properties().clone()) - .await - .unwrap(); + .await; let schema = Schema::builder() .with_schema_id(1) .with_identifier_field_ids(vec![2]) @@ -133,12 +134,11 @@ async fn test_scan_all_type() { .unwrap(); let table_creation = TableCreation::builder() - .name("t1".to_string()) + .name("t4".to_string()) .schema(schema.clone()) .build(); - let table = fixture - .rest_catalog + let table = rest_catalog .create_table(ns.name(), table_creation) .await .unwrap(); @@ -321,7 +321,7 @@ async fn test_scan_all_type() { let mut append_action = tx.fast_append(None, vec![]).unwrap(); append_action.add_data_files(data_file.clone()).unwrap(); let tx = append_action.apply().await.unwrap(); - let table = tx.commit(&fixture.rest_catalog).await.unwrap(); + let table = tx.commit(&rest_catalog).await.unwrap(); // check result let batch_stream = table diff --git a/crates/test_utils/src/docker.rs b/crates/test_utils/src/docker.rs index bde9737b1..42e6c39b9 100644 --- a/crates/test_utils/src/docker.rs +++ b/crates/test_utils/src/docker.rs @@ -65,7 +65,7 @@ impl DockerCompose { } } - pub fn run(&self) { + pub fn up(&self) { let mut cmd = Command::new("docker"); cmd.current_dir(&self.docker_compose_dir); @@ -91,6 +91,28 @@ impl DockerCompose { ) } + pub fn down(&self) { + let mut cmd = Command::new("docker"); + cmd.current_dir(&self.docker_compose_dir); + + cmd.args(vec![ + "compose", + "-p", + self.project_name.as_str(), + "down", + "-v", + "--remove-orphans", + ]); + + run_command( + cmd, + format!( + "Stopping docker compose in {}, project name: {}", + self.docker_compose_dir, self.project_name + ), + ) + } + pub fn get_container_ip(&self, service_name: impl AsRef) -> IpAddr { let container_name = format!("{}-{}-1", self.project_name, service_name.as_ref()); let mut cmd = Command::new("docker"); @@ -114,24 +136,6 @@ impl DockerCompose { impl Drop for DockerCompose { fn drop(&mut self) { - let mut cmd = Command::new("docker"); - cmd.current_dir(&self.docker_compose_dir); - - cmd.args(vec![ - "compose", - "-p", - self.project_name.as_str(), - "down", - "-v", - "--remove-orphans", - ]); - - run_command( - cmd, - format!( - "Stopping docker compose in {}, project name: {}", - self.docker_compose_dir, self.project_name - ), - ) + self.down() } }