fix(metadata): export iceberg schema in manifests table #871

Open · wants to merge 5 commits into base: main
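In brief, the diff below changes `ManifestsTable::schema()` to return an Iceberg schema (`crate::spec::Schema`) with explicit field IDs instead of an Arrow schema, and derives the Arrow schema for the scan via `schema_to_arrow_schema`, so the emitted record batches carry `PARQUET:field_id` metadata. A minimal, hypothetical caller-side sketch (not part of this diff; it assumes the crate's public paths mirror the internal modules used in the diff, e.g. `iceberg::table::Table` and `iceberg::arrow::schema_to_arrow_schema`, and that a `Table` is already loaded):

```rust
use futures::TryStreamExt;
use iceberg::arrow::schema_to_arrow_schema;
use iceberg::table::Table;
use iceberg::Result;

// Hypothetical helper name; call it from an async runtime such as tokio.
async fn show_manifests(table: &Table) -> Result<()> {
    let inspect = table.inspect();
    let manifests = inspect.manifests();

    // After this change, `schema()` returns an Iceberg schema with stable
    // field IDs rather than an Arrow schema.
    let iceberg_schema = manifests.schema();

    // The Arrow schema (carrying "PARQUET:field_id" metadata) is derived from it.
    let arrow_schema = schema_to_arrow_schema(&iceberg_schema)?;
    println!("{arrow_schema:#?}");

    // Scanning still yields Arrow record batches using that derived schema.
    let batches: Vec<_> = manifests.scan().await?.try_collect().await?;
    println!("{} record batch(es)", batches.len());
    Ok(())
}
```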
260 changes: 176 additions & 84 deletions crates/iceberg/src/inspect/manifests.rs
@@ -15,17 +15,20 @@
// specific language governing permissions and limitations
// under the License.

use std::collections::HashMap;
use std::sync::Arc;

use arrow_array::builder::{
BooleanBuilder, ListBuilder, PrimitiveBuilder, StringBuilder, StructBuilder,
BooleanBuilder, GenericListBuilder, ListBuilder, PrimitiveBuilder, StringBuilder, StructBuilder,
};
use arrow_array::types::{Int32Type, Int64Type, Int8Type};
use arrow_array::types::{Int32Type, Int64Type};
use arrow_array::RecordBatch;
use arrow_schema::{DataType, Field, Fields, Schema};
use arrow_schema::{DataType, Field, Fields};
use futures::{stream, StreamExt};

use crate::arrow::schema_to_arrow_schema;
use crate::scan::ArrowRecordBatchStream;
use crate::spec::{FieldSummary, ListType, NestedField, PrimitiveType, StructType, Type};
use crate::table::Table;
use crate::Result;

@@ -40,44 +43,111 @@ impl<'a> ManifestsTable<'a> {
Self { table }
}

fn partition_summary_fields() -> Vec<Field> {
vec![
Field::new("contains_null", DataType::Boolean, false),
Field::new("contains_nan", DataType::Boolean, true),
Field::new("lower_bound", DataType::Utf8, true),
Field::new("upper_bound", DataType::Utf8, true),
]
}

/// Returns the schema of the manifests table.
pub fn schema(&self) -> Schema {
Schema::new(vec![
Field::new("content", DataType::Int8, false),
Field::new("path", DataType::Utf8, false),
Field::new("length", DataType::Int64, false),
Field::new("partition_spec_id", DataType::Int32, false),
Field::new("added_snapshot_id", DataType::Int64, false),
Field::new("added_data_files_count", DataType::Int32, false),
Field::new("existing_data_files_count", DataType::Int32, false),
Field::new("deleted_data_files_count", DataType::Int32, false),
Field::new("added_delete_files_count", DataType::Int32, false),
Field::new("existing_delete_files_count", DataType::Int32, false),
Field::new("deleted_delete_files_count", DataType::Int32, false),
Field::new(
/// Returns the iceberg schema of the manifests table.
pub fn schema(&self) -> crate::spec::Schema {
let fields = vec![
NestedField::new(14, "content", Type::Primitive(PrimitiveType::Int), true),
NestedField::new(1, "path", Type::Primitive(PrimitiveType::String), true),
NestedField::new(2, "length", Type::Primitive(PrimitiveType::Long), true),
NestedField::new(
3,
"partition_spec_id",
Type::Primitive(PrimitiveType::Int),
true,
),
NestedField::new(
4,
"added_snapshot_id",
Type::Primitive(PrimitiveType::Long),
true,
),
NestedField::new(
5,
"added_data_files_count",
Type::Primitive(PrimitiveType::Int),
true,
),
NestedField::new(
6,
"existing_data_files_count",
Type::Primitive(PrimitiveType::Int),
true,
),
NestedField::new(
7,
"deleted_data_files_count",
Type::Primitive(PrimitiveType::Int),
true,
),
NestedField::new(
15,
"added_delete_files_count",
Type::Primitive(PrimitiveType::Int),
true,
),
NestedField::new(
16,
"existing_delete_files_count",
Type::Primitive(PrimitiveType::Int),
true,
),
NestedField::new(
17,
"deleted_delete_files_count",
Type::Primitive(PrimitiveType::Int),
true,
),
NestedField::new(
8,
"partition_summaries",
DataType::List(Arc::new(Field::new_struct(
"item",
Self::partition_summary_fields(),
false,
))),
false,
Type::List(ListType {
element_field: Arc::new(NestedField::new(
9,
"item",
Type::Struct(StructType::new(vec![
Arc::new(NestedField::new(
10,
"contains_null",
Type::Primitive(PrimitiveType::Boolean),
true,
)),
Arc::new(NestedField::new(
11,
"contains_nan",
Type::Primitive(PrimitiveType::Boolean),
false,
)),
Arc::new(NestedField::new(
12,
"lower_bound",
Type::Primitive(PrimitiveType::String),
false,
)),
Arc::new(NestedField::new(
13,
"upper_bound",
Type::Primitive(PrimitiveType::String),
false,
)),
])),
true,
)),
}),
true,
),
])
];

crate::spec::Schema::builder()
.with_fields(fields.into_iter().map(|f| f.into()))
.build()
.unwrap()
}

/// Scans the manifests table.
pub async fn scan(&self) -> Result<ArrowRecordBatchStream> {
let mut content = PrimitiveBuilder::<Int8Type>::new();
let schema = schema_to_arrow_schema(&self.schema())?;

let mut content = PrimitiveBuilder::<Int32Type>::new();
let mut path = StringBuilder::new();
let mut length = PrimitiveBuilder::<Int64Type>::new();
let mut partition_spec_id = PrimitiveBuilder::<Int32Type>::new();
@@ -88,22 +158,14 @@ impl<'a> ManifestsTable<'a> {
let mut added_delete_files_count = PrimitiveBuilder::<Int32Type>::new();
let mut existing_delete_files_count = PrimitiveBuilder::<Int32Type>::new();
let mut deleted_delete_files_count = PrimitiveBuilder::<Int32Type>::new();
let mut partition_summaries = ListBuilder::new(StructBuilder::from_fields(
Fields::from(Self::partition_summary_fields()),
0,
))
.with_field(Arc::new(Field::new_struct(
"item",
Self::partition_summary_fields(),
false,
)));
let mut partition_summaries = self.partition_summary_builder()?;

if let Some(snapshot) = self.table.metadata().current_snapshot() {
let manifest_list = snapshot
.load_manifest_list(self.table.file_io(), &self.table.metadata_ref())
.await?;
for manifest in manifest_list.entries() {
content.append_value(manifest.content as i8);
content.append_value(manifest.content as i32);
path.append_value(manifest.manifest_path.clone());
length.append_value(manifest.manifest_length);
partition_spec_id.append_value(manifest.partition_spec_id);
@@ -119,32 +181,11 @@ impl<'a> ManifestsTable<'a> {
.append_value(manifest.existing_files_count.unwrap_or(0) as i32);
deleted_delete_files_count
.append_value(manifest.deleted_files_count.unwrap_or(0) as i32);

let partition_summaries_builder = partition_summaries.values();
for summary in &manifest.partitions {
partition_summaries_builder
.field_builder::<BooleanBuilder>(0)
.unwrap()
.append_value(summary.contains_null);
partition_summaries_builder
.field_builder::<BooleanBuilder>(1)
.unwrap()
.append_option(summary.contains_nan);
partition_summaries_builder
.field_builder::<StringBuilder>(2)
.unwrap()
.append_option(summary.lower_bound.as_ref().map(|v| v.to_string()));
partition_summaries_builder
.field_builder::<StringBuilder>(3)
.unwrap()
.append_option(summary.upper_bound.as_ref().map(|v| v.to_string()));
partition_summaries_builder.append(true);
}
partition_summaries.append(true);
self.append_partition_summaries(&mut partition_summaries, &manifest.partitions);
}
}

let batch = RecordBatch::try_new(Arc::new(self.schema()), vec![
let batch = RecordBatch::try_new(Arc::new(schema), vec![
Arc::new(content.finish()),
Arc::new(path.finish()),
Arc::new(length.finish()),
@@ -158,9 +199,60 @@ impl<'a> ManifestsTable<'a> {
Arc::new(deleted_delete_files_count.finish()),
Arc::new(partition_summaries.finish()),
])?;

Ok(stream::iter(vec![Ok(batch)]).boxed())
}

fn partition_summary_builder(&self) -> Result<GenericListBuilder<i32, StructBuilder>> {
let schema = schema_to_arrow_schema(&self.schema())?;
let partition_summary_fields =
match schema.field_with_name("partition_summaries")?.data_type() {
DataType::List(list_type) => match list_type.data_type() {
DataType::Struct(fields) => fields.to_vec(),
_ => unreachable!(),
},
_ => unreachable!(),
};

let partition_summaries = ListBuilder::new(StructBuilder::from_fields(
Fields::from(partition_summary_fields.clone()),
0,
))
.with_field(Arc::new(
Field::new_struct("item", partition_summary_fields, false).with_metadata(
HashMap::from([("PARQUET:field_id".to_string(), "9".to_string())]),
),
));

Ok(partition_summaries)
}

fn append_partition_summaries(
&self,
builder: &mut GenericListBuilder<i32, StructBuilder>,
partitions: &[FieldSummary],
) {
let partition_summaries_builder = builder.values();
for summary in partitions {
partition_summaries_builder
.field_builder::<BooleanBuilder>(0)
.unwrap()
.append_value(summary.contains_null);
partition_summaries_builder
.field_builder::<BooleanBuilder>(1)
.unwrap()
.append_option(summary.contains_nan);
partition_summaries_builder
.field_builder::<StringBuilder>(2)
.unwrap()
.append_option(summary.lower_bound.as_ref().map(|v| v.to_string()));
partition_summaries_builder
.field_builder::<StringBuilder>(3)
.unwrap()
.append_option(summary.upper_bound.as_ref().map(|v| v.to_string()));
partition_summaries_builder.append(true);
}
builder.append(true);
}
}

#[cfg(test)]
@@ -175,25 +267,25 @@ mod tests {
let mut fixture = TableTestFixture::new();
fixture.setup_manifest_files().await;

let batch_stream = fixture.table.inspect().manifests().scan().await.unwrap();
let record_batch = fixture.table.inspect().manifests().scan().await.unwrap();

check_record_batches(
batch_stream,
record_batch,
expect![[r#"
Field { name: "content", data_type: Int8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
Field { name: "path", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
Field { name: "length", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
Field { name: "partition_spec_id", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
Field { name: "added_snapshot_id", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
Field { name: "added_data_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
Field { name: "existing_data_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
Field { name: "deleted_data_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
Field { name: "added_delete_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
Field { name: "existing_delete_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
Field { name: "deleted_delete_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
Field { name: "partition_summaries", data_type: List(Field { name: "item", data_type: Struct([Field { name: "contains_null", data_type: Boolean, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "contains_nan", data_type: Boolean, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "lower_bound", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "upper_bound", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }"#]],
Field { name: "content", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "14"} },
Field { name: "path", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1"} },
Field { name: "length", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "2"} },
Field { name: "partition_spec_id", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "3"} },
Field { name: "added_snapshot_id", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "4"} },
Field { name: "added_data_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "5"} },
Field { name: "existing_data_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "6"} },
Field { name: "deleted_data_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "7"} },
Field { name: "added_delete_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "15"} },
Field { name: "existing_delete_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "16"} },
Field { name: "deleted_delete_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "17"} },
Field { name: "partition_summaries", data_type: List(Field { name: "item", data_type: Struct([Field { name: "contains_null", data_type: Boolean, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "10"} }, Field { name: "contains_nan", data_type: Boolean, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "11"} }, Field { name: "lower_bound", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "12"} }, Field { name: "upper_bound", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "13"} }]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "9"} }), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "8"} }"#]],
expect![[r#"
content: PrimitiveArray<Int8>
content: PrimitiveArray<Int32>
[
0,
],
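Side note on the builder change: the new `partition_summary_builder` attaches the list's item field, complete with `PARQUET:field_id` metadata, via `ListBuilder::with_field`, so the finished `partition_summaries` column matches the Arrow schema derived from the Iceberg schema. A self-contained sketch of that pattern (hypothetical, simplified item type; it only assumes the arrow-array builder APIs already used in the diff):

```rust
use std::collections::HashMap;
use std::sync::Arc;

use arrow_array::builder::{BooleanBuilder, ListBuilder, StructBuilder};
use arrow_array::Array;
use arrow_schema::{DataType, Field, Fields};

fn main() {
    // Hypothetical single-column item type; the real diff uses the four
    // partition-summary fields (contains_null, contains_nan, lower/upper bound).
    let item_fields = vec![Field::new("contains_null", DataType::Boolean, false)];

    let mut builder = ListBuilder::new(StructBuilder::from_fields(
        Fields::from(item_fields.clone()),
        0,
    ))
    .with_field(Arc::new(
        Field::new_struct("item", item_fields, false).with_metadata(HashMap::from([(
            "PARQUET:field_id".to_string(),
            "9".to_string(),
        )])),
    ));

    // Append one struct value and close the list entry.
    builder
        .values()
        .field_builder::<BooleanBuilder>(0)
        .unwrap()
        .append_value(true);
    builder.values().append(true);
    builder.append(true);

    let array = builder.finish();
    // The finished ListArray's data type carries the item field set above,
    // including its "PARQUET:field_id" metadata.
    println!("{:?}", array.data_type());
}
```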