Skip to content

Commit

Permalink
Add History metadata table
Browse files Browse the repository at this point in the history
  • Loading branch information
rshkv committed Dec 30, 2024
1 parent 328e18e commit ddb6b14
Showing 1 changed file with 138 additions and 2 deletions.
140 changes: 138 additions & 2 deletions crates/iceberg/src/metadata_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,15 @@

//! Metadata table api.
use std::collections::HashSet;
use std::sync::Arc;

use arrow_array::builder::{MapBuilder, PrimitiveBuilder, StringBuilder};
use arrow_array::builder::{BooleanBuilder, MapBuilder, PrimitiveBuilder, StringBuilder};
use arrow_array::types::{Int64Type, TimestampMillisecondType};
use arrow_array::RecordBatch;
use arrow_schema::{DataType, Field, Schema, TimeUnit};

use crate::spec::TableMetadata;
use crate::spec::{SnapshotRef, TableMetadata};
use crate::table::Table;
use crate::Result;

Expand All @@ -50,6 +51,13 @@ impl MetadataTable {
}
}

/// Returns a view over the history of this table's snapshots.
pub fn history(&self) -> HistoryTable {
    let metadata_table = self;
    HistoryTable { metadata_table }
}

/// Returns the table metadata by delegating to the wrapped value's `metadata()`.
///
/// Private helper shared by the metadata table views in this module.
fn metadata(&self) -> &TableMetadata {
self.0.metadata()
}
Expand Down Expand Up @@ -128,6 +136,97 @@ impl<'a> SnapshotsTable<'a> {
}
}

/// History table.
///
/// Shows how the table's current snapshot has changed over time and when each
/// snapshot became the current snapshot.
///
/// Unlike the [Snapshots][SnapshotsTable], this metadata table has less detail
/// per snapshot but includes ancestry information of the current snapshot.
///
/// `is_current_ancestor` indicates whether the snapshot is an ancestor of the
/// current snapshot. If `false`, then the snapshot was rolled back.
///
/// Instances are obtained through `MetadataTable::history` and borrow the
/// metadata table for the lifetime `'a`.
pub struct HistoryTable<'a> {
// Metadata table this view reads its table metadata from.
metadata_table: &'a MetadataTable,
}

impl<'a> HistoryTable<'a> {
    /// Returns the Arrow schema of the history table.
    ///
    /// Columns, in order:
    /// - `made_current_at`: non-null millisecond timestamp with a `+00:00` timezone.
    /// - `snapshot_id`: non-null 64-bit snapshot id.
    /// - `parent_id`: nullable 64-bit id of the parent snapshot.
    /// - `is_current_ancestor`: non-null flag, `true` when the snapshot is the
    ///   current snapshot or one of its ancestors.
    pub fn schema(&self) -> Schema {
        Schema::new(vec![
            Field::new(
                "made_current_at",
                DataType::Timestamp(TimeUnit::Millisecond, Some("+00:00".into())),
                false,
            ),
            Field::new("snapshot_id", DataType::Int64, false),
            Field::new("parent_id", DataType::Int64, true),
            Field::new("is_current_ancestor", DataType::Boolean, false),
        ])
    }

    /// Scans the history table and returns all rows as a single [`RecordBatch`].
    ///
    /// One row is produced per snapshot returned by the table metadata, in the
    /// order the metadata yields them.
    ///
    /// # Errors
    ///
    /// Returns an error if the record batch cannot be assembled from the built
    /// columns and [`Self::schema`].
    pub fn scan(&self) -> Result<RecordBatch> {
        let table_metadata = self.metadata_table.metadata();

        // Ids of the current snapshot and every snapshot reachable from it by
        // following parent links; collected up front so that the per-row
        // ancestry check below is a set lookup.
        let current_ancestor_ids: HashSet<i64> =
            SnapshotAncestors::from_current_snapshot(table_metadata)
                .map(|snapshot| snapshot.snapshot_id())
                .collect();

        let mut made_current_at =
            PrimitiveBuilder::<TimestampMillisecondType>::new().with_timezone("+00:00");
        let mut snapshot_id = PrimitiveBuilder::<Int64Type>::new();
        let mut parent_id = PrimitiveBuilder::<Int64Type>::new();
        let mut is_current_ancestor = BooleanBuilder::new();

        for snapshot in table_metadata.snapshots() {
            // NOTE(review): `made_current_at` is filled from the snapshot's own
            // timestamp, not from a snapshot log — confirm this is the intended
            // source for "when the snapshot became current".
            made_current_at.append_value(snapshot.timestamp_ms());
            snapshot_id.append_value(snapshot.snapshot_id());
            parent_id.append_option(snapshot.parent_snapshot_id());
            is_current_ancestor
                .append_value(current_ancestor_ids.contains(&snapshot.snapshot_id()));
        }

        // Column order must match the field order declared in `schema`.
        Ok(RecordBatch::try_new(Arc::new(self.schema()), vec![
            Arc::new(made_current_at.finish()),
            Arc::new(snapshot_id.finish()),
            Arc::new(parent_id.finish()),
            Arc::new(is_current_ancestor.finish()),
        ])?)
    }
}

/// Utility to iterate parent-by-parent over the ancestors of a snapshot.
///
/// The starting snapshot itself is yielded first, followed by its parent, the
/// parent's parent, and so on.
struct SnapshotAncestors<'a> {
// Table metadata used to resolve parent snapshot ids into snapshots.
table_metadata: &'a TableMetadata,
// Snapshot that the next call to `next` will yield; `None` once exhausted.
snapshot: Option<&'a SnapshotRef>,
}

impl<'a> SnapshotAncestors<'a> {
fn from_current_snapshot(table_metadata: &'a TableMetadata) -> Self {
SnapshotAncestors {
table_metadata,
snapshot: table_metadata.current_snapshot(),
}
}
}

impl<'a> Iterator for SnapshotAncestors<'a> {
    type Item = &'a SnapshotRef;

    /// Yields the snapshot the iterator is positioned on, then steps to its parent.
    ///
    /// Iteration ends after a snapshot that has no parent id, or whose parent
    /// id is not resolved by `snapshot_by_id` on the table metadata.
    fn next(&mut self) -> Option<Self::Item> {
        // Leaves `None` behind; it is overwritten below with the parent, if any.
        let current = self.snapshot.take()?;
        let table_metadata = self.table_metadata;
        self.snapshot = current
            .parent_snapshot_id()
            .and_then(|parent_id| table_metadata.snapshot_by_id(parent_id));
        Some(current)
    }
}

#[cfg(test)]
mod tests {
use expect_test::{expect, Expect};
Expand Down Expand Up @@ -253,4 +352,41 @@ mod tests {
Some("committed_at"),
);
}

// Scans the history table of the fixture table and compares both the schema
// and the column contents against the expected snapshots below.
#[test]
fn test_history_table() {
let table = TableTestFixture::new().table;
let record_batch = table.metadata_table().history().scan().unwrap();
check_record_batch(
record_batch,
// Expected schema: one line per field.
expect![[r#"
Field { name: "made_current_at", data_type: Timestamp(Millisecond, Some("+00:00")), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
Field { name: "snapshot_id", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
Field { name: "parent_id", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} },
Field { name: "is_current_ancestor", data_type: Boolean, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }"#]],
// Expected data: two snapshots, the second being the child of the first,
// and both flagged as ancestors of the current snapshot.
expect![[r#"
made_current_at: PrimitiveArray<Timestamp(Millisecond, Some("+00:00"))>
[
2018-01-04T21:22:35.770+00:00,
2019-04-12T20:29:15.770+00:00,
],
snapshot_id: PrimitiveArray<Int64>
[
3051729675574597004,
3055729675574597004,
],
parent_id: PrimitiveArray<Int64>
[
null,
3051729675574597004,
],
is_current_ancestor: BooleanArray
[
true,
true,
]"#]],
// NOTE(review): presumably the columns to skip and the column to sort by
// before comparing — confirm against `check_record_batch`.
&[],
Some("made_current_at"),
);
}
}

0 comments on commit ddb6b14

Please sign in to comment.