diff --git a/crates/iceberg/src/metadata_scan.rs b/crates/iceberg/src/metadata_scan.rs index 7ba1b74ad..edbfa6c94 100644 --- a/crates/iceberg/src/metadata_scan.rs +++ b/crates/iceberg/src/metadata_scan.rs @@ -17,6 +17,7 @@ //! Metadata table api. +use std::collections::HashMap; use std::sync::Arc; use arrow_array::builder::{ @@ -26,6 +27,7 @@ use arrow_array::types::{Int32Type, Int64Type, Int8Type, TimestampMillisecondTyp use arrow_array::RecordBatch; use arrow_schema::{DataType, Field, Fields, Schema, TimeUnit}; +use crate::arrow::{arrow_schema_to_schema, schema_to_arrow_schema}; use crate::spec::{ListType, NestedField, PrimitiveType, StructType, Type}; use crate::table::Table; use crate::Result; @@ -135,101 +137,66 @@ pub struct ManifestsTable<'a> { } impl<'a> ManifestsTable<'a> { - fn partition_summary_fields(&self) -> Vec { - vec![ - Field::new("contains_null", DataType::Boolean, false), - Field::new("contains_nan", DataType::Boolean, true), - Field::new("lower_bound", DataType::Utf8, true), - Field::new("upper_bound", DataType::Utf8, true), - ] - } - - /// Returns the fields of the manifests table. - fn fields(&self) -> Vec { - vec![ - Field::new("content", DataType::Int8, false), - Field::new("path", DataType::Utf8, false), - Field::new("length", DataType::Int64, false), - Field::new("partition_spec_id", DataType::Int32, false), - Field::new("added_snapshot_id", DataType::Int64, false), - Field::new("added_data_files_count", DataType::Int32, false), - Field::new("existing_data_files_count", DataType::Int32, false), - Field::new("deleted_data_files_count", DataType::Int32, false), - Field::new("added_delete_files_count", DataType::Int32, false), - Field::new("existing_delete_files_count", DataType::Int32, false), - Field::new("deleted_delete_files_count", DataType::Int32, false), - Field::new( - "partition_summaries", - DataType::List(Arc::new(Field::new_struct( - "item", - self.partition_summary_fields(), - false, - ))), - false, - ), - ] - } - /// Returns the iceberg schema of the manifests table. pub fn schema(&self) -> crate::spec::Schema { let fields = vec![ - NestedField::new(14, "content", Type::Primitive(PrimitiveType::Int), false), - NestedField::new(1, "path", Type::Primitive(PrimitiveType::String), false), - NestedField::new(2, "length", Type::Primitive(PrimitiveType::Long), false), + NestedField::new(14, "content", Type::Primitive(PrimitiveType::Int), true), + NestedField::new(1, "path", Type::Primitive(PrimitiveType::String), true), + NestedField::new(2, "length", Type::Primitive(PrimitiveType::Long), true), NestedField::new( 3, "partition_spec_id", Type::Primitive(PrimitiveType::Int), - false, + true, ), NestedField::new( 4, "added_snapshot_id", Type::Primitive(PrimitiveType::Long), - false, + true, ), NestedField::new( 5, "added_data_files_count", Type::Primitive(PrimitiveType::Int), - false, + true, ), NestedField::new( 6, "existing_data_files_count", Type::Primitive(PrimitiveType::Int), - false, + true, ), NestedField::new( 7, "deleted_data_files_count", Type::Primitive(PrimitiveType::Int), - false, + true, ), NestedField::new( 15, "added_delete_files_count", Type::Primitive(PrimitiveType::Int), - false, + true, ), NestedField::new( 16, "existing_delete_files_count", Type::Primitive(PrimitiveType::Int), - false, + true, ), NestedField::new( 17, "deleted_delete_files_count", Type::Primitive(PrimitiveType::Int), - false, + true, ), NestedField::new( 8, "partition_summaries", Type::List(ListType { element_field: Arc::new(NestedField::new( - 0, + 9, "item", Type::Struct(StructType::new(vec![ Arc::new(NestedField::new( @@ -257,10 +224,10 @@ impl<'a> ManifestsTable<'a> { false, )), ])), - false, + true, )), }), - false, + true, ), ]; @@ -272,7 +239,20 @@ impl<'a> ManifestsTable<'a> { /// Scans the manifests table. pub async fn scan(&self) -> Result { - let mut content = PrimitiveBuilder::::new(); + let schema = schema_to_arrow_schema(&self.schema())?; + let partition_summary_fields = if let DataType::List(list_type) = + schema.field_with_name("partition_summaries")?.data_type() + { + if let DataType::Struct(fields) = list_type.data_type() { + fields.to_vec() + } else { + unreachable!() + } + } else { + unreachable!() + }; + + let mut content = PrimitiveBuilder::::new(); let mut path = StringBuilder::new(); let mut length = PrimitiveBuilder::::new(); let mut partition_spec_id = PrimitiveBuilder::::new(); @@ -284,21 +264,21 @@ impl<'a> ManifestsTable<'a> { let mut existing_delete_files_count = PrimitiveBuilder::::new(); let mut deleted_delete_files_count = PrimitiveBuilder::::new(); let mut partition_summaries = ListBuilder::new(StructBuilder::from_fields( - Fields::from(self.partition_summary_fields()), + Fields::from(partition_summary_fields.clone()), 0, )) - .with_field(Arc::new(Field::new_struct( - "item", - self.partition_summary_fields(), - false, - ))); + .with_field(Arc::new( + Field::new_struct("item", partition_summary_fields, false).with_metadata( + HashMap::from([("PARQUET:field_id".to_string(), "9".to_string())]), + ), + )); if let Some(snapshot) = self.table.metadata().current_snapshot() { let manifest_list = snapshot .load_manifest_list(self.table.file_io(), &self.table.metadata_ref()) .await?; for manifest in manifest_list.entries() { - content.append_value(manifest.content as i8); + content.append_value(manifest.content as i32); path.append_value(manifest.manifest_path.clone()); length.append_value(manifest.manifest_length); partition_spec_id.append_value(manifest.partition_spec_id); @@ -339,7 +319,6 @@ impl<'a> ManifestsTable<'a> { } } - let schema = Schema::new(self.fields()); Ok(RecordBatch::try_new(Arc::new(schema), vec![ Arc::new(content.finish()), Arc::new(path.finish()), @@ -499,20 +478,20 @@ mod tests { check_record_batch( record_batch, expect![[r#" - Field { name: "content", data_type: Int8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, - Field { name: "path", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, - Field { name: "length", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, - Field { name: "partition_spec_id", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, - Field { name: "added_snapshot_id", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, - Field { name: "added_data_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, - Field { name: "existing_data_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, - Field { name: "deleted_data_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, - Field { name: "added_delete_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, - Field { name: "existing_delete_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, - Field { name: "deleted_delete_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, - Field { name: "partition_summaries", data_type: List(Field { name: "item", data_type: Struct([Field { name: "contains_null", data_type: Boolean, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "contains_nan", data_type: Boolean, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "lower_bound", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "upper_bound", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }"#]], + Field { name: "content", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "14"} }, + Field { name: "path", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1"} }, + Field { name: "length", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "2"} }, + Field { name: "partition_spec_id", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "3"} }, + Field { name: "added_snapshot_id", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "4"} }, + Field { name: "added_data_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "5"} }, + Field { name: "existing_data_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "6"} }, + Field { name: "deleted_data_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "7"} }, + Field { name: "added_delete_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "15"} }, + Field { name: "existing_delete_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "16"} }, + Field { name: "deleted_delete_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "17"} }, + Field { name: "partition_summaries", data_type: List(Field { name: "item", data_type: Struct([Field { name: "contains_null", data_type: Boolean, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "10"} }, Field { name: "contains_nan", data_type: Boolean, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "11"} }, Field { name: "lower_bound", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "12"} }, Field { name: "upper_bound", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "13"} }]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "9"} }), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "8"} }"#]], expect![[r#" - content: PrimitiveArray + content: PrimitiveArray [ 0, ],