From 27a50689d8da9c46b1e6efecd28aa1ccee1378d6 Mon Sep 17 00:00:00 2001 From: Willi Raschkowski Date: Tue, 7 Jan 2025 10:53:52 +0000 Subject: [PATCH] Move 'build_batch' into 'scan' --- crates/iceberg/src/inspect/snapshots.rs | 67 ++++++++++++------------- 1 file changed, 31 insertions(+), 36 deletions(-) diff --git a/crates/iceberg/src/inspect/snapshots.rs b/crates/iceberg/src/inspect/snapshots.rs index ee362ef93..191a909fe 100644 --- a/crates/iceberg/src/inspect/snapshots.rs +++ b/crates/iceberg/src/inspect/snapshots.rs @@ -20,11 +20,10 @@ use std::sync::Arc; use arrow_array::builder::{MapBuilder, PrimitiveBuilder, StringBuilder}; use arrow_array::types::{Int64Type, TimestampMillisecondType}; use arrow_array::RecordBatch; -use arrow_schema::{DataType, Field, Schema, SchemaRef, TimeUnit}; +use arrow_schema::{DataType, Field, Schema, TimeUnit}; use futures::StreamExt; use crate::scan::ArrowRecordBatchStream; -use crate::spec::TableMetadata; use crate::table::Table; use crate::Result; @@ -77,42 +76,38 @@ impl<'a> SnapshotsTable<'a> { let arrow_schema = Arc::new(self.schema()); let table_metadata = self.table.metadata_ref(); - Ok( - futures::stream::once(async move { Self::build_batch(arrow_schema, &table_metadata) }) - .boxed(), - ) - } - - fn build_batch(arrow_schema: SchemaRef, table_metadata: &TableMetadata) -> Result<RecordBatch> { - let mut committed_at = - PrimitiveBuilder::<TimestampMillisecondType>::new().with_timezone("+00:00"); - let mut snapshot_id = PrimitiveBuilder::<Int64Type>::new(); - let mut parent_id = PrimitiveBuilder::<Int64Type>::new(); - let mut operation = StringBuilder::new(); - let mut manifest_list = StringBuilder::new(); - let mut summary = MapBuilder::new(None, StringBuilder::new(), StringBuilder::new()); - - for snapshot in table_metadata.snapshots() { - committed_at.append_value(snapshot.timestamp_ms()); - snapshot_id.append_value(snapshot.snapshot_id()); - parent_id.append_option(snapshot.parent_snapshot_id()); - manifest_list.append_value(snapshot.manifest_list()); - 
operation.append_value(snapshot.summary().operation.as_str()); - for (key, value) in &snapshot.summary().additional_properties { - summary.keys().append_value(key); - summary.values().append_value(value); + Ok(futures::stream::once(async move { + let mut committed_at = + PrimitiveBuilder::<TimestampMillisecondType>::new().with_timezone("+00:00"); + let mut snapshot_id = PrimitiveBuilder::<Int64Type>::new(); + let mut parent_id = PrimitiveBuilder::<Int64Type>::new(); + let mut operation = StringBuilder::new(); + let mut manifest_list = StringBuilder::new(); + let mut summary = MapBuilder::new(None, StringBuilder::new(), StringBuilder::new()); + + for snapshot in table_metadata.snapshots() { + committed_at.append_value(snapshot.timestamp_ms()); + snapshot_id.append_value(snapshot.snapshot_id()); + parent_id.append_option(snapshot.parent_snapshot_id()); + manifest_list.append_value(snapshot.manifest_list()); + operation.append_value(snapshot.summary().operation.as_str()); + for (key, value) in &snapshot.summary().additional_properties { + summary.keys().append_value(key); + summary.values().append_value(value); + } + summary.append(true)?; } - summary.append(true)?; - } - Ok(RecordBatch::try_new(arrow_schema, vec![ - Arc::new(committed_at.finish()), - Arc::new(snapshot_id.finish()), - Arc::new(parent_id.finish()), - Arc::new(operation.finish()), - Arc::new(manifest_list.finish()), - Arc::new(summary.finish()), - ])?) + Ok(RecordBatch::try_new(arrow_schema, vec![ + Arc::new(committed_at.finish()), + Arc::new(snapshot_id.finish()), + Arc::new(parent_id.finish()), + Arc::new(operation.finish()), + Arc::new(manifest_list.finish()), + Arc::new(summary.finish()), + ])?) + }) + .boxed()) } }