Skip to content

Commit 90abb68

Browse files
committed
feat(datafusion): support metadata tables for Datafusion
Signed-off-by: xxchan <[email protected]>
1 parent 6e07faa commit 90abb68

File tree

15 files changed

+519
-93
lines changed

15 files changed

+519
-93
lines changed

Cargo.lock

Lines changed: 5 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ serde_derive = "1"
9191
serde_json = "1"
9292
serde_repr = "0.1.16"
9393
serde_with = "3.4"
94+
strum = "0.26"
9495
tempfile = "3.15"
9596
tokio = { version = "1", default-features = false }
9697
typed-builder = "0.20"

crates/iceberg/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ bitvec = { workspace = true }
5858
bytes = { workspace = true }
5959
chrono = { workspace = true }
6060
derive_builder = { workspace = true }
61+
expect-test = { workspace = true }
6162
fnv = { workspace = true }
6263
futures = { workspace = true }
6364
itertools = { workspace = true }
@@ -78,6 +79,7 @@ serde_derive = { workspace = true }
7879
serde_json = { workspace = true }
7980
serde_repr = { workspace = true }
8081
serde_with = { workspace = true }
82+
strum = { workspace = true, features = ["derive"] }
8183
tokio = { workspace = true, optional = true }
8284
typed-builder = { workspace = true }
8385
url = { workspace = true }
@@ -86,7 +88,6 @@ zstd = { workspace = true }
8688

8789
[dev-dependencies]
8890
ctor = { workspace = true }
89-
expect-test = { workspace = true }
9091
iceberg-catalog-memory = { workspace = true }
9192
iceberg_test_utils = { path = "../test_utils", features = ["tests"] }
9293
pretty_assertions = { workspace = true }

crates/iceberg/src/inspect/manifests.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -166,9 +166,9 @@ impl<'a> ManifestsTable<'a> {
166166
#[cfg(test)]
167167
mod tests {
168168
use expect_test::expect;
169+
use futures::TryStreamExt;
169170

170-
use crate::inspect::metadata_table::tests::check_record_batches;
171-
use crate::scan::tests::TableTestFixture;
171+
use crate::{scan::tests::TableTestFixture, test_utils::check_record_batches};
172172

173173
#[tokio::test]
174174
async fn test_manifests_table() {
@@ -178,7 +178,7 @@ mod tests {
178178
let batch_stream = fixture.table.inspect().manifests().scan().await.unwrap();
179179

180180
check_record_batches(
181-
batch_stream,
181+
batch_stream.try_collect::<Vec<_>>().await.unwrap(),
182182
expect![[r#"
183183
Field { name: "content", data_type: Int8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
184184
Field { name: "path", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
@@ -263,6 +263,6 @@ mod tests {
263263
]"#]],
264264
&["path", "length"],
265265
Some("path"),
266-
).await;
266+
);
267267
}
268268
}

crates/iceberg/src/inspect/metadata_table.rs

Lines changed: 37 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,43 @@ use crate::table::Table;
2727
#[derive(Debug)]
2828
pub struct MetadataTable<'a>(&'a Table);
2929

30+
/// Metadata table type.
31+
#[derive(Debug, Clone, strum::EnumIter)]
32+
pub enum MetadataTableType {
33+
/// [`SnapshotsTable`]
34+
Snapshots,
35+
/// [`ManifestsTable`]
36+
Manifests,
37+
}
38+
39+
impl MetadataTableType {
40+
/// Returns the string representation of the metadata table type.
41+
pub fn as_str(&self) -> &str {
42+
match self {
43+
MetadataTableType::Snapshots => "snapshots",
44+
MetadataTableType::Manifests => "manifests",
45+
}
46+
}
47+
48+
/// Returns all the metadata table types.
49+
pub fn all_types() -> impl Iterator<Item = Self> {
50+
use strum::IntoEnumIterator;
51+
Self::iter()
52+
}
53+
}
54+
55+
impl TryFrom<&str> for MetadataTableType {
56+
type Error = String;
57+
58+
fn try_from(value: &str) -> std::result::Result<Self, String> {
59+
match value {
60+
"snapshots" => Ok(Self::Snapshots),
61+
"manifests" => Ok(Self::Manifests),
62+
_ => Err(format!("invalid metadata table type: {value}")),
63+
}
64+
}
65+
}
66+
3067
impl<'a> MetadataTable<'a> {
3168
/// Creates a new metadata scan.
3269
pub fn new(table: &'a Table) -> Self {
@@ -43,67 +80,3 @@ impl<'a> MetadataTable<'a> {
4380
ManifestsTable::new(self.0)
4481
}
4582
}
46-
47-
#[cfg(test)]
48-
pub mod tests {
49-
use expect_test::Expect;
50-
use futures::TryStreamExt;
51-
use itertools::Itertools;
52-
53-
use crate::scan::ArrowRecordBatchStream;
54-
55-
/// Snapshot testing to check the resulting record batch.
56-
///
57-
/// - `expected_schema/data`: put `expect![[""]]` as a placeholder,
58-
/// and then run test with `UPDATE_EXPECT=1 cargo test` to automatically update the result,
59-
/// or use rust-analyzer (see [video](https://github.com/rust-analyzer/expect-test)).
60-
/// Check the doc of [`expect_test`] for more details.
61-
/// - `ignore_check_columns`: Some columns are not stable, so we can skip them.
62-
/// - `sort_column`: The order of the data might be non-deterministic, so we can sort it by a column.
63-
pub async fn check_record_batches(
64-
batch_stream: ArrowRecordBatchStream,
65-
expected_schema: Expect,
66-
expected_data: Expect,
67-
ignore_check_columns: &[&str],
68-
sort_column: Option<&str>,
69-
) {
70-
let record_batches = batch_stream.try_collect::<Vec<_>>().await.unwrap();
71-
assert!(!record_batches.is_empty(), "Empty record batches");
72-
73-
// Combine record batches using the first batch's schema
74-
let first_batch = record_batches.first().unwrap();
75-
let record_batch =
76-
arrow_select::concat::concat_batches(&first_batch.schema(), &record_batches).unwrap();
77-
78-
let mut columns = record_batch.columns().to_vec();
79-
if let Some(sort_column) = sort_column {
80-
let column = record_batch.column_by_name(sort_column).unwrap();
81-
let indices = arrow_ord::sort::sort_to_indices(column, None, None).unwrap();
82-
columns = columns
83-
.iter()
84-
.map(|column| arrow_select::take::take(column.as_ref(), &indices, None).unwrap())
85-
.collect_vec();
86-
}
87-
88-
expected_schema.assert_eq(&format!(
89-
"{}",
90-
record_batch.schema().fields().iter().format(",\n")
91-
));
92-
expected_data.assert_eq(&format!(
93-
"{}",
94-
record_batch
95-
.schema()
96-
.fields()
97-
.iter()
98-
.zip_eq(columns)
99-
.map(|(field, column)| {
100-
if ignore_check_columns.contains(&field.name().as_str()) {
101-
format!("{}: (skipped)", field.name())
102-
} else {
103-
format!("{}: {:?}", field.name(), column)
104-
}
105-
})
106-
.format(",\n")
107-
));
108-
}
109-
}

crates/iceberg/src/inspect/snapshots.rs

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -94,14 +94,17 @@ impl<'a> SnapshotsTable<'a> {
9494
summary.append(true)?;
9595
}
9696

97-
let batch = RecordBatch::try_new(Arc::new(self.schema()), vec![
98-
Arc::new(committed_at.finish()),
99-
Arc::new(snapshot_id.finish()),
100-
Arc::new(parent_id.finish()),
101-
Arc::new(operation.finish()),
102-
Arc::new(manifest_list.finish()),
103-
Arc::new(summary.finish()),
104-
])?;
97+
let batch = RecordBatch::try_new(
98+
Arc::new(self.schema()),
99+
vec![
100+
Arc::new(committed_at.finish()),
101+
Arc::new(snapshot_id.finish()),
102+
Arc::new(parent_id.finish()),
103+
Arc::new(operation.finish()),
104+
Arc::new(manifest_list.finish()),
105+
Arc::new(summary.finish()),
106+
],
107+
)?;
105108

106109
Ok(stream::iter(vec![Ok(batch)]).boxed())
107110
}
@@ -110,9 +113,9 @@ impl<'a> SnapshotsTable<'a> {
110113
#[cfg(test)]
111114
mod tests {
112115
use expect_test::expect;
116+
use futures::TryStreamExt;
113117

114-
use crate::inspect::metadata_table::tests::check_record_batches;
115-
use crate::scan::tests::TableTestFixture;
118+
use crate::{scan::tests::TableTestFixture, test_utils::check_record_batches};
116119

117120
#[tokio::test]
118121
async fn test_snapshots_table() {
@@ -121,7 +124,7 @@ mod tests {
121124
let batch_stream = table.inspect().snapshots().scan().await.unwrap();
122125

123126
check_record_batches(
124-
batch_stream,
127+
batch_stream.try_collect::<Vec<_>>().await.unwrap(),
125128
expect![[r#"
126129
Field { name: "committed_at", data_type: Timestamp(Millisecond, Some("+00:00")), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
127130
Field { name: "snapshot_id", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
@@ -184,6 +187,6 @@ mod tests {
184187
]"#]],
185188
&["manifest_list"],
186189
Some("committed_at"),
187-
).await;
190+
);
188191
}
189192
}

crates/iceberg/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ pub mod transform;
8383
mod runtime;
8484

8585
pub mod arrow;
86+
pub mod test_utils;
8687
mod utils;
8788
pub mod writer;
8889

crates/iceberg/src/test_utils.rs

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! Test utilities.
19+
//! This module is pub just for internal testing.
20+
//! It is subject to change and is not intended to be used by external users.
21+
22+
use arrow_array::RecordBatch;
23+
use expect_test::Expect;
24+
use itertools::Itertools;
25+
26+
/// Snapshot testing to check the resulting record batch.
27+
///
28+
/// - `expected_schema/data`: put `expect![[""]]` as a placeholder,
29+
/// and then run test with `UPDATE_EXPECT=1 cargo test` to automatically update the result,
30+
/// or use rust-analyzer (see [video](https://github.com/rust-analyzer/expect-test)).
31+
/// Check the doc of [`expect_test`] for more details.
32+
/// - `ignore_check_columns`: Some columns are not stable, so we can skip them.
33+
/// - `sort_column`: The order of the data might be non-deterministic, so we can sort it by a column.
34+
pub fn check_record_batches(
35+
record_batches: Vec<RecordBatch>,
36+
expected_schema: Expect,
37+
expected_data: Expect,
38+
ignore_check_columns: &[&str],
39+
sort_column: Option<&str>,
40+
) {
41+
assert!(!record_batches.is_empty(), "Empty record batches");
42+
43+
// Combine record batches using the first batch's schema
44+
let first_batch = record_batches.first().unwrap();
45+
let record_batch =
46+
arrow_select::concat::concat_batches(&first_batch.schema(), &record_batches).unwrap();
47+
48+
let mut columns = record_batch.columns().to_vec();
49+
if let Some(sort_column) = sort_column {
50+
let column = record_batch.column_by_name(sort_column).unwrap();
51+
let indices = arrow_ord::sort::sort_to_indices(column, None, None).unwrap();
52+
columns = columns
53+
.iter()
54+
.map(|column| arrow_select::take::take(column.as_ref(), &indices, None).unwrap())
55+
.collect_vec();
56+
}
57+
58+
expected_schema.assert_eq(&format!(
59+
"{}",
60+
record_batch.schema().fields().iter().format(",\n")
61+
));
62+
expected_data.assert_eq(&format!(
63+
"{}",
64+
record_batch
65+
.schema()
66+
.fields()
67+
.iter()
68+
.zip_eq(columns)
69+
.map(|(field, column)| {
70+
if ignore_check_columns.contains(&field.name().as_str()) {
71+
format!("{}: (skipped)", field.name())
72+
} else {
73+
format!("{}: {:?}", field.name(), column)
74+
}
75+
})
76+
.format(",\n")
77+
));
78+
}

crates/integrations/datafusion/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,5 +37,6 @@ iceberg = { workspace = true }
3737
tokio = { workspace = true }
3838

3939
[dev-dependencies]
40+
expect-test = { workspace = true }
4041
iceberg-catalog-memory = { workspace = true }
4142
tempfile = { workspace = true }

0 commit comments

Comments
 (0)