Commit

Build batches scan before returning stream

rshkv committed Jan 7, 2025
1 parent 8283288 commit e67f809

Showing 4 changed files with 108 additions and 141 deletions.
23 changes: 0 additions & 23 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 0 additions & 1 deletion crates/iceberg/Cargo.toml
@@ -52,7 +52,6 @@ arrow-schema = { workspace = true }
arrow-select = { workspace = true }
arrow-string = { workspace = true }
async-std = { workspace = true, optional = true, features = ["attributes"] }
-async-stream = { workspace = true }
async-trait = { workspace = true }
bimap = { workspace = true }
bitvec = { workspace = true }
162 changes: 79 additions & 83 deletions crates/iceberg/src/inspect/manifests.rs
@@ -23,8 +23,7 @@ use arrow_array::builder::{
use arrow_array::types::{Int32Type, Int64Type, Int8Type};
use arrow_array::RecordBatch;
use arrow_schema::{DataType, Field, Fields, Schema};
-use async_stream::try_stream;
-use futures::StreamExt;
+use futures::{stream, StreamExt};

use crate::scan::ArrowRecordBatchStream;
use crate::table::Table;
@@ -78,92 +77,89 @@ impl<'a> ManifestsTable<'a> {

/// Scans the manifests table.
pub async fn scan(&self) -> Result<ArrowRecordBatchStream> {
-let arrow_schema = Arc::new(self.schema());
-let table_metadata = self.table.metadata_ref();
-let file_io = self.table.file_io().clone();
+let mut content = PrimitiveBuilder::<Int8Type>::new();
+let mut path = StringBuilder::new();
+let mut length = PrimitiveBuilder::<Int64Type>::new();
+let mut partition_spec_id = PrimitiveBuilder::<Int32Type>::new();
+let mut added_snapshot_id = PrimitiveBuilder::<Int64Type>::new();
+let mut added_data_files_count = PrimitiveBuilder::<Int32Type>::new();
+let mut existing_data_files_count = PrimitiveBuilder::<Int32Type>::new();
+let mut deleted_data_files_count = PrimitiveBuilder::<Int32Type>::new();
+let mut added_delete_files_count = PrimitiveBuilder::<Int32Type>::new();
+let mut existing_delete_files_count = PrimitiveBuilder::<Int32Type>::new();
+let mut deleted_delete_files_count = PrimitiveBuilder::<Int32Type>::new();
+let mut partition_summaries = ListBuilder::new(StructBuilder::from_fields(
+Fields::from(Self::partition_summary_fields()),
+0,
+))
+.with_field(Arc::new(Field::new_struct(
+"item",
+Self::partition_summary_fields(),
+false,
+)));

-Ok(try_stream! {
-let mut content = PrimitiveBuilder::<Int8Type>::new();
-let mut path = StringBuilder::new();
-let mut length = PrimitiveBuilder::<Int64Type>::new();
-let mut partition_spec_id = PrimitiveBuilder::<Int32Type>::new();
-let mut added_snapshot_id = PrimitiveBuilder::<Int64Type>::new();
-let mut added_data_files_count = PrimitiveBuilder::<Int32Type>::new();
-let mut existing_data_files_count = PrimitiveBuilder::<Int32Type>::new();
-let mut deleted_data_files_count = PrimitiveBuilder::<Int32Type>::new();
-let mut added_delete_files_count = PrimitiveBuilder::<Int32Type>::new();
-let mut existing_delete_files_count = PrimitiveBuilder::<Int32Type>::new();
-let mut deleted_delete_files_count = PrimitiveBuilder::<Int32Type>::new();
-let mut partition_summaries = ListBuilder::new(StructBuilder::from_fields(
-Fields::from(Self::partition_summary_fields()),
-0,
-))
-.with_field(Arc::new(Field::new_struct(
-"item",
-Self::partition_summary_fields(),
-false,
-)));

-if let Some(snapshot) = table_metadata.current_snapshot() {
-let manifest_list = snapshot.load_manifest_list(&file_io, &table_metadata).await?;
-for manifest in manifest_list.entries() {
-content.append_value(manifest.content as i8);
-path.append_value(manifest.manifest_path.clone());
-length.append_value(manifest.manifest_length);
-partition_spec_id.append_value(manifest.partition_spec_id);
-added_snapshot_id.append_value(manifest.added_snapshot_id);
-added_data_files_count.append_value(manifest.added_files_count.unwrap_or(0) as i32);
-existing_data_files_count
-.append_value(manifest.existing_files_count.unwrap_or(0) as i32);
-deleted_data_files_count
-.append_value(manifest.deleted_files_count.unwrap_or(0) as i32);
-added_delete_files_count
-.append_value(manifest.added_files_count.unwrap_or(0) as i32);
-existing_delete_files_count
-.append_value(manifest.existing_files_count.unwrap_or(0) as i32);
-deleted_delete_files_count
-.append_value(manifest.deleted_files_count.unwrap_or(0) as i32);
+if let Some(snapshot) = self.table.metadata().current_snapshot() {
+let manifest_list = snapshot
+.load_manifest_list(self.table.file_io(), &self.table.metadata_ref())
+.await?;
+for manifest in manifest_list.entries() {
+content.append_value(manifest.content as i8);
+path.append_value(manifest.manifest_path.clone());
+length.append_value(manifest.manifest_length);
+partition_spec_id.append_value(manifest.partition_spec_id);
+added_snapshot_id.append_value(manifest.added_snapshot_id);
+added_data_files_count.append_value(manifest.added_files_count.unwrap_or(0) as i32);
+existing_data_files_count
+.append_value(manifest.existing_files_count.unwrap_or(0) as i32);
+deleted_data_files_count
+.append_value(manifest.deleted_files_count.unwrap_or(0) as i32);
+added_delete_files_count
+.append_value(manifest.added_files_count.unwrap_or(0) as i32);
+existing_delete_files_count
+.append_value(manifest.existing_files_count.unwrap_or(0) as i32);
+deleted_delete_files_count
+.append_value(manifest.deleted_files_count.unwrap_or(0) as i32);

-let partition_summaries_builder = partition_summaries.values();
-for summary in &manifest.partitions {
-partition_summaries_builder
-.field_builder::<BooleanBuilder>(0)
-.unwrap()
-.append_value(summary.contains_null);
-partition_summaries_builder
-.field_builder::<BooleanBuilder>(1)
-.unwrap()
-.append_option(summary.contains_nan);
-partition_summaries_builder
-.field_builder::<StringBuilder>(2)
-.unwrap()
-.append_option(summary.lower_bound.as_ref().map(|v| v.to_string()));
-partition_summaries_builder
-.field_builder::<StringBuilder>(3)
-.unwrap()
-.append_option(summary.upper_bound.as_ref().map(|v| v.to_string()));
-partition_summaries_builder.append(true);
-}
-partition_summaries.append(true);
+let partition_summaries_builder = partition_summaries.values();
+for summary in &manifest.partitions {
+partition_summaries_builder
+.field_builder::<BooleanBuilder>(0)
+.unwrap()
+.append_value(summary.contains_null);
+partition_summaries_builder
+.field_builder::<BooleanBuilder>(1)
+.unwrap()
+.append_option(summary.contains_nan);
+partition_summaries_builder
+.field_builder::<StringBuilder>(2)
+.unwrap()
+.append_option(summary.lower_bound.as_ref().map(|v| v.to_string()));
+partition_summaries_builder
+.field_builder::<StringBuilder>(3)
+.unwrap()
+.append_option(summary.upper_bound.as_ref().map(|v| v.to_string()));
+partition_summaries_builder.append(true);
+}
+partition_summaries.append(true);
}

-yield RecordBatch::try_new(arrow_schema, vec![
-Arc::new(content.finish()),
-Arc::new(path.finish()),
-Arc::new(length.finish()),
-Arc::new(partition_spec_id.finish()),
-Arc::new(added_snapshot_id.finish()),
-Arc::new(added_data_files_count.finish()),
-Arc::new(existing_data_files_count.finish()),
-Arc::new(deleted_data_files_count.finish()),
-Arc::new(added_delete_files_count.finish()),
-Arc::new(existing_delete_files_count.finish()),
-Arc::new(deleted_delete_files_count.finish()),
-Arc::new(partition_summaries.finish()),
-])?;
}
-.boxed())

+let batch = RecordBatch::try_new(Arc::new(self.schema()), vec![
+Arc::new(content.finish()),
+Arc::new(path.finish()),
+Arc::new(length.finish()),
+Arc::new(partition_spec_id.finish()),
+Arc::new(added_snapshot_id.finish()),
+Arc::new(added_data_files_count.finish()),
+Arc::new(existing_data_files_count.finish()),
+Arc::new(deleted_data_files_count.finish()),
+Arc::new(added_delete_files_count.finish()),
+Arc::new(existing_delete_files_count.finish()),
+Arc::new(deleted_delete_files_count.finish()),
+Arc::new(partition_summaries.finish()),
+])?;

+Ok(stream::iter(vec![Ok(batch)]).boxed())
}
}
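Both inspect tables change in the same way: the Arrow builders are filled and drained into a single RecordBatch while scan() is still executing, and the finished batch is wrapped in a ready-made one-element stream instead of being yielded from an async_stream generator. A minimal sketch of that pattern, independent of the iceberg crate (the build_batch helper, the BatchStream alias, and the id column are illustrative only, not names from this repository):

```rust
// Sketch of "build the batch, then return a trivial stream". Any error in
// building the batch surfaces from scan() itself, not from the first poll
// of the returned stream.
use std::sync::Arc;

use arrow_array::{Int64Array, RecordBatch};
use arrow_schema::{ArrowError, DataType, Field, Schema};
use futures::stream::{self, BoxStream, StreamExt};

type BatchStream = BoxStream<'static, Result<RecordBatch, ArrowError>>;

fn build_batch() -> Result<RecordBatch, ArrowError> {
    // All rows are materialized up front.
    let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));
    let ids = Int64Array::from(vec![1, 2, 3]);
    RecordBatch::try_new(schema, vec![Arc::new(ids)])
}

fn scan() -> Result<BatchStream, ArrowError> {
    let batch = build_batch()?;
    // stream::iter turns the finished batch into a one-element stream,
    // matching a boxed-stream return type without a generator macro.
    Ok(stream::iter(vec![Ok(batch)]).boxed())
}
```

This is also what lets the commit drop the async-stream dependency from Cargo.toml: no generator is needed once the batch exists before the stream is returned.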

63 changes: 29 additions & 34 deletions crates/iceberg/src/inspect/snapshots.rs
@@ -21,8 +21,7 @@ use arrow_array::builder::{MapBuilder, PrimitiveBuilder, StringBuilder};
use arrow_array::types::{Int64Type, TimestampMillisecondType};
use arrow_array::RecordBatch;
use arrow_schema::{DataType, Field, Schema, TimeUnit};
-use async_stream::try_stream;
-use futures::StreamExt;
+use futures::{stream, StreamExt};

use crate::scan::ArrowRecordBatchStream;
use crate::table::Table;
@@ -74,41 +73,37 @@ impl<'a> SnapshotsTable<'a> {

/// Scans the snapshots table.
pub async fn scan(&self) -> Result<ArrowRecordBatchStream> {
-let arrow_schema = Arc::new(self.schema());
-let table_metadata = self.table.metadata_ref();
+let mut committed_at =
+PrimitiveBuilder::<TimestampMillisecondType>::new().with_timezone("+00:00");
+let mut snapshot_id = PrimitiveBuilder::<Int64Type>::new();
+let mut parent_id = PrimitiveBuilder::<Int64Type>::new();
+let mut operation = StringBuilder::new();
+let mut manifest_list = StringBuilder::new();
+let mut summary = MapBuilder::new(None, StringBuilder::new(), StringBuilder::new());

-Ok(try_stream! {
-let mut committed_at =
-PrimitiveBuilder::<TimestampMillisecondType>::new().with_timezone("+00:00");
-let mut snapshot_id = PrimitiveBuilder::<Int64Type>::new();
-let mut parent_id = PrimitiveBuilder::<Int64Type>::new();
-let mut operation = StringBuilder::new();
-let mut manifest_list = StringBuilder::new();
-let mut summary = MapBuilder::new(None, StringBuilder::new(), StringBuilder::new());

-for snapshot in table_metadata.snapshots() {
-committed_at.append_value(snapshot.timestamp_ms());
-snapshot_id.append_value(snapshot.snapshot_id());
-parent_id.append_option(snapshot.parent_snapshot_id());
-manifest_list.append_value(snapshot.manifest_list());
-operation.append_value(snapshot.summary().operation.as_str());
-for (key, value) in &snapshot.summary().additional_properties {
-summary.keys().append_value(key);
-summary.values().append_value(value);
-}
-summary.append(true)?;
+for snapshot in self.table.metadata().snapshots() {
+committed_at.append_value(snapshot.timestamp_ms());
+snapshot_id.append_value(snapshot.snapshot_id());
+parent_id.append_option(snapshot.parent_snapshot_id());
+manifest_list.append_value(snapshot.manifest_list());
+operation.append_value(snapshot.summary().operation.as_str());
+for (key, value) in &snapshot.summary().additional_properties {
+summary.keys().append_value(key);
+summary.values().append_value(value);
}

-yield RecordBatch::try_new(arrow_schema, vec![
-Arc::new(committed_at.finish()),
-Arc::new(snapshot_id.finish()),
-Arc::new(parent_id.finish()),
-Arc::new(operation.finish()),
-Arc::new(manifest_list.finish()),
-Arc::new(summary.finish()),
-])?;
+summary.append(true)?;
}
-.boxed())

+let batch = RecordBatch::try_new(Arc::new(self.schema()), vec![
+Arc::new(committed_at.finish()),
+Arc::new(snapshot_id.finish()),
+Arc::new(parent_id.finish()),
+Arc::new(operation.finish()),
+Arc::new(manifest_list.finish()),
+Arc::new(summary.finish()),
+])?;

+Ok(stream::iter(vec![Ok(batch)]).boxed())
}
}
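The caller-facing type is unchanged: scan() still returns an ArrowRecordBatchStream, now backed by a single pre-built batch. A hypothetical consumer-side sketch, assuming only the public ArrowRecordBatchStream alias from the crate's scan module (the count_rows helper is illustrative, not part of this commit):

```rust
use arrow_array::RecordBatch;
use futures::TryStreamExt;
use iceberg::scan::ArrowRecordBatchStream;

// Drain the stream returned by an inspect-table scan and count its rows.
// After this commit the stream holds exactly one already-built batch, so
// collecting it performs no further table I/O.
async fn count_rows(stream: ArrowRecordBatchStream) -> iceberg::Result<usize> {
    let batches: Vec<RecordBatch> = stream.try_collect().await?;
    Ok(batches.iter().map(|b| b.num_rows()).sum())
}
```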

