Skip to content

Commit

Permalink
pass test
Browse files Browse the repository at this point in the history
  • Loading branch information
flaneur2020 committed Jan 3, 2025
1 parent ecb5b9e commit 569bb7a
Showing 1 changed file with 51 additions and 72 deletions.
123 changes: 51 additions & 72 deletions crates/iceberg/src/metadata_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,17 @@

//! Metadata table api.
use std::collections::HashMap;
use std::sync::Arc;

use arrow_array::builder::{
BooleanBuilder, ListBuilder, MapBuilder, PrimitiveBuilder, StringBuilder, StructBuilder,
};
use arrow_array::types::{Int32Type, Int64Type, Int8Type, TimestampMillisecondType};
use arrow_array::types::{Int32Type, Int64Type, TimestampMillisecondType};
use arrow_array::RecordBatch;
use arrow_schema::{DataType, Field, Fields, Schema, TimeUnit};

use crate::arrow::schema_to_arrow_schema;
use crate::spec::{ListType, NestedField, PrimitiveType, StructType, Type};
use crate::table::Table;
use crate::Result;
Expand Down Expand Up @@ -135,101 +137,66 @@ pub struct ManifestsTable<'a> {
}

impl<'a> ManifestsTable<'a> {
/// Returns the Arrow fields that make up one entry of the
/// `partition_summaries` list column.
///
/// The same field list is used both for the `item` struct in [`Self::fields`]
/// and for the `StructBuilder` in `scan`, so the schema and the built array
/// stay in agreement.
fn partition_summary_fields(&self) -> Vec<Field> {
vec![
// The third argument of `Field::new` is `nullable`: only `contains_null`
// is declared non-nullable; the remaining three may be null.
Field::new("contains_null", DataType::Boolean, false),
Field::new("contains_nan", DataType::Boolean, true),
// Bounds are exposed as strings (Utf8) rather than as typed values.
Field::new("lower_bound", DataType::Utf8, true),
Field::new("upper_bound", DataType::Utf8, true),
]
}

/// Returns the Arrow fields of the manifests table, in column order.
///
/// Every top-level column is declared non-nullable. The final column,
/// `partition_summaries`, is a list whose `item` element is a non-nullable
/// struct built from [`Self::partition_summary_fields`].
fn fields(&self) -> Vec<Field> {
vec![
// `content` is stored as an 8-bit integer; `scan` must append values of
// the matching builder type.
Field::new("content", DataType::Int8, false),
Field::new("path", DataType::Utf8, false),
Field::new("length", DataType::Int64, false),
Field::new("partition_spec_id", DataType::Int32, false),
Field::new("added_snapshot_id", DataType::Int64, false),
// Data-file counters.
Field::new("added_data_files_count", DataType::Int32, false),
Field::new("existing_data_files_count", DataType::Int32, false),
Field::new("deleted_data_files_count", DataType::Int32, false),
// Delete-file counters.
Field::new("added_delete_files_count", DataType::Int32, false),
Field::new("existing_delete_files_count", DataType::Int32, false),
Field::new("deleted_delete_files_count", DataType::Int32, false),
// List of per-partition summary structs; both the list column and its
// `item` element are declared non-nullable.
Field::new(
"partition_summaries",
DataType::List(Arc::new(Field::new_struct(
"item",
self.partition_summary_fields(),
false,
))),
false,
),
]
}

/// Returns the iceberg schema of the manifests table.
pub fn schema(&self) -> crate::spec::Schema {
let fields = vec![
NestedField::new(14, "content", Type::Primitive(PrimitiveType::Int), false),
NestedField::new(1, "path", Type::Primitive(PrimitiveType::String), false),
NestedField::new(2, "length", Type::Primitive(PrimitiveType::Long), false),
NestedField::new(14, "content", Type::Primitive(PrimitiveType::Int), true),
NestedField::new(1, "path", Type::Primitive(PrimitiveType::String), true),
NestedField::new(2, "length", Type::Primitive(PrimitiveType::Long), true),
NestedField::new(
3,
"partition_spec_id",
Type::Primitive(PrimitiveType::Int),
false,
true,
),
NestedField::new(
4,
"added_snapshot_id",
Type::Primitive(PrimitiveType::Long),
false,
true,
),
NestedField::new(
5,
"added_data_files_count",
Type::Primitive(PrimitiveType::Int),
false,
true,
),
NestedField::new(
6,
"existing_data_files_count",
Type::Primitive(PrimitiveType::Int),
false,
true,
),
NestedField::new(
7,
"deleted_data_files_count",
Type::Primitive(PrimitiveType::Int),
false,
true,
),
NestedField::new(
15,
"added_delete_files_count",
Type::Primitive(PrimitiveType::Int),
false,
true,
),
NestedField::new(
16,
"existing_delete_files_count",
Type::Primitive(PrimitiveType::Int),
false,
true,
),
NestedField::new(
17,
"deleted_delete_files_count",
Type::Primitive(PrimitiveType::Int),
false,
true,
),
NestedField::new(
8,
"partition_summaries",
Type::List(ListType {
element_field: Arc::new(NestedField::new(
0,
9,
"item",
Type::Struct(StructType::new(vec![
Arc::new(NestedField::new(
Expand Down Expand Up @@ -257,10 +224,10 @@ impl<'a> ManifestsTable<'a> {
false,
)),
])),
false,
true,
)),
}),
false,
true,
),
];

Expand All @@ -272,7 +239,20 @@ impl<'a> ManifestsTable<'a> {

/// Scans the manifests table.
pub async fn scan(&self) -> Result<RecordBatch> {
let mut content = PrimitiveBuilder::<Int8Type>::new();
let schema = schema_to_arrow_schema(&self.schema())?;
let partition_summary_fields = if let DataType::List(list_type) =
schema.field_with_name("partition_summaries")?.data_type()
{
if let DataType::Struct(fields) = list_type.data_type() {
fields.to_vec()
} else {
unreachable!()
}
} else {
unreachable!()
};

let mut content = PrimitiveBuilder::<Int32Type>::new();
let mut path = StringBuilder::new();
let mut length = PrimitiveBuilder::<Int64Type>::new();
let mut partition_spec_id = PrimitiveBuilder::<Int32Type>::new();
Expand All @@ -284,21 +264,21 @@ impl<'a> ManifestsTable<'a> {
let mut existing_delete_files_count = PrimitiveBuilder::<Int32Type>::new();
let mut deleted_delete_files_count = PrimitiveBuilder::<Int32Type>::new();
let mut partition_summaries = ListBuilder::new(StructBuilder::from_fields(
Fields::from(self.partition_summary_fields()),
Fields::from(partition_summary_fields.clone()),
0,
))
.with_field(Arc::new(Field::new_struct(
"item",
self.partition_summary_fields(),
false,
)));
.with_field(Arc::new(
Field::new_struct("item", partition_summary_fields, false).with_metadata(
HashMap::from([("PARQUET:field_id".to_string(), "9".to_string())]),
),
));

if let Some(snapshot) = self.table.metadata().current_snapshot() {
let manifest_list = snapshot
.load_manifest_list(self.table.file_io(), &self.table.metadata_ref())
.await?;
for manifest in manifest_list.entries() {
content.append_value(manifest.content as i8);
content.append_value(manifest.content as i32);
path.append_value(manifest.manifest_path.clone());
length.append_value(manifest.manifest_length);
partition_spec_id.append_value(manifest.partition_spec_id);
Expand Down Expand Up @@ -339,7 +319,6 @@ impl<'a> ManifestsTable<'a> {
}
}

let schema = Schema::new(self.fields());
Ok(RecordBatch::try_new(Arc::new(schema), vec![
Arc::new(content.finish()),
Arc::new(path.finish()),
Expand Down Expand Up @@ -499,20 +478,20 @@ mod tests {
check_record_batch(
record_batch,
expect![[r#"
Field { name: "content", data_type: Int8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
Field { name: "path", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
Field { name: "length", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
Field { name: "partition_spec_id", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
Field { name: "added_snapshot_id", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
Field { name: "added_data_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
Field { name: "existing_data_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
Field { name: "deleted_data_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
Field { name: "added_delete_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
Field { name: "existing_delete_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
Field { name: "deleted_delete_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
Field { name: "partition_summaries", data_type: List(Field { name: "item", data_type: Struct([Field { name: "contains_null", data_type: Boolean, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "contains_nan", data_type: Boolean, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "lower_bound", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "upper_bound", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }"#]],
Field { name: "content", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "14"} },
Field { name: "path", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1"} },
Field { name: "length", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "2"} },
Field { name: "partition_spec_id", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "3"} },
Field { name: "added_snapshot_id", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "4"} },
Field { name: "added_data_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "5"} },
Field { name: "existing_data_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "6"} },
Field { name: "deleted_data_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "7"} },
Field { name: "added_delete_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "15"} },
Field { name: "existing_delete_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "16"} },
Field { name: "deleted_delete_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "17"} },
Field { name: "partition_summaries", data_type: List(Field { name: "item", data_type: Struct([Field { name: "contains_null", data_type: Boolean, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "10"} }, Field { name: "contains_nan", data_type: Boolean, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "11"} }, Field { name: "lower_bound", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "12"} }, Field { name: "upper_bound", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "13"} }]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "9"} }), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "8"} }"#]],
expect![[r#"
content: PrimitiveArray<Int8>
content: PrimitiveArray<Int32>
[
0,
],
Expand Down

0 comments on commit 569bb7a

Please sign in to comment.