Skip to content

Commit

Permalink
refactor the partition summaries
Browse files Browse the repository at this point in the history
  • Loading branch information
flaneur2020 committed Jan 24, 2025
1 parent 5b36838 commit a7b2c5c
Showing 1 changed file with 50 additions and 35 deletions.
85 changes: 50 additions & 35 deletions crates/iceberg/src/inspect/manifests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use std::collections::HashMap;
use std::sync::Arc;

use arrow_array::builder::{
BooleanBuilder, ListBuilder, PrimitiveBuilder, StringBuilder, StructBuilder,
BooleanBuilder, GenericListBuilder, ListBuilder, PrimitiveBuilder, StringBuilder, StructBuilder,
};
use arrow_array::types::{Int32Type, Int64Type};
use arrow_array::RecordBatch;
Expand All @@ -28,7 +28,7 @@ use futures::{stream, StreamExt};

use crate::arrow::schema_to_arrow_schema;
use crate::scan::ArrowRecordBatchStream;
use crate::spec::{ListType, NestedField, PrimitiveType, StructType, Type};
use crate::spec::{FieldSummary, ListType, NestedField, PrimitiveType, StructType, Type};
use crate::table::Table;
use crate::Result;

Expand Down Expand Up @@ -143,8 +143,7 @@ impl<'a> ManifestsTable<'a> {
.unwrap()
}

/// Scans the manifests table.
pub async fn scan(&self) -> Result<ArrowRecordBatchStream> {
fn partition_summary_builder(&self) -> Result<GenericListBuilder<i32, StructBuilder>> {
let schema = schema_to_arrow_schema(&self.schema())?;
let partition_summary_fields = if let DataType::List(list_type) =
schema.field_with_name("partition_summaries")?.data_type()
Expand All @@ -158,6 +157,51 @@ impl<'a> ManifestsTable<'a> {
unreachable!()
};

let partition_summaries = ListBuilder::new(StructBuilder::from_fields(
Fields::from(partition_summary_fields.clone()),
0,
))
.with_field(Arc::new(
Field::new_struct("item", partition_summary_fields, false).with_metadata(
HashMap::from([("PARQUET:field_id".to_string(), "9".to_string())]),
),
));

Ok(partition_summaries)
}

/// Appends one list entry to `builder`, holding one struct per element of `partitions`.
///
/// For each summary the struct children are written by position: child 0 receives
/// `contains_null`, child 1 receives `contains_nan`, and children 2 and 3 receive the
/// string form of `lower_bound` and `upper_bound`. The last three are optional and are
/// forwarded through `append_option`. The list entry is closed unconditionally once
/// every summary has been written, so an empty `partitions` slice still adds one
/// (empty) entry.
///
/// # Panics
///
/// Panics if any of the four struct children cannot be obtained as the requested
/// builder type (`field_builder` returned `None`).
fn append_partition_summary(
    &self,
    builder: &mut GenericListBuilder<i32, StructBuilder>,
    partitions: &[FieldSummary],
) {
    let struct_builder = builder.values();
    partitions.iter().for_each(|field_summary| {
        let contains_null = struct_builder.field_builder::<BooleanBuilder>(0).unwrap();
        contains_null.append_value(field_summary.contains_null);

        let contains_nan = struct_builder.field_builder::<BooleanBuilder>(1).unwrap();
        contains_nan.append_option(field_summary.contains_nan);

        let lower_bound = struct_builder.field_builder::<StringBuilder>(2).unwrap();
        lower_bound.append_option(
            field_summary
                .lower_bound
                .as_ref()
                .map(|bound| bound.to_string()),
        );

        let upper_bound = struct_builder.field_builder::<StringBuilder>(3).unwrap();
        upper_bound.append_option(
            field_summary
                .upper_bound
                .as_ref()
                .map(|bound| bound.to_string()),
        );

        // Close the struct row only after all four children received a value.
        struct_builder.append(true);
    });
    // Close the list entry that groups the structs appended above.
    builder.append(true);
}

/// Scans the manifests table.
pub async fn scan(&self) -> Result<ArrowRecordBatchStream> {
let schema = schema_to_arrow_schema(&self.schema())?;

let mut content = PrimitiveBuilder::<Int32Type>::new();
let mut path = StringBuilder::new();
let mut length = PrimitiveBuilder::<Int64Type>::new();
Expand All @@ -169,15 +213,7 @@ impl<'a> ManifestsTable<'a> {
let mut added_delete_files_count = PrimitiveBuilder::<Int32Type>::new();
let mut existing_delete_files_count = PrimitiveBuilder::<Int32Type>::new();
let mut deleted_delete_files_count = PrimitiveBuilder::<Int32Type>::new();
let mut partition_summaries = ListBuilder::new(StructBuilder::from_fields(
Fields::from(partition_summary_fields.clone()),
0,
))
.with_field(Arc::new(
Field::new_struct("item", partition_summary_fields, false).with_metadata(
HashMap::from([("PARQUET:field_id".to_string(), "9".to_string())]),
),
));
let mut partition_summaries = self.partition_summary_builder()?;

if let Some(snapshot) = self.table.metadata().current_snapshot() {
let manifest_list = snapshot
Expand All @@ -200,28 +236,7 @@ impl<'a> ManifestsTable<'a> {
.append_value(manifest.existing_files_count.unwrap_or(0) as i32);
deleted_delete_files_count
.append_value(manifest.deleted_files_count.unwrap_or(0) as i32);

let partition_summaries_builder = partition_summaries.values();
for summary in &manifest.partitions {
partition_summaries_builder
.field_builder::<BooleanBuilder>(0)
.unwrap()
.append_value(summary.contains_null);
partition_summaries_builder
.field_builder::<BooleanBuilder>(1)
.unwrap()
.append_option(summary.contains_nan);
partition_summaries_builder
.field_builder::<StringBuilder>(2)
.unwrap()
.append_option(summary.lower_bound.as_ref().map(|v| v.to_string()));
partition_summaries_builder
.field_builder::<StringBuilder>(3)
.unwrap()
.append_option(summary.upper_bound.as_ref().map(|v| v.to_string()));
partition_summaries_builder.append(true);
}
partition_summaries.append(true);
self.append_partition_summary(&mut partition_summaries, &manifest.partitions);
}
}

Expand Down

0 comments on commit a7b2c5c

Please sign in to comment.