diff --git a/Makefile b/Makefile index d846303ff..c34f6c97d 100644 --- a/Makefile +++ b/Makefile @@ -34,6 +34,9 @@ cargo-sort: check: check-fmt check-clippy cargo-sort +unit-test: + cargo test --no-fail-fast --lib --all-features --workspace + test: cargo test --no-fail-fast --all-targets --all-features --workspace cargo test --no-fail-fast --doc --all-features --workspace \ No newline at end of file diff --git a/crates/iceberg/src/spec/manifest.rs b/crates/iceberg/src/spec/manifest.rs index bc12edff7..b14b1295e 100644 --- a/crates/iceberg/src/spec/manifest.rs +++ b/crates/iceberg/src/spec/manifest.rs @@ -1311,511 +1311,452 @@ mod tests { use crate::spec::Type; use std::sync::Arc; - #[test] - fn test_parse_manifest_v2_unpartition() { - let path = format!( - "{}/testdata/unpartition_manifest_v2.avro", - env!("CARGO_MANIFEST_DIR") - ); - let bs = fs::read(path).expect("read_file must succeed"); - let manifest = Manifest::parse_avro(bs.as_slice()).unwrap(); - // test metadata - assert!(manifest.metadata.schema_id == 0); - assert_eq!(manifest.metadata.schema, { - let fields = vec![ - // id v_int v_long v_float v_double v_varchar v_bool v_date v_timestamp v_decimal v_ts_ntz - Arc::new(NestedField::optional( - 1, - "id", - Type::Primitive(PrimitiveType::Long), - )), - Arc::new(NestedField::optional( - 2, - "v_int", - Type::Primitive(PrimitiveType::Int), - )), - Arc::new(NestedField::optional( - 3, - "v_long", - Type::Primitive(PrimitiveType::Long), - )), - Arc::new(NestedField::optional( - 4, - "v_float", - Type::Primitive(PrimitiveType::Float), - )), - Arc::new(NestedField::optional( - 5, - "v_double", - Type::Primitive(PrimitiveType::Double), - )), - Arc::new(NestedField::optional( - 6, - "v_varchar", - Type::Primitive(PrimitiveType::String), - )), - Arc::new(NestedField::optional( - 7, - "v_bool", - Type::Primitive(PrimitiveType::Boolean), - )), - Arc::new(NestedField::optional( - 8, - "v_date", - Type::Primitive(PrimitiveType::Date), - )), - Arc::new(NestedField::optional( - 9, - "v_timestamp", - Type::Primitive(PrimitiveType::Timestamptz), - )), - Arc::new(NestedField::optional( - 10, - "v_decimal", - Type::Primitive(PrimitiveType::Decimal { - precision: 36, - scale: 10, - }), - )), - Arc::new(NestedField::optional( - 11, - "v_ts_ntz", - Type::Primitive(PrimitiveType::Timestamp), - )), - ]; - Schema::builder().with_fields(fields).build().unwrap() - }); - assert!(manifest.metadata.partition_spec.fields.is_empty()); - assert!(manifest.metadata.content == ManifestContentType::Data); - assert!(manifest.metadata.format_version == FormatVersion::V2); - // test entries - assert!(manifest.entries.len() == 1); - let entry = &manifest.entries[0]; - assert!(entry.status == ManifestStatus::Added); - assert!(entry.snapshot_id == Some(0)); - assert!(entry.sequence_number == Some(1)); - assert!(entry.file_sequence_number == Some(1)); - assert_eq!( - entry.data_file, - DataFile { - content: DataContentType::Data, - file_path: "s3a://icebergdata/demo/s1/t1/data/00000-0-ba56fbfa-f2ff-40c9-bb27-565ad6dc2be8-00000.parquet".to_string(), - file_format: DataFileFormat::Parquet, - partition: Struct::empty(), - record_count: 1, - file_size_in_bytes: 5442, - column_sizes: HashMap::from([(0,73),(6,34),(2,73),(7,61),(3,61),(5,62),(9,79),(10,73),(1,61),(4,73),(8,73)]), - value_counts: HashMap::from([(4,1),(5,1),(2,1),(0,1),(3,1),(6,1),(8,1),(1,1),(10,1),(7,1),(9,1)]), - null_value_counts: HashMap::from([(1,0),(6,0),(2,0),(8,0),(0,0),(3,0),(5,0),(9,0),(7,0),(4,0),(10,0)]), - nan_value_counts: HashMap::new(), - lower_bounds: HashMap::new(), - upper_bounds: HashMap::new(), - key_metadata: Vec::new(), - split_offsets: vec![4], - equality_ids: Vec::new(), - sort_order_id: None, - } - ); + #[tokio::test] + async fn test_parse_manifest_v2_unpartition() { + let manifest = Manifest { + metadata: ManifestMetadata { + schema_id: 0, + schema: Schema::builder() + .with_fields(vec![ + // id v_int v_long v_float v_double v_varchar v_bool v_date v_timestamp v_decimal v_ts_ntz + Arc::new(NestedField::optional( + 1, + "id", + Type::Primitive(PrimitiveType::Long), + )), + Arc::new(NestedField::optional( + 2, + "v_int", + Type::Primitive(PrimitiveType::Int), + )), + Arc::new(NestedField::optional( + 3, + "v_long", + Type::Primitive(PrimitiveType::Long), + )), + Arc::new(NestedField::optional( + 4, + "v_float", + Type::Primitive(PrimitiveType::Float), + )), + Arc::new(NestedField::optional( + 5, + "v_double", + Type::Primitive(PrimitiveType::Double), + )), + Arc::new(NestedField::optional( + 6, + "v_varchar", + Type::Primitive(PrimitiveType::String), + )), + Arc::new(NestedField::optional( + 7, + "v_bool", + Type::Primitive(PrimitiveType::Boolean), + )), + Arc::new(NestedField::optional( + 8, + "v_date", + Type::Primitive(PrimitiveType::Date), + )), + Arc::new(NestedField::optional( + 9, + "v_timestamp", + Type::Primitive(PrimitiveType::Timestamptz), + )), + Arc::new(NestedField::optional( + 10, + "v_decimal", + Type::Primitive(PrimitiveType::Decimal { + precision: 36, + scale: 10, + }), + )), + Arc::new(NestedField::optional( + 11, + "v_ts_ntz", + Type::Primitive(PrimitiveType::Timestamp), + )), + ]) + .build() + .unwrap(), + partition_spec: PartitionSpec { + spec_id: 0, + fields: vec![], + }, + content: ManifestContentType::Data, + format_version: FormatVersion::V2, + }, + entries: vec![ + ManifestEntry { + status: ManifestStatus::Added, + snapshot_id: None, + sequence_number: None, + file_sequence_number: None, + data_file: DataFile { + content: DataContentType::Data, + file_path: "s3a://icebergdata/demo/s1/t1/data/00000-0-ba56fbfa-f2ff-40c9-bb27-565ad6dc2be8-00000.parquet".to_string(), + file_format: DataFileFormat::Parquet, + partition: Struct::empty(), + record_count: 1, + file_size_in_bytes: 5442, + column_sizes: HashMap::from([(0,73),(6,34),(2,73),(7,61),(3,61),(5,62),(9,79),(10,73),(1,61),(4,73),(8,73)]), + value_counts: HashMap::from([(4,1),(5,1),(2,1),(0,1),(3,1),(6,1),(8,1),(1,1),(10,1),(7,1),(9,1)]), + null_value_counts: HashMap::from([(1,0),(6,0),(2,0),(8,0),(0,0),(3,0),(5,0),(9,0),(7,0),(4,0),(10,0)]), + nan_value_counts: HashMap::new(), + lower_bounds: HashMap::new(), + upper_bounds: HashMap::new(), + key_metadata: Vec::new(), + split_offsets: vec![4], + equality_ids: Vec::new(), + sort_order_id: None, + } + } + ] + }; + + let writer = |output_file: OutputFile| ManifestWriter::new(output_file, 1, vec![]); + + test_manifest_read_write(manifest, writer).await; } - #[test] - fn test_parse_manifest_v2_partition() { - let path = format!( - "{}/testdata/partition_manifest_v2.avro", - env!("CARGO_MANIFEST_DIR") - ); - let bs = fs::read(path).expect("read_file must succeed"); - let manifest = Manifest::parse_avro(bs.as_slice()).unwrap(); - assert_eq!(manifest.metadata.schema_id, 0); - assert_eq!(manifest.metadata.schema, { - let fields = vec![ - Arc::new(NestedField::optional( - 1, - "id", - Type::Primitive(PrimitiveType::Long), - )), - Arc::new(NestedField::optional( - 2, - "v_int", - Type::Primitive(PrimitiveType::Int), - )), - Arc::new(NestedField::optional( - 3, - "v_long", - Type::Primitive(PrimitiveType::Long), - )), - Arc::new(NestedField::optional( - 4, - "v_float", - Type::Primitive(PrimitiveType::Float), - )), - Arc::new(NestedField::optional( - 5, - "v_double", - Type::Primitive(PrimitiveType::Double), - )), - Arc::new(NestedField::optional( - 6, - "v_varchar", - Type::Primitive(PrimitiveType::String), - )), - Arc::new(NestedField::optional( - 7, - "v_bool", - Type::Primitive(PrimitiveType::Boolean), - )), - Arc::new(NestedField::optional( - 8, - "v_date", - Type::Primitive(PrimitiveType::Date), - )), - Arc::new(NestedField::optional( - 9, - "v_timestamp", - Type::Primitive(PrimitiveType::Timestamptz), - )), - Arc::new(NestedField::optional( - 10, - "v_decimal", - Type::Primitive(PrimitiveType::Decimal { - precision: 36, - scale: 10, - }), - )), - Arc::new(NestedField::optional( - 11, - "v_ts_ntz", - Type::Primitive(PrimitiveType::Timestamp), - )), - ]; - Schema::builder().with_fields(fields).build().unwrap() - }); - assert_eq!(manifest.metadata.partition_spec, { - let fields = vec![ - PartitionField { - name: "v_int".to_string(), - transform: Transform::Identity, - source_id: 2, - field_id: 1000, + #[tokio::test] + async fn test_parse_manifest_v2_partition() { + let manifest = Manifest { + metadata: ManifestMetadata { + schema_id: 0, + schema: Schema::builder() + .with_fields(vec![ + Arc::new(NestedField::optional( + 1, + "id", + Type::Primitive(PrimitiveType::Long), + )), + Arc::new(NestedField::optional( + 2, + "v_int", + Type::Primitive(PrimitiveType::Int), + )), + Arc::new(NestedField::optional( + 3, + "v_long", + Type::Primitive(PrimitiveType::Long), + )), + Arc::new(NestedField::optional( + 4, + "v_float", + Type::Primitive(PrimitiveType::Float), + )), + Arc::new(NestedField::optional( + 5, + "v_double", + Type::Primitive(PrimitiveType::Double), + )), + Arc::new(NestedField::optional( + 6, + "v_varchar", + Type::Primitive(PrimitiveType::String), + )), + Arc::new(NestedField::optional( + 7, + "v_bool", + Type::Primitive(PrimitiveType::Boolean), + )), + Arc::new(NestedField::optional( + 8, + "v_date", + Type::Primitive(PrimitiveType::Date), + )), + Arc::new(NestedField::optional( + 9, + "v_timestamp", + Type::Primitive(PrimitiveType::Timestamptz), + )), + Arc::new(NestedField::optional( + 10, + "v_decimal", + Type::Primitive(PrimitiveType::Decimal { + precision: 36, + scale: 10, + }), + )), + Arc::new(NestedField::optional( + 11, + "v_ts_ntz", + Type::Primitive(PrimitiveType::Timestamp), + )), + ]) + .build() + .unwrap(), + partition_spec: PartitionSpec { + spec_id: 0, + fields: vec![ + PartitionField { + name: "v_int".to_string(), + transform: Transform::Identity, + source_id: 2, + field_id: 1000, + }, + PartitionField { + name: "v_long".to_string(), + transform: Transform::Identity, + source_id: 3, + field_id: 1001, + }, + ], }, - PartitionField { - name: "v_long".to_string(), - transform: Transform::Identity, - source_id: 3, - field_id: 1001, + content: ManifestContentType::Data, + format_version: FormatVersion::V2, + }, + entries: vec![ManifestEntry { + status: ManifestStatus::Added, + snapshot_id: None, + sequence_number: None, + file_sequence_number: None, + data_file: DataFile { + content: DataContentType::Data, + file_format: DataFileFormat::Parquet, + file_path: "s3a://icebergdata/demo/s1/t1/data/00000-0-378b56f5-5c52-4102-a2c2-f05f8a7cbe4a-00000.parquet".to_string(), + partition: Struct::from_iter( + vec![ + (1000, Some(Literal::int(1)), "v_int".to_string()), + (1001, Some(Literal::long(1000)), "v_long".to_string()) + ] + .into_iter() + ), + record_count: 1, + file_size_in_bytes: 5442, + column_sizes: HashMap::from([ + (0, 73), + (6, 34), + (2, 73), + (7, 61), + (3, 61), + (5, 62), + (9, 79), + (10, 73), + (1, 61), + (4, 73), + (8, 73) + ]), + value_counts: HashMap::from([ + (4, 1), + (5, 1), + (2, 1), + (0, 1), + (3, 1), + (6, 1), + (8, 1), + (1, 1), + (10, 1), + (7, 1), + (9, 1) + ]), + null_value_counts: HashMap::from([ + (1, 0), + (6, 0), + (2, 0), + (8, 0), + (0, 0), + (3, 0), + (5, 0), + (9, 0), + (7, 0), + (4, 0), + (10, 0) + ]), + nan_value_counts: HashMap::new(), + lower_bounds: HashMap::new(), + upper_bounds: HashMap::new(), + key_metadata: vec![], + split_offsets: vec![4], + equality_ids: vec![], + sort_order_id: None, }, - ]; - PartitionSpec { spec_id: 0, fields } - }); - assert!(manifest.metadata.content == ManifestContentType::Data); - assert!(manifest.metadata.format_version == FormatVersion::V2); - assert_eq!(manifest.entries.len(), 1); - let entry = &manifest.entries[0]; - assert_eq!(entry.status, ManifestStatus::Added); - assert_eq!(entry.snapshot_id, Some(0)); - assert_eq!(entry.sequence_number, Some(1)); - assert_eq!(entry.file_sequence_number, Some(1)); - assert_eq!(entry.data_file.content, DataContentType::Data); - assert_eq!( - entry.data_file.file_path, - "s3a://icebergdata/demo/s1/t1/data/00000-0-378b56f5-5c52-4102-a2c2-f05f8a7cbe4a-00000.parquet" - ); - assert_eq!(entry.data_file.file_format, DataFileFormat::Parquet); - assert_eq!( - entry.data_file.partition, - Struct::from_iter( - vec![ - (1000, Some(Literal::int(1)), "v_int".to_string()), - (1001, Some(Literal::long(1000)), "v_long".to_string()) - ] - .into_iter() - ) - ); - assert_eq!(entry.data_file.record_count, 1); - assert_eq!(entry.data_file.file_size_in_bytes, 5442); - assert_eq!( - entry.data_file.column_sizes, - HashMap::from([ - (0, 73), - (6, 34), - (2, 73), - (7, 61), - (3, 61), - (5, 62), - (9, 79), - (10, 73), - (1, 61), - (4, 73), - (8, 73) - ]) - ); - assert_eq!( - entry.data_file.value_counts, - HashMap::from([ - (4, 1), - (5, 1), - (2, 1), - (0, 1), - (3, 1), - (6, 1), - (8, 1), - (1, 1), - (10, 1), - (7, 1), - (9, 1) - ]) - ); - assert_eq!( - entry.data_file.null_value_counts, - HashMap::from([ - (1, 0), - (6, 0), - (2, 0), - (8, 0), - (0, 0), - (3, 0), - (5, 0), - (9, 0), - (7, 0), - (4, 0), - (10, 0) - ]) - ); - assert!(entry.data_file.nan_value_counts.is_empty()); - assert!(entry.data_file.lower_bounds.is_empty()); - assert!(entry.data_file.upper_bounds.is_empty()); - assert!(entry.data_file.key_metadata.is_empty()); - assert_eq!(entry.data_file.split_offsets, vec![4]); - assert!(entry.data_file.equality_ids.is_empty()); - assert_eq!(entry.data_file.sort_order_id, None); - } + }], + }; - #[test] - fn test_parse_manifest_v1_unpartition() { - let path = format!( - "{}/testdata/unpartition_manifest_v1.avro", - env!("CARGO_MANIFEST_DIR") - ); - let bs = fs::read(path).expect("read_file must succeed"); - let manifest = Manifest::parse_avro(bs.as_slice()).unwrap(); - // test metadata - assert!(manifest.metadata.schema_id == 0); - assert_eq!(manifest.metadata.schema, { - let fields = vec![ - Arc::new(NestedField::optional( - 1, - "id", - Type::Primitive(PrimitiveType::Int), - )), - Arc::new(NestedField::optional( - 2, - "data", - Type::Primitive(PrimitiveType::String), - )), - Arc::new(NestedField::optional( - 3, - "comment", - Type::Primitive(PrimitiveType::String), - )), - ]; - Schema::builder() - .with_schema_id(1) - .with_fields(fields) - .build() - .unwrap() - }); - assert!(manifest.metadata.partition_spec.fields.is_empty()); - assert!(manifest.metadata.content == ManifestContentType::Data); - assert!(manifest.metadata.format_version == FormatVersion::V1); - assert_eq!(manifest.entries.len(), 4); - let entry = &manifest.entries[0]; - assert!(entry.status == ManifestStatus::Added); - assert!(entry.snapshot_id == Some(2966623707104393227)); - assert!(entry.sequence_number.is_none()); - assert!(entry.file_sequence_number.is_none()); - assert_eq!( - entry.data_file, - DataFile { - content: DataContentType::Data, - file_path: "s3://testbucket/iceberg_data/iceberg_ctl/iceberg_db/iceberg_tbl/data/00000-7-45268d71-54eb-476c-b42c-942d880c04a1-00001.parquet".to_string(), - file_format: DataFileFormat::Parquet, - partition: Struct::empty(), - record_count: 1, - file_size_in_bytes: 875, - column_sizes: HashMap::from([(1,47),(2,48),(3,52)]), - value_counts: HashMap::from([(1,1),(2,1),(3,1)]), - null_value_counts: HashMap::from([(1,0),(2,0),(3,0)]), - nan_value_counts: HashMap::new(), - lower_bounds: HashMap::from([(1,Literal::int(1)),(2,Literal::string("a")),(3,Literal::string("AC/DC"))]), - upper_bounds: HashMap::from([(1,Literal::int(1)),(2,Literal::string("a")),(3,Literal::string("AC/DC"))]), - key_metadata: vec![], - split_offsets: vec![4], - equality_ids: vec![], - sort_order_id: Some(0), - } - ); + let writer = |output_file: OutputFile| ManifestWriter::new(output_file, 1, vec![]); + + let res = test_manifest_read_write(manifest, writer).await; + + assert_eq!(res.sequence_number, UNASSIGNED_SEQUENCE_NUMBER); + assert_eq!(res.min_sequence_number, UNASSIGNED_SEQUENCE_NUMBER); } - #[test] - fn test_parse_manifest_v1_partition() { - let path = format!( - "{}/testdata/partition_manifest_v1.avro", - env!("CARGO_MANIFEST_DIR") - ); - let bs = fs::read(path).expect("read_file must succeed"); - let manifest = Manifest::parse_avro(bs.as_slice()).unwrap(); - // test metadata - assert!(manifest.metadata.schema_id == 0); - assert_eq!(manifest.metadata.schema, { - let fields = vec![ - Arc::new(NestedField::optional( - 1, - "id", - Type::Primitive(PrimitiveType::Long), - )), - Arc::new(NestedField::optional( - 2, - "data", - Type::Primitive(PrimitiveType::String), - )), - Arc::new(NestedField::optional( - 3, - "category", - Type::Primitive(PrimitiveType::String), - )), - ]; - Schema::builder().with_fields(fields).build().unwrap() - }); - assert_eq!(manifest.metadata.partition_spec, { - let fields = vec![PartitionField { - name: "category".to_string(), - transform: Transform::Identity, - source_id: 3, - field_id: 1000, - }]; - PartitionSpec { spec_id: 0, fields } - }); - assert!(manifest.metadata.content == ManifestContentType::Data); - assert!(manifest.metadata.format_version == FormatVersion::V1); - - // test entries - assert!(manifest.entries.len() == 1); - let entry = &manifest.entries[0]; - assert!(entry.status == ManifestStatus::Added); - assert!(entry.snapshot_id == Some(8205833995881562618)); - assert!(entry.sequence_number.is_none()); - assert!(entry.file_sequence_number.is_none()); - assert_eq!(entry.data_file.content, DataContentType::Data); - assert_eq!( - entry.data_file.file_path, - "s3://testbucket/prod/db/sample/data/category=x/00010-1-d5c93668-1e52-41ac-92a6-bba590cbf249-00001.parquet" - ); - assert_eq!(entry.data_file.file_format, DataFileFormat::Parquet); - assert_eq!( - entry.data_file.partition, - Struct::from_iter( - vec![( - 1000, - Some( - Literal::try_from_bytes(&[120], &Type::Primitive(PrimitiveType::String)) - .unwrap() - ), - "category".to_string() - )] - .into_iter() - ) - ); - assert_eq!(entry.data_file.record_count, 1); - assert_eq!(entry.data_file.file_size_in_bytes, 874); - assert_eq!( - entry.data_file.column_sizes, - HashMap::from([(1, 46), (2, 48), (3, 48)]) - ); - assert_eq!( - entry.data_file.value_counts, - HashMap::from([(1, 1), (2, 1), (3, 1)]) - ); - assert_eq!( - entry.data_file.null_value_counts, - HashMap::from([(1, 0), (2, 0), (3, 0)]) - ); - assert_eq!(entry.data_file.nan_value_counts, HashMap::new()); - assert_eq!( - entry.data_file.lower_bounds, - HashMap::from([ - (1, Literal::long(1)), - (2, Literal::string("a")), - (3, Literal::string("x")) - ]) - ); - assert_eq!( - entry.data_file.upper_bounds, - HashMap::from([ - (1, Literal::long(1)), - (2, Literal::string("a")), - (3, Literal::string("x")) - ]) - ); - assert!(entry.data_file.key_metadata.is_empty()); - assert_eq!(entry.data_file.split_offsets, vec![4]); - assert!(entry.data_file.equality_ids.is_empty()); - assert_eq!(entry.data_file.sort_order_id, Some(0)); + #[tokio::test] + async fn test_parse_manifest_v1_unpartition() { + let manifest = Manifest { + metadata: ManifestMetadata { + schema_id: 1, + schema: Schema::builder() + .with_schema_id(1) + .with_fields(vec![ + Arc::new(NestedField::optional( + 1, + "id", + Type::Primitive(PrimitiveType::Int), + )), + Arc::new(NestedField::optional( + 2, + "data", + Type::Primitive(PrimitiveType::String), + )), + Arc::new(NestedField::optional( + 3, + "comment", + Type::Primitive(PrimitiveType::String), + )), + ]) + .build() + .unwrap(), + partition_spec: PartitionSpec { + spec_id: 0, + fields: vec![], + }, + content: ManifestContentType::Data, + format_version: FormatVersion::V1, + }, + entries: vec![ManifestEntry { + status: ManifestStatus::Added, + snapshot_id: Some(0), + sequence_number: None, + file_sequence_number: None, + data_file: DataFile { + content: DataContentType::Data, + file_path: "s3://testbucket/iceberg_data/iceberg_ctl/iceberg_db/iceberg_tbl/data/00000-7-45268d71-54eb-476c-b42c-942d880c04a1-00001.parquet".to_string(), + file_format: DataFileFormat::Parquet, + partition: Struct::empty(), + record_count: 1, + file_size_in_bytes: 875, + column_sizes: HashMap::from([(1,47),(2,48),(3,52)]), + value_counts: HashMap::from([(1,1),(2,1),(3,1)]), + null_value_counts: HashMap::from([(1,0),(2,0),(3,0)]), + nan_value_counts: HashMap::new(), + lower_bounds: HashMap::from([(1,Literal::int(1)),(2,Literal::string("a")),(3,Literal::string("AC/DC"))]), + upper_bounds: HashMap::from([(1,Literal::int(1)),(2,Literal::string("a")),(3,Literal::string("AC/DC"))]), + key_metadata: vec![], + split_offsets: vec![4], + equality_ids: vec![], + sort_order_id: Some(0), + } + }], + }; + + let writer = + |output_file: OutputFile| ManifestWriter::new(output_file, 2966623707104393227, vec![]); + + test_manifest_read_write(manifest, writer).await; } #[tokio::test] - async fn test_writer_manifest_v1_partition() { - // Read manifest - let path = format!( - "{}/testdata/partition_manifest_v1.avro", - env!("CARGO_MANIFEST_DIR") - ); - let bs = fs::read(path).expect("read_file must succeed"); - let manifest = Manifest::parse_avro(bs.as_slice()).unwrap(); + async fn test_parse_manifest_v1_partition() { + let manifest = Manifest { + metadata: ManifestMetadata { + schema_id: 0, + schema: Schema::builder() + .with_fields(vec![ + Arc::new(NestedField::optional( + 1, + "id", + Type::Primitive(PrimitiveType::Long), + )), + Arc::new(NestedField::optional( + 2, + "data", + Type::Primitive(PrimitiveType::String), + )), + Arc::new(NestedField::optional( + 3, + "category", + Type::Primitive(PrimitiveType::String), + )), + ]) + .build() + .unwrap(), + partition_spec: PartitionSpec { + spec_id: 0, + fields: vec![PartitionField { + name: "category".to_string(), + transform: Transform::Identity, + source_id: 3, + field_id: 1000, + }], + }, + content: ManifestContentType::Data, + format_version: FormatVersion::V1, + }, + entries: vec![ + ManifestEntry { + status: ManifestStatus::Added, + snapshot_id: Some(0), + sequence_number: None, + file_sequence_number: None, + data_file: DataFile { + content: DataContentType::Data, + file_path: "s3://testbucket/prod/db/sample/data/category=x/00010-1-d5c93668-1e52-41ac-92a6-bba590cbf249-00001.parquet".to_string(), + file_format: DataFileFormat::Parquet, + partition: Struct::from_iter( + vec![( + 1000, + Some( + Literal::try_from_bytes(&[120], &Type::Primitive(PrimitiveType::String)) + .unwrap() + ), + "category".to_string() + )] + .into_iter() + ), + record_count: 1, + file_size_in_bytes: 874, + column_sizes: HashMap::from([(1, 46), (2, 48), (3, 48)]), + value_counts: HashMap::from([(1, 1), (2, 1), (3, 1)]), + null_value_counts: HashMap::from([(1, 0), (2, 0), (3, 0)]), + nan_value_counts: HashMap::new(), + lower_bounds: HashMap::from([ + (1, Literal::long(1)), + (2, Literal::string("a")), + (3, Literal::string("x")) + ]), + upper_bounds: HashMap::from([ + (1, Literal::long(1)), + (2, Literal::string("a")), + (3, Literal::string("x")) + ]), + key_metadata: vec![], + split_offsets: vec![4], + equality_ids: vec![], + sort_order_id: Some(0), + }, + } + ] + }; - // Write manifest - let temp_dir = TempDir::new().unwrap(); - let path = temp_dir.path().join("manifest_list_v1.avro"); - let io = FileIOBuilder::new_fs_io().build().unwrap(); - let output_file = io.new_output(path.to_str().unwrap()).unwrap(); - let writer = ManifestWriter::new(output_file, 1, vec![]); - let entry = writer.write(manifest.clone()).await.unwrap(); + let writer = |output_file: OutputFile| ManifestWriter::new(output_file, 1, vec![]); + + let entry = test_manifest_read_write(manifest, writer).await; - // Check partition summary assert_eq!(entry.partitions.len(), 1); assert_eq!(entry.partitions[0].lower_bound, Some(Literal::string("x"))); assert_eq!(entry.partitions[0].upper_bound, Some(Literal::string("x"))); - - // Verify manifest - let bs = fs::read(path).expect("read_file must succeed"); - let actual_manifest = Manifest::parse_avro(bs.as_slice()).unwrap(); - - assert_eq!(actual_manifest, manifest); } - #[tokio::test] - async fn test_writer_manifest_v2_partition() { - // Read manifest - let path = format!( - "{}/testdata/partition_manifest_v2.avro", - env!("CARGO_MANIFEST_DIR") - ); - let bs = fs::read(path).expect("read_file must succeed"); - let manifest = Manifest::parse_avro(bs.as_slice()).unwrap(); - - // Write manifest + async fn test_manifest_read_write( + manifest: Manifest, + writer_builder: impl FnOnce(OutputFile) -> ManifestWriter, + ) -> ManifestListEntry { let temp_dir = TempDir::new().unwrap(); - let path = temp_dir.path().join("manifest_list_v2.avro"); + let path = temp_dir.path().join("test_manifest.avro"); let io = FileIOBuilder::new_fs_io().build().unwrap(); let output_file = io.new_output(path.to_str().unwrap()).unwrap(); - let writer = ManifestWriter::new(output_file, 1, vec![]); + let writer = writer_builder(output_file); let res = writer.write(manifest.clone()).await.unwrap(); - assert_eq!(res.sequence_number, UNASSIGNED_SEQUENCE_NUMBER); - assert_eq!(res.min_sequence_number, 1); // Verify manifest let bs = fs::read(path).expect("read_file must succeed"); let actual_manifest = Manifest::parse_avro(bs.as_slice()).unwrap(); assert_eq!(actual_manifest, manifest); + res } } diff --git a/crates/iceberg/src/spec/manifest_list.rs b/crates/iceberg/src/spec/manifest_list.rs index db0c30c2c..a1ce291ea 100644 --- a/crates/iceberg/src/spec/manifest_list.rs +++ b/crates/iceberg/src/spec/manifest_list.rs @@ -1047,55 +1047,104 @@ mod test { use super::_serde::ManifestListV2; - #[test] - fn test_parse_manifest_list_v1() { - let path = format!( - "{}/testdata/simple_manifest_list_v1.avro", - env!("CARGO_MANIFEST_DIR") + #[tokio::test] + async fn test_parse_manifest_list_v1() { + let manifest_list = ManifestList { + entries: vec![ + ManifestListEntry { + manifest_path: "/opt/bitnami/spark/warehouse/db/table/metadata/10d28031-9739-484c-92db-cdf2975cead4-m0.avro".to_string(), + manifest_length: 5806, + partition_spec_id: 0, + content: ManifestContentType::Data, + sequence_number: 0, + min_sequence_number: 0, + added_snapshot_id: 1646658105718557341, + added_data_files_count: Some(3), + existing_data_files_count: Some(0), + deleted_data_files_count: Some(0), + added_rows_count: Some(3), + existing_rows_count: Some(0), + deleted_rows_count: Some(0), + partitions: vec![], + key_metadata: vec![], + } + ] + }; + + let file_io = FileIOBuilder::new_fs_io().build().unwrap(); + + let tmp_dir = TempDir::new().unwrap(); + let file_name = "simple_manifest_list_v1.avro"; + let full_path = format!("{}/{}", tmp_dir.path().to_str().unwrap(), file_name); + + let mut writer = ManifestListWriter::v1( + file_io.new_output(full_path.clone()).unwrap(), + 1646658105718557341, + 1646658105718557341, ); - let bs = fs::read(path).expect("read_file must succeed"); + writer + .add_manifest_entries(manifest_list.entries.clone().into_iter()) + .unwrap(); + writer.close().await.unwrap(); - let manifest_list = ManifestList::parse_with_version( + let bs = fs::read(full_path).expect("read_file must succeed"); + + let parsed_manifest_list = ManifestList::parse_with_version( &bs, crate::spec::FormatVersion::V1, &StructType::new(vec![]), ) .unwrap(); - assert_eq!(1, manifest_list.entries.len()); - assert_eq!( - manifest_list.entries[0], - ManifestListEntry { - manifest_path: "/opt/bitnami/spark/warehouse/db/table/metadata/10d28031-9739-484c-92db-cdf2975cead4-m0.avro".to_string(), - manifest_length: 5806, - partition_spec_id: 0, - content: ManifestContentType::Data, - sequence_number: 0, - min_sequence_number: 0, - added_snapshot_id: 1646658105718557341, - added_data_files_count: Some(3), - existing_data_files_count: Some(0), - deleted_data_files_count: Some(0), - added_rows_count: Some(3), - existing_rows_count: Some(0), - deleted_rows_count: Some(0), - partitions: vec![], - key_metadata: vec![], - } - ); + assert_eq!(manifest_list, parsed_manifest_list); } - #[test] - fn test_parse_manifest_list_v2() { - let path = format!( - "{}/testdata/simple_manifest_list_v2.avro", - env!("CARGO_MANIFEST_DIR") + #[tokio::test] + async fn test_parse_manifest_list_v2() { + let manifest_list = ManifestList { + entries: vec![ + ManifestListEntry { + manifest_path: "s3a://icebergdata/demo/s1/t1/metadata/05ffe08b-810f-49b3-a8f4-e88fc99b254a-m0.avro".to_string(), + manifest_length: 6926, + partition_spec_id: 1, + content: ManifestContentType::Data, + sequence_number: 1, + min_sequence_number: 1, + added_snapshot_id: 377075049360453639, + added_data_files_count: Some(1), + existing_data_files_count: Some(0), + deleted_data_files_count: Some(0), + added_rows_count: Some(3), + existing_rows_count: Some(0), + deleted_rows_count: Some(0), + partitions: vec![FieldSummary { contains_null: false, contains_nan: Some(false), lower_bound: Some(Literal::long(1)), upper_bound: Some(Literal::long(1))}], + key_metadata: vec![], + } + ] + }; + + let file_io = FileIOBuilder::new_fs_io().build().unwrap(); + + let tmp_dir = TempDir::new().unwrap(); + let file_name = "simple_manifest_list_v1.avro"; + let full_path = format!("{}/{}", tmp_dir.path().to_str().unwrap(), file_name); + + let mut writer = ManifestListWriter::v2( + file_io.new_output(full_path.clone()).unwrap(), + 1646658105718557341, + 1646658105718557341, + 1, ); - let bs = fs::read(path).expect("read_file must succeed"); + writer + .add_manifest_entries(manifest_list.entries.clone().into_iter()) + .unwrap(); + writer.close().await.unwrap(); - let manifest_list = ManifestList::parse_with_version( + let bs = fs::read(full_path).expect("read_file must succeed"); + + let parsed_manifest_list = ManifestList::parse_with_version( &bs, crate::spec::FormatVersion::V2, &StructType::new(vec![Arc::new(NestedField::required( @@ -1106,27 +1155,7 @@ mod test { ) .unwrap(); - assert_eq!(1, manifest_list.entries.len()); - assert_eq!( - manifest_list.entries[0], - ManifestListEntry { - manifest_path: "s3a://icebergdata/demo/s1/t1/metadata/05ffe08b-810f-49b3-a8f4-e88fc99b254a-m0.avro".to_string(), - manifest_length: 6926, - partition_spec_id: 1, - content: ManifestContentType::Data, - sequence_number: 1, - min_sequence_number: 1, - added_snapshot_id: 377075049360453639, - added_data_files_count: Some(1), - existing_data_files_count: Some(0), - deleted_data_files_count: Some(0), - added_rows_count: Some(3), - existing_rows_count: Some(0), - deleted_rows_count: Some(0), - partitions: vec![FieldSummary { contains_null: false, contains_nan: Some(false), lower_bound: Some(Literal::long(1)), upper_bound: Some(Literal::long(1))}], - key_metadata: vec![], - } - ); + assert_eq!(manifest_list, parsed_manifest_list); } #[test] diff --git a/crates/iceberg/testdata/partition_manifest_v1.avro b/crates/iceberg/testdata/partition_manifest_v1.avro deleted file mode 100644 index d1ada6188..000000000 Binary files a/crates/iceberg/testdata/partition_manifest_v1.avro and /dev/null differ diff --git a/crates/iceberg/testdata/partition_manifest_v2.avro b/crates/iceberg/testdata/partition_manifest_v2.avro deleted file mode 100644 index 9d10a8711..000000000 Binary files a/crates/iceberg/testdata/partition_manifest_v2.avro and /dev/null differ diff --git a/crates/iceberg/testdata/simple_manifest_list_v1.avro b/crates/iceberg/testdata/simple_manifest_list_v1.avro deleted file mode 100644 index bcdd126de..000000000 Binary files a/crates/iceberg/testdata/simple_manifest_list_v1.avro and /dev/null differ diff --git a/crates/iceberg/testdata/simple_manifest_list_v2.avro b/crates/iceberg/testdata/simple_manifest_list_v2.avro deleted file mode 100644 index 852d77313..000000000 Binary files a/crates/iceberg/testdata/simple_manifest_list_v2.avro and /dev/null differ diff --git a/crates/iceberg/testdata/unpartition_manifest_v1.avro b/crates/iceberg/testdata/unpartition_manifest_v1.avro deleted file mode 100644 index a4ef37646..000000000 Binary files a/crates/iceberg/testdata/unpartition_manifest_v1.avro and /dev/null differ diff --git a/crates/iceberg/testdata/unpartition_manifest_v2.avro b/crates/iceberg/testdata/unpartition_manifest_v2.avro deleted file mode 100644 index 7652159f3..000000000 Binary files a/crates/iceberg/testdata/unpartition_manifest_v2.avro and /dev/null differ