diff --git a/crates/iceberg/src/inspect/manifests.rs b/crates/iceberg/src/inspect/manifests.rs index ab63d2f6e..e94e48a45 100644 --- a/crates/iceberg/src/inspect/manifests.rs +++ b/crates/iceberg/src/inspect/manifests.rs @@ -15,17 +15,20 @@ // specific language governing permissions and limitations // under the License. +use std::collections::HashMap; use std::sync::Arc; use arrow_array::builder::{ - BooleanBuilder, ListBuilder, PrimitiveBuilder, StringBuilder, StructBuilder, + BooleanBuilder, GenericListBuilder, ListBuilder, PrimitiveBuilder, StringBuilder, StructBuilder, }; -use arrow_array::types::{Int32Type, Int64Type, Int8Type}; +use arrow_array::types::{Int32Type, Int64Type}; use arrow_array::RecordBatch; -use arrow_schema::{DataType, Field, Fields, Schema}; +use arrow_schema::{DataType, Field, Fields}; use futures::{stream, StreamExt}; +use crate::arrow::schema_to_arrow_schema; use crate::scan::ArrowRecordBatchStream; +use crate::spec::{FieldSummary, ListType, NestedField, PrimitiveType, StructType, Type}; use crate::table::Table; use crate::Result; @@ -40,44 +43,111 @@ impl<'a> ManifestsTable<'a> { Self { table } } - fn partition_summary_fields() -> Vec { - vec![ - Field::new("contains_null", DataType::Boolean, false), - Field::new("contains_nan", DataType::Boolean, true), - Field::new("lower_bound", DataType::Utf8, true), - Field::new("upper_bound", DataType::Utf8, true), - ] - } - - /// Returns the schema of the manifests table. - pub fn schema(&self) -> Schema { - Schema::new(vec![ - Field::new("content", DataType::Int8, false), - Field::new("path", DataType::Utf8, false), - Field::new("length", DataType::Int64, false), - Field::new("partition_spec_id", DataType::Int32, false), - Field::new("added_snapshot_id", DataType::Int64, false), - Field::new("added_data_files_count", DataType::Int32, false), - Field::new("existing_data_files_count", DataType::Int32, false), - Field::new("deleted_data_files_count", DataType::Int32, false), - Field::new("added_delete_files_count", DataType::Int32, false), - Field::new("existing_delete_files_count", DataType::Int32, false), - Field::new("deleted_delete_files_count", DataType::Int32, false), - Field::new( + /// Returns the iceberg schema of the manifests table. + pub fn schema(&self) -> crate::spec::Schema { + let fields = vec![ + NestedField::new(14, "content", Type::Primitive(PrimitiveType::Int), true), + NestedField::new(1, "path", Type::Primitive(PrimitiveType::String), true), + NestedField::new(2, "length", Type::Primitive(PrimitiveType::Long), true), + NestedField::new( + 3, + "partition_spec_id", + Type::Primitive(PrimitiveType::Int), + true, + ), + NestedField::new( + 4, + "added_snapshot_id", + Type::Primitive(PrimitiveType::Long), + true, + ), + NestedField::new( + 5, + "added_data_files_count", + Type::Primitive(PrimitiveType::Int), + true, + ), + NestedField::new( + 6, + "existing_data_files_count", + Type::Primitive(PrimitiveType::Int), + true, + ), + NestedField::new( + 7, + "deleted_data_files_count", + Type::Primitive(PrimitiveType::Int), + true, + ), + NestedField::new( + 15, + "added_delete_files_count", + Type::Primitive(PrimitiveType::Int), + true, + ), + NestedField::new( + 16, + "existing_delete_files_count", + Type::Primitive(PrimitiveType::Int), + true, + ), + NestedField::new( + 17, + "deleted_delete_files_count", + Type::Primitive(PrimitiveType::Int), + true, + ), + NestedField::new( + 8, "partition_summaries", - DataType::List(Arc::new(Field::new_struct( - "item", - Self::partition_summary_fields(), - false, - ))), - false, + Type::List(ListType { + element_field: Arc::new(NestedField::new( + 9, + "item", + Type::Struct(StructType::new(vec![ + Arc::new(NestedField::new( + 10, + "contains_null", + Type::Primitive(PrimitiveType::Boolean), + true, + )), + Arc::new(NestedField::new( + 11, + "contains_nan", + Type::Primitive(PrimitiveType::Boolean), + false, + )), + Arc::new(NestedField::new( + 12, + "lower_bound", + Type::Primitive(PrimitiveType::String), + false, + )), + Arc::new(NestedField::new( + 13, + "upper_bound", + Type::Primitive(PrimitiveType::String), + false, + )), + ])), + true, + )), + }), + true, ), - ]) + ]; + + crate::spec::Schema::builder() + .with_fields(fields.into_iter().map(|f| f.into())) + .build() + .unwrap() } /// Scans the manifests table. pub async fn scan(&self) -> Result { - let mut content = PrimitiveBuilder::::new(); + let schema = schema_to_arrow_schema(&self.schema())?; + + let mut content = PrimitiveBuilder::::new(); let mut path = StringBuilder::new(); let mut length = PrimitiveBuilder::::new(); let mut partition_spec_id = PrimitiveBuilder::::new(); @@ -88,22 +158,14 @@ impl<'a> ManifestsTable<'a> { let mut added_delete_files_count = PrimitiveBuilder::::new(); let mut existing_delete_files_count = PrimitiveBuilder::::new(); let mut deleted_delete_files_count = PrimitiveBuilder::::new(); - let mut partition_summaries = ListBuilder::new(StructBuilder::from_fields( - Fields::from(Self::partition_summary_fields()), - 0, - )) - .with_field(Arc::new(Field::new_struct( - "item", - Self::partition_summary_fields(), - false, - ))); + let mut partition_summaries = self.partition_summary_builder()?; if let Some(snapshot) = self.table.metadata().current_snapshot() { let manifest_list = snapshot .load_manifest_list(self.table.file_io(), &self.table.metadata_ref()) .await?; for manifest in manifest_list.entries() { - content.append_value(manifest.content as i8); + content.append_value(manifest.content as i32); path.append_value(manifest.manifest_path.clone()); length.append_value(manifest.manifest_length); partition_spec_id.append_value(manifest.partition_spec_id); @@ -119,32 +181,11 @@ impl<'a> ManifestsTable<'a> { .append_value(manifest.existing_files_count.unwrap_or(0) as i32); deleted_delete_files_count .append_value(manifest.deleted_files_count.unwrap_or(0) as i32); - - let partition_summaries_builder = partition_summaries.values(); - for summary in &manifest.partitions { - partition_summaries_builder - .field_builder::(0) - .unwrap() - .append_value(summary.contains_null); - partition_summaries_builder - .field_builder::(1) - .unwrap() - .append_option(summary.contains_nan); - partition_summaries_builder - .field_builder::(2) - .unwrap() - .append_option(summary.lower_bound.as_ref().map(|v| v.to_string())); - partition_summaries_builder - .field_builder::(3) - .unwrap() - .append_option(summary.upper_bound.as_ref().map(|v| v.to_string())); - partition_summaries_builder.append(true); - } - partition_summaries.append(true); + self.append_partition_summaries(&mut partition_summaries, &manifest.partitions); } } - let batch = RecordBatch::try_new(Arc::new(self.schema()), vec![ + let batch = RecordBatch::try_new(Arc::new(schema), vec![ Arc::new(content.finish()), Arc::new(path.finish()), Arc::new(length.finish()), @@ -158,9 +199,60 @@ impl<'a> ManifestsTable<'a> { Arc::new(deleted_delete_files_count.finish()), Arc::new(partition_summaries.finish()), ])?; - Ok(stream::iter(vec![Ok(batch)]).boxed()) } + + fn partition_summary_builder(&self) -> Result> { + let schema = schema_to_arrow_schema(&self.schema())?; + let partition_summary_fields = + match schema.field_with_name("partition_summaries")?.data_type() { + DataType::List(list_type) => match list_type.data_type() { + DataType::Struct(fields) => fields.to_vec(), + _ => unreachable!(), + }, + _ => unreachable!(), + }; + + let partition_summaries = ListBuilder::new(StructBuilder::from_fields( + Fields::from(partition_summary_fields.clone()), + 0, + )) + .with_field(Arc::new( + Field::new_struct("item", partition_summary_fields, false).with_metadata( + HashMap::from([("PARQUET:field_id".to_string(), "9".to_string())]), + ), + )); + + Ok(partition_summaries) + } + + fn append_partition_summaries( + &self, + builder: &mut GenericListBuilder, + partitions: &[FieldSummary], + ) { + let partition_summaries_builder = builder.values(); + for summary in partitions { + partition_summaries_builder + .field_builder::(0) + .unwrap() + .append_value(summary.contains_null); + partition_summaries_builder + .field_builder::(1) + .unwrap() + .append_option(summary.contains_nan); + partition_summaries_builder + .field_builder::(2) + .unwrap() + .append_option(summary.lower_bound.as_ref().map(|v| v.to_string())); + partition_summaries_builder + .field_builder::(3) + .unwrap() + .append_option(summary.upper_bound.as_ref().map(|v| v.to_string())); + partition_summaries_builder.append(true); + } + builder.append(true); + } } #[cfg(test)] @@ -175,25 +267,25 @@ mod tests { let mut fixture = TableTestFixture::new(); fixture.setup_manifest_files().await; - let batch_stream = fixture.table.inspect().manifests().scan().await.unwrap(); + let record_batch = fixture.table.inspect().manifests().scan().await.unwrap(); check_record_batches( - batch_stream, + record_batch, expect![[r#" - Field { name: "content", data_type: Int8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, - Field { name: "path", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, - Field { name: "length", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, - Field { name: "partition_spec_id", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, - Field { name: "added_snapshot_id", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, - Field { name: "added_data_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, - Field { name: "existing_data_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, - Field { name: "deleted_data_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, - Field { name: "added_delete_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, - Field { name: "existing_delete_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, - Field { name: "deleted_delete_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, - Field { name: "partition_summaries", data_type: List(Field { name: "item", data_type: Struct([Field { name: "contains_null", data_type: Boolean, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "contains_nan", data_type: Boolean, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "lower_bound", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "upper_bound", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }"#]], + Field { name: "content", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "14"} }, + Field { name: "path", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1"} }, + Field { name: "length", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "2"} }, + Field { name: "partition_spec_id", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "3"} }, + Field { name: "added_snapshot_id", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "4"} }, + Field { name: "added_data_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "5"} }, + Field { name: "existing_data_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "6"} }, + Field { name: "deleted_data_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "7"} }, + Field { name: "added_delete_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "15"} }, + Field { name: "existing_delete_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "16"} }, + Field { name: "deleted_delete_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "17"} }, + Field { name: "partition_summaries", data_type: List(Field { name: "item", data_type: Struct([Field { name: "contains_null", data_type: Boolean, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "10"} }, Field { name: "contains_nan", data_type: Boolean, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "11"} }, Field { name: "lower_bound", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "12"} }, Field { name: "upper_bound", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "13"} }]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "9"} }), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "8"} }"#]], expect![[r#" - content: PrimitiveArray + content: PrimitiveArray [ 0, ],