Metadata table scans as streams
rshkv committed Jan 7, 2025 · 1 parent e34f428 · commit 66bc85a
Showing 4 changed files with 78 additions and 36 deletions.
49 changes: 33 additions & 16 deletions crates/iceberg/src/inspect/manifests.rs
@@ -22,8 +22,12 @@ use arrow_array::builder::{
 };
 use arrow_array::types::{Int32Type, Int64Type, Int8Type};
 use arrow_array::RecordBatch;
-use arrow_schema::{DataType, Field, Fields, Schema};
+use arrow_schema::{DataType, Field, Fields, Schema, SchemaRef};
+use futures::StreamExt;
 
+use crate::io::FileIO;
+use crate::scan::ArrowRecordBatchStream;
+use crate::spec::TableMetadata;
 use crate::table::Table;
 use crate::Result;
 
@@ -38,7 +42,7 @@ impl<'a> ManifestsTable<'a> {
         Self { table }
     }
 
-    fn partition_summary_fields(&self) -> Vec<Field> {
+    fn partition_summary_fields() -> Vec<Field> {
         vec![
             Field::new("contains_null", DataType::Boolean, false),
             Field::new("contains_nan", DataType::Boolean, true),
@@ -65,7 +69,7 @@ impl<'a> ManifestsTable<'a> {
                 "partition_summaries",
                 DataType::List(Arc::new(Field::new_struct(
                     "item",
-                    self.partition_summary_fields(),
+                    Self::partition_summary_fields(),
                     false,
                 ))),
                 false,
@@ -74,7 +78,22 @@ impl<'a> ManifestsTable<'a> {
     }
 
     /// Scans the manifests table.
-    pub async fn scan(&self) -> Result<RecordBatch> {
+    pub async fn scan(&self) -> Result<ArrowRecordBatchStream> {
+        let arrow_schema = Arc::new(self.schema());
+        let table_metadata = self.table.metadata_ref();
+        let file_io = self.table.file_io().clone();
+
+        Ok(futures::stream::once(async move {
+            Self::build_batch(arrow_schema, &table_metadata, &file_io).await
+        })
+        .boxed())
+    }
+
+    async fn build_batch(
+        arrow_schema: SchemaRef,
+        table_metadata: &TableMetadata,
+        file_io: &FileIO,
+    ) -> Result<RecordBatch> {
         let mut content = PrimitiveBuilder::<Int8Type>::new();
         let mut path = StringBuilder::new();
         let mut length = PrimitiveBuilder::<Int64Type>::new();
@@ -87,19 +106,17 @@ impl<'a> ManifestsTable<'a> {
         let mut existing_delete_files_count = PrimitiveBuilder::<Int32Type>::new();
         let mut deleted_delete_files_count = PrimitiveBuilder::<Int32Type>::new();
         let mut partition_summaries = ListBuilder::new(StructBuilder::from_fields(
-            Fields::from(self.partition_summary_fields()),
+            Fields::from(Self::partition_summary_fields()),
             0,
         ))
         .with_field(Arc::new(Field::new_struct(
             "item",
-            self.partition_summary_fields(),
+            Self::partition_summary_fields(),
             false,
         )));
 
-        if let Some(snapshot) = self.table.metadata().current_snapshot() {
-            let manifest_list = snapshot
-                .load_manifest_list(self.table.file_io(), &self.table.metadata_ref())
-                .await?;
+        if let Some(snapshot) = table_metadata.current_snapshot() {
+            let manifest_list = snapshot.load_manifest_list(file_io, table_metadata).await?;
             for manifest in manifest_list.entries() {
                 content.append_value(manifest.content as i8);
                 path.append_value(manifest.manifest_path.clone());
@@ -142,7 +159,7 @@ impl<'a> ManifestsTable<'a> {
             }
         }
 
-        Ok(RecordBatch::try_new(Arc::new(self.schema()), vec![
+        Ok(RecordBatch::try_new(arrow_schema, vec![
             Arc::new(content.finish()),
             Arc::new(path.finish()),
             Arc::new(length.finish()),
@@ -163,18 +180,18 @@ impl<'a> ManifestsTable<'a> {
 mod tests {
     use expect_test::expect;
 
-    use crate::inspect::metadata_table::tests::check_record_batch;
+    use crate::inspect::metadata_table::tests::check_record_batches;
     use crate::scan::tests::TableTestFixture;
 
     #[tokio::test]
     async fn test_manifests_table() {
         let mut fixture = TableTestFixture::new();
         fixture.setup_manifest_files().await;
 
-        let record_batch = fixture.table.inspect().manifests().scan().await.unwrap();
+        let batch_stream = fixture.table.inspect().manifests().scan().await.unwrap();
 
-        check_record_batch(
-            record_batch,
+        check_record_batches(
+            batch_stream,
             expect![[r#"
                 Field { name: "content", data_type: Int8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
                 Field { name: "path", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
@@ -259,6 +276,6 @@ mod tests {
             ]"#]],
            &["path", "length"],
            Some("path"),
-        );
+        ).await;
    }
}
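
With the new return type, `scan()` yields an `ArrowRecordBatchStream` (a boxed stream of `Result<RecordBatch>`) rather than one eager batch, so callers drain it with `futures::TryStreamExt`. A minimal consumer sketch, assuming an already-loaded `iceberg::table::Table` (this helper is illustrative, not part of the commit):

    use futures::TryStreamExt;
    use iceberg::table::Table;
    use iceberg::Result;

    /// Drains the manifests metadata table scan into a Vec of Arrow batches.
    async fn collect_manifests(table: &Table) -> Result<()> {
        // `scan()` now returns a boxed stream of `Result<RecordBatch>`.
        let stream = table.inspect().manifests().scan().await?;
        let batches = stream.try_collect::<Vec<_>>().await?;
        for batch in &batches {
            println!("batch with {} manifest rows", batch.num_rows());
        }
        Ok(())
    }
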
26 changes: 18 additions & 8 deletions crates/iceberg/src/inspect/metadata_table.rs
@@ -25,31 +25,33 @@ use crate::table::Table;
 /// - <https://iceberg.apache.org/docs/latest/spark-queries/#querying-with-sql>
 /// - <https://py.iceberg.apache.org/api/#inspecting-tables>
 #[derive(Debug)]
-pub struct MetadataTable(Table);
+pub struct MetadataTable<'a>(&'a Table);
 
-impl MetadataTable {
+impl<'a> MetadataTable<'a> {
     /// Creates a new metadata scan.
-    pub fn new(table: Table) -> Self {
+    pub fn new(table: &'a Table) -> Self {
         Self(table)
     }
 
     /// Get the snapshots table.
     pub fn snapshots(&self) -> SnapshotsTable {
-        SnapshotsTable::new(&self.0)
+        SnapshotsTable::new(self.0)
     }
 
     /// Get the manifests table.
     pub fn manifests(&self) -> ManifestsTable {
-        ManifestsTable::new(&self.0)
+        ManifestsTable::new(self.0)
     }
 }
 
 #[cfg(test)]
 pub mod tests {
-    use arrow_array::RecordBatch;
     use expect_test::Expect;
+    use futures::TryStreamExt;
     use itertools::Itertools;
 
+    use crate::scan::ArrowRecordBatchStream;
+
     /// Snapshot testing to check the resulting record batch.
     ///
     /// - `expected_schema/data`: put `expect![[""]]` as a placeholder,
@@ -58,13 +60,21 @@ pub mod tests {
     /// Check the doc of [`expect_test`] for more details.
     /// - `ignore_check_columns`: Some columns are not stable, so we can skip them.
     /// - `sort_column`: The order of the data might be non-deterministic, so we can sort it by a column.
-    pub fn check_record_batch(
-        record_batch: RecordBatch,
+    pub async fn check_record_batches(
+        batch_stream: ArrowRecordBatchStream,
         expected_schema: Expect,
         expected_data: Expect,
         ignore_check_columns: &[&str],
         sort_column: Option<&str>,
     ) {
+        let record_batches = batch_stream.try_collect::<Vec<_>>().await.unwrap();
+        assert!(!record_batches.is_empty(), "Empty record batches");
+
+        // Combine record batches using the first batch's schema
+        let first_batch = record_batches.first().unwrap();
+        let record_batch =
+            arrow_select::concat::concat_batches(&first_batch.schema(), &record_batches).unwrap();
+
         let mut columns = record_batch.columns().to_vec();
         if let Some(sort_column) = sort_column {
             let column = record_batch.column_by_name(sort_column).unwrap();
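
The rewritten helper concatenates the collected batches with `arrow_select::concat::concat_batches`, which requires every batch to share one schema. A self-contained sketch of just that combining step, using plain Arrow arrays (all names here are illustrative):

    use std::sync::Arc;

    use arrow_array::{Int64Array, RecordBatch};
    use arrow_schema::{DataType, Field, Schema};

    fn main() {
        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));

        // Two small batches, as a stream might yield them one at a time.
        let first =
            RecordBatch::try_new(schema.clone(), vec![Arc::new(Int64Array::from(vec![1, 2]))])
                .unwrap();
        let second =
            RecordBatch::try_new(schema.clone(), vec![Arc::new(Int64Array::from(vec![3]))])
                .unwrap();

        // Stitch them into a single batch so snapshot assertions see one table.
        let combined = arrow_select::concat::concat_batches(&schema, &[first, second]).unwrap();
        assert_eq!(combined.num_rows(), 3);
    }
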
37 changes: 26 additions & 11 deletions crates/iceberg/src/inspect/snapshots.rs
@@ -20,8 +20,11 @@ use std::sync::Arc;
 use arrow_array::builder::{MapBuilder, PrimitiveBuilder, StringBuilder};
 use arrow_array::types::{Int64Type, TimestampMillisecondType};
 use arrow_array::RecordBatch;
-use arrow_schema::{DataType, Field, Schema, TimeUnit};
+use arrow_schema::{DataType, Field, Schema, SchemaRef, TimeUnit};
+use futures::StreamExt;
 
+use crate::scan::ArrowRecordBatchStream;
+use crate::spec::TableMetadata;
 use crate::table::Table;
 use crate::Result;
 
@@ -70,7 +73,17 @@ impl<'a> SnapshotsTable<'a> {
     }
 
     /// Scans the snapshots table.
-    pub fn scan(&self) -> Result<RecordBatch> {
+    pub async fn scan(&self) -> Result<ArrowRecordBatchStream> {
+        let arrow_schema = Arc::new(self.schema());
+        let table_metadata = self.table.metadata_ref();
+
+        Ok(
+            futures::stream::once(async move { Self::build_batch(arrow_schema, &table_metadata) })
+                .boxed(),
+        )
+    }
+
+    fn build_batch(arrow_schema: SchemaRef, table_metadata: &TableMetadata) -> Result<RecordBatch> {
         let mut committed_at =
             PrimitiveBuilder::<TimestampMillisecondType>::new().with_timezone("+00:00");
         let mut snapshot_id = PrimitiveBuilder::<Int64Type>::new();
@@ -79,7 +92,7 @@ impl<'a> SnapshotsTable<'a> {
         let mut manifest_list = StringBuilder::new();
         let mut summary = MapBuilder::new(None, StringBuilder::new(), StringBuilder::new());
 
-        for snapshot in self.table.metadata().snapshots() {
+        for snapshot in table_metadata.snapshots() {
             committed_at.append_value(snapshot.timestamp_ms());
             snapshot_id.append_value(snapshot.snapshot_id());
             parent_id.append_option(snapshot.parent_snapshot_id());
@@ -92,7 +105,7 @@ impl<'a> SnapshotsTable<'a> {
             summary.append(true)?;
         }
 
-        Ok(RecordBatch::try_new(Arc::new(self.schema()), vec![
+        Ok(RecordBatch::try_new(arrow_schema, vec![
             Arc::new(committed_at.finish()),
             Arc::new(snapshot_id.finish()),
             Arc::new(parent_id.finish()),
@@ -107,15 +120,17 @@ impl<'a> SnapshotsTable<'a> {
 mod tests {
     use expect_test::expect;
 
-    use crate::inspect::metadata_table::tests::check_record_batch;
+    use crate::inspect::metadata_table::tests::check_record_batches;
     use crate::scan::tests::TableTestFixture;
 
-    #[test]
-    fn test_snapshots_table() {
+    #[tokio::test]
+    async fn test_snapshots_table() {
         let table = TableTestFixture::new().table;
-        let record_batch = table.inspect().snapshots().scan().unwrap();
-        check_record_batch(
-            record_batch,
+
+        let batch_stream = table.inspect().snapshots().scan().await.unwrap();
+
+        check_record_batches(
+            batch_stream,
             expect![[r#"
                 Field { name: "committed_at", data_type: Timestamp(Millisecond, Some("+00:00")), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
                 Field { name: "snapshot_id", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
@@ -178,6 +193,6 @@ mod tests {
             ]"#]],
             &["manifest_list"],
             Some("committed_at"),
-        );
+        ).await;
     }
 }
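
Both tables use the same wrapping shape: do the eager work in one async block, then lift it into a single-element stream with `futures::stream::once(...).boxed()`. A stripped-down illustration with an integer standing in for the `RecordBatch`; it assumes the `futures` and `tokio` crates and is not code from this commit:

    use futures::stream::{BoxStream, StreamExt, TryStreamExt};

    /// Stand-in for the eager batch-building step (`build_batch` above).
    fn build_value() -> Result<i64, String> {
        Ok(42)
    }

    /// Mirrors the new `scan()`: one-shot work exposed as a boxed stream.
    fn scan() -> BoxStream<'static, Result<i64, String>> {
        futures::stream::once(async move { build_value() }).boxed()
    }

    #[tokio::main]
    async fn main() {
        let values = scan().try_collect::<Vec<_>>().await.unwrap();
        assert_eq!(values, vec![42]);
    }
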
2 changes: 1 addition & 1 deletion crates/iceberg/src/table.rs
@@ -203,7 +203,7 @@ impl Table {
 
     /// Creates a metadata table which provides table-like APIs for inspecting metadata.
     /// See [`MetadataTable`] for more details.
-    pub fn inspect(self) -> MetadataTable {
+    pub fn inspect(&self) -> MetadataTable<'_> {
         MetadataTable::new(self)
     }
 
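
With `inspect` taking `&self` instead of `self`, the caller keeps ownership of the `Table` and can scan several metadata tables from the same value. A short sketch of the calling pattern this enables (hypothetical caller code, not from this commit):

    use iceberg::table::Table;
    use iceberg::Result;

    async fn inspect_twice(table: &Table) -> Result<()> {
        // Each call borrows `table`; before this change the first call
        // would have moved the table and the second would not compile.
        let _snapshots = table.inspect().snapshots().scan().await?;
        let _manifests = table.inspect().manifests().scan().await?;
        Ok(())
    }
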
