From ddb6b1487b0523183c39358265646b70d7424742 Mon Sep 17 00:00:00 2001 From: Willi Raschkowski Date: Tue, 24 Dec 2024 16:21:53 +0100 Subject: [PATCH] Add History metadata table --- crates/iceberg/src/metadata_scan.rs | 140 +++++++++++++++++++++++++++- 1 file changed, 138 insertions(+), 2 deletions(-) diff --git a/crates/iceberg/src/metadata_scan.rs b/crates/iceberg/src/metadata_scan.rs index 942d7605c..b8af933c3 100644 --- a/crates/iceberg/src/metadata_scan.rs +++ b/crates/iceberg/src/metadata_scan.rs @@ -17,14 +17,15 @@ //! Metadata table api. +use std::collections::HashSet; use std::sync::Arc; -use arrow_array::builder::{MapBuilder, PrimitiveBuilder, StringBuilder}; +use arrow_array::builder::{BooleanBuilder, MapBuilder, PrimitiveBuilder, StringBuilder}; use arrow_array::types::{Int64Type, TimestampMillisecondType}; use arrow_array::RecordBatch; use arrow_schema::{DataType, Field, Schema, TimeUnit}; -use crate::spec::TableMetadata; +use crate::spec::{SnapshotRef, TableMetadata}; use crate::table::Table; use crate::Result; @@ -50,6 +51,13 @@ impl MetadataTable { } } + /// Get the history table. + pub fn history(&self) -> HistoryTable { + HistoryTable { + metadata_table: self, + } + } + fn metadata(&self) -> &TableMetadata { self.0.metadata() } @@ -128,6 +136,97 @@ impl<'a> SnapshotsTable<'a> { } } +/// History table. +/// +/// Shows how the table's current snapshot has changed over time and when each +/// snapshot became the current snapshot. +/// +/// Unlike the [Snapshots][SnapshotsTable], this metadata table has less detail +/// per snapshot but includes ancestry information of the current snapshot. +/// +/// `is_current_ancestor` indicates whether the snapshot is an ancestor of the +/// current snapshot. If `false`, then the snapshot was rolled back. +pub struct HistoryTable<'a> { + metadata_table: &'a MetadataTable, +} + +impl<'a> HistoryTable<'a> { + fn schema(&self) -> Schema { + Schema::new(vec![ + Field::new( + "made_current_at", + DataType::Timestamp(TimeUnit::Millisecond, Some("+00:00".into())), + false, + ), + Field::new("snapshot_id", DataType::Int64, false), + Field::new("parent_id", DataType::Int64, true), + Field::new("is_current_ancestor", DataType::Boolean, false), + ]) + } + + fn scan(&self) -> Result { + let table_metadata = self.metadata_table.metadata(); + let ancestors_by_snapshot_id: HashSet = + SnapshotAncestors::from_current_snapshot(table_metadata) + .map(|snapshot| snapshot.snapshot_id()) + .collect(); + + let mut made_current_at = + PrimitiveBuilder::::new().with_timezone("+00:00"); + let mut snapshot_id = PrimitiveBuilder::::new(); + let mut parent_id = PrimitiveBuilder::::new(); + let mut is_current_ancestor = BooleanBuilder::new(); + + for snapshot in table_metadata.snapshots() { + made_current_at.append_value(snapshot.timestamp_ms()); + snapshot_id.append_value(snapshot.snapshot_id()); + parent_id.append_option(snapshot.parent_snapshot_id()); + is_current_ancestor + .append_value(ancestors_by_snapshot_id.contains(&snapshot.snapshot_id())); + } + + Ok(RecordBatch::try_new(Arc::new(Self::schema(self)), vec![ + Arc::new(made_current_at.finish()), + Arc::new(snapshot_id.finish()), + Arc::new(parent_id.finish()), + Arc::new(is_current_ancestor.finish()), + ])?) + } +} + +/// Utility to iterate parent-by-parent over the ancestors of a snapshot. +struct SnapshotAncestors<'a> { + table_metadata: &'a TableMetadata, + snapshot: Option<&'a SnapshotRef>, +} + +impl<'a> SnapshotAncestors<'a> { + fn from_current_snapshot(table_metadata: &'a TableMetadata) -> Self { + SnapshotAncestors { + table_metadata, + snapshot: table_metadata.current_snapshot(), + } + } +} + +impl<'a> Iterator for SnapshotAncestors<'a> { + type Item = &'a SnapshotRef; + + /// Return the current `snapshot` and move this iterator to the parent snapshot. + fn next(&mut self) -> Option { + if let Some(snapshot) = self.snapshot { + let parent = match snapshot.parent_snapshot_id() { + Some(parent_snapshot_id) => self.table_metadata.snapshot_by_id(parent_snapshot_id), + None => None, + }; + self.snapshot = parent; + Some(snapshot) + } else { + None + } + } +} + #[cfg(test)] mod tests { use expect_test::{expect, Expect}; @@ -253,4 +352,41 @@ mod tests { Some("committed_at"), ); } + + #[test] + fn test_history_table() { + let table = TableTestFixture::new().table; + let record_batch = table.metadata_table().history().scan().unwrap(); + check_record_batch( + record_batch, + expect![[r#" + Field { name: "made_current_at", data_type: Timestamp(Millisecond, Some("+00:00")), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, + Field { name: "snapshot_id", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, + Field { name: "parent_id", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, + Field { name: "is_current_ancestor", data_type: Boolean, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }"#]], + expect![[r#" + made_current_at: PrimitiveArray + [ + 2018-01-04T21:22:35.770+00:00, + 2019-04-12T20:29:15.770+00:00, + ], + snapshot_id: PrimitiveArray + [ + 3051729675574597004, + 3055729675574597004, + ], + parent_id: PrimitiveArray + [ + null, + 3051729675574597004, + ], + is_current_ancestor: BooleanArray + [ + true, + true, + ]"#]], + &[], + Some("made_current_at"), + ); + } }