Skip to content

Commit

Permalink
Move 'build_batch' into 'scan'
Browse files Browse the repository at this point in the history
  • Loading branch information
rshkv committed Jan 7, 2025
1 parent 66bc85a commit 27a5068
Showing 1 changed file with 31 additions and 36 deletions.
67 changes: 31 additions & 36 deletions crates/iceberg/src/inspect/snapshots.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,10 @@ use std::sync::Arc;
use arrow_array::builder::{MapBuilder, PrimitiveBuilder, StringBuilder};
use arrow_array::types::{Int64Type, TimestampMillisecondType};
use arrow_array::RecordBatch;
use arrow_schema::{DataType, Field, Schema, SchemaRef, TimeUnit};
use arrow_schema::{DataType, Field, Schema, TimeUnit};
use futures::StreamExt;

use crate::scan::ArrowRecordBatchStream;
use crate::spec::TableMetadata;
use crate::table::Table;
use crate::Result;

Expand Down Expand Up @@ -77,42 +76,38 @@ impl<'a> SnapshotsTable<'a> {
let arrow_schema = Arc::new(self.schema());
let table_metadata = self.table.metadata_ref();

Ok(
futures::stream::once(async move { Self::build_batch(arrow_schema, &table_metadata) })
.boxed(),
)
}

fn build_batch(arrow_schema: SchemaRef, table_metadata: &TableMetadata) -> Result<RecordBatch> {
let mut committed_at =
PrimitiveBuilder::<TimestampMillisecondType>::new().with_timezone("+00:00");
let mut snapshot_id = PrimitiveBuilder::<Int64Type>::new();
let mut parent_id = PrimitiveBuilder::<Int64Type>::new();
let mut operation = StringBuilder::new();
let mut manifest_list = StringBuilder::new();
let mut summary = MapBuilder::new(None, StringBuilder::new(), StringBuilder::new());

for snapshot in table_metadata.snapshots() {
committed_at.append_value(snapshot.timestamp_ms());
snapshot_id.append_value(snapshot.snapshot_id());
parent_id.append_option(snapshot.parent_snapshot_id());
manifest_list.append_value(snapshot.manifest_list());
operation.append_value(snapshot.summary().operation.as_str());
for (key, value) in &snapshot.summary().additional_properties {
summary.keys().append_value(key);
summary.values().append_value(value);
Ok(futures::stream::once(async move {
let mut committed_at =
PrimitiveBuilder::<TimestampMillisecondType>::new().with_timezone("+00:00");
let mut snapshot_id = PrimitiveBuilder::<Int64Type>::new();
let mut parent_id = PrimitiveBuilder::<Int64Type>::new();
let mut operation = StringBuilder::new();
let mut manifest_list = StringBuilder::new();
let mut summary = MapBuilder::new(None, StringBuilder::new(), StringBuilder::new());

for snapshot in table_metadata.snapshots() {
committed_at.append_value(snapshot.timestamp_ms());
snapshot_id.append_value(snapshot.snapshot_id());
parent_id.append_option(snapshot.parent_snapshot_id());
manifest_list.append_value(snapshot.manifest_list());
operation.append_value(snapshot.summary().operation.as_str());
for (key, value) in &snapshot.summary().additional_properties {
summary.keys().append_value(key);
summary.values().append_value(value);
}
summary.append(true)?;
}
summary.append(true)?;
}

Ok(RecordBatch::try_new(arrow_schema, vec![
Arc::new(committed_at.finish()),
Arc::new(snapshot_id.finish()),
Arc::new(parent_id.finish()),
Arc::new(operation.finish()),
Arc::new(manifest_list.finish()),
Arc::new(summary.finish()),
])?)
Ok(RecordBatch::try_new(arrow_schema, vec![
Arc::new(committed_at.finish()),
Arc::new(snapshot_id.finish()),
Arc::new(parent_id.finish()),
Arc::new(operation.finish()),
Arc::new(manifest_list.finish()),
Arc::new(summary.finish()),
])?)
})
.boxed())
}
}

Expand Down

0 comments on commit 27a5068

Please sign in to comment.