Commit f830d17
feat(datafusion): support metadata tables for DataFusion
Signed-off-by: xxchan <[email protected]>
1 parent: 314af4c
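For context: metadata tables expose a table's internal metadata (its snapshots, manifests, and so on) as ordinary queryable relations. A hypothetical sketch of what this commit enables through DataFusion SQL; the `IcebergCatalogProvider` registration step and the Spark-style `$` table-name suffix are assumptions, not shown in this diff:

```rust
use datafusion::prelude::SessionContext;

// Hypothetical sketch: query an Iceberg metadata table through DataFusion SQL.
// Assumes an Iceberg catalog was wrapped and registered on the context (e.g.
// via iceberg_datafusion::IcebergCatalogProvider), and that metadata tables
// are addressed with the Spark-style `table$snapshots` naming convention.
async fn show_snapshots(ctx: &SessionContext) -> datafusion::error::Result<()> {
    ctx.sql(r#"SELECT snapshot_id, committed_at FROM catalog.ns."my_table$snapshots""#)
        .await?
        .show()
        .await
}
```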

15 files changed: +522 −87 lines

Cargo.lock

+28 −4 (generated file; diff not rendered)

Cargo.toml

+1
```diff
@@ -95,6 +95,7 @@ serde_derive = "1.0.204"
 serde_json = "1.0.138"
 serde_repr = "0.1.16"
 serde_with = "3.4"
+strum = "0.27"
 tempfile = "3.18"
 thrift = "0.17.0"
 tokio = { version = "1.36", default-features = false }
```

crates/iceberg/Cargo.toml

+2 −1
```diff
@@ -59,6 +59,7 @@ bitvec = { workspace = true }
 bytes = { workspace = true }
 chrono = { workspace = true }
 derive_builder = { workspace = true }
+expect-test = { workspace = true }
 fnv = { workspace = true }
 futures = { workspace = true }
 itertools = { workspace = true }
@@ -80,6 +81,7 @@ serde_derive = { workspace = true }
 serde_json = { workspace = true }
 serde_repr = { workspace = true }
 serde_with = { workspace = true }
+strum = { workspace = true, features = ["derive"] }
 thrift = { workspace = true }
 tokio = { workspace = true, optional = true, features = ["sync"] }
 typed-builder = { workspace = true }
@@ -89,7 +91,6 @@ zstd = { workspace = true }

 [dev-dependencies]
 ctor = { workspace = true }
-expect-test = { workspace = true }
 iceberg-catalog-memory = { workspace = true }
 iceberg_test_utils = { path = "../test_utils", features = ["tests"] }
 pretty_assertions = { workspace = true }
```
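Note: `expect-test` moves from `[dev-dependencies]` to regular dependencies because the snapshot-testing helper now lives in the public `test_utils` module (added below), which is compiled outside `#[cfg(test)]`.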

crates/iceberg/src/inspect/manifests.rs

+4 −3
```diff
@@ -258,9 +258,10 @@ impl<'a> ManifestsTable<'a> {
 #[cfg(test)]
 mod tests {
     use expect_test::expect;
+    use futures::TryStreamExt;

-    use crate::inspect::metadata_table::tests::check_record_batches;
     use crate::scan::tests::TableTestFixture;
+    use crate::test_utils::check_record_batches;

     #[tokio::test]
     async fn test_manifests_table() {
@@ -270,7 +271,7 @@ mod tests {
         let record_batch = fixture.table.inspect().manifests().scan().await.unwrap();

         check_record_batches(
-            record_batch,
+            record_batch.try_collect::<Vec<_>>().await.unwrap(),
             expect![[r#"
                 Field { name: "content", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "14"} },
                 Field { name: "path", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1"} },
@@ -355,6 +356,6 @@ mod tests {
             ]"#]],
             &["path", "length"],
             Some("path"),
-        ).await;
+        );
    }
 }
```
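`check_record_batches` is now a plain synchronous function taking `Vec<RecordBatch>`, so call sites drain the scan stream first with `futures::TryStreamExt::try_collect`. A minimal self-contained sketch of that pattern, using a simple fallible stream as a stand-in for `ArrowRecordBatchStream`:

```rust
use futures::stream;
use futures::TryStreamExt;

#[tokio::main]
async fn main() {
    // A fallible stream, standing in for ArrowRecordBatchStream.
    let batches = stream::iter(vec![Ok::<i32, String>(1), Ok(2), Ok(3)]);
    // try_collect drains the stream into a Vec, short-circuiting on the first Err.
    let collected: Vec<i32> = batches.try_collect().await.unwrap();
    assert_eq!(collected, vec![1, 2, 3]);
}
```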

crates/iceberg/src/inspect/metadata_table.rs

+37 −64
```diff
@@ -27,6 +27,43 @@ use crate::table::Table;
 #[derive(Debug)]
 pub struct MetadataTable<'a>(&'a Table);

+/// Metadata table type.
+#[derive(Debug, Clone, strum::EnumIter)]
+pub enum MetadataTableType {
+    /// [`SnapshotsTable`]
+    Snapshots,
+    /// [`ManifestsTable`]
+    Manifests,
+}
+
+impl MetadataTableType {
+    /// Returns the string representation of the metadata table type.
+    pub fn as_str(&self) -> &str {
+        match self {
+            MetadataTableType::Snapshots => "snapshots",
+            MetadataTableType::Manifests => "manifests",
+        }
+    }
+
+    /// Returns all the metadata table types.
+    pub fn all_types() -> impl Iterator<Item = Self> {
+        use strum::IntoEnumIterator;
+        Self::iter()
+    }
+}
+
+impl TryFrom<&str> for MetadataTableType {
+    type Error = String;
+
+    fn try_from(value: &str) -> std::result::Result<Self, String> {
+        match value {
+            "snapshots" => Ok(Self::Snapshots),
+            "manifests" => Ok(Self::Manifests),
+            _ => Err(format!("invalid metadata table type: {value}")),
+        }
+    }
+}
+
 impl<'a> MetadataTable<'a> {
     /// Creates a new metadata scan.
     pub fn new(table: &'a Table) -> Self {
@@ -43,67 +80,3 @@ impl<'a> MetadataTable<'a> {
         ManifestsTable::new(self.0)
     }
 }
-
-#[cfg(test)]
-pub mod tests {
-    use expect_test::Expect;
-    use futures::TryStreamExt;
-    use itertools::Itertools;
-
-    use crate::scan::ArrowRecordBatchStream;
-
-    /// Snapshot testing to check the resulting record batch.
-    ///
-    /// - `expected_schema/data`: put `expect![[""]]` as a placeholder,
-    ///   and then run test with `UPDATE_EXPECT=1 cargo test` to automatically update the result,
-    ///   or use rust-analyzer (see [video](https://github.com/rust-analyzer/expect-test)).
-    ///   Check the doc of [`expect_test`] for more details.
-    /// - `ignore_check_columns`: Some columns are not stable, so we can skip them.
-    /// - `sort_column`: The order of the data might be non-deterministic, so we can sort it by a column.
-    pub async fn check_record_batches(
-        batch_stream: ArrowRecordBatchStream,
-        expected_schema: Expect,
-        expected_data: Expect,
-        ignore_check_columns: &[&str],
-        sort_column: Option<&str>,
-    ) {
-        let record_batches = batch_stream.try_collect::<Vec<_>>().await.unwrap();
-        assert!(!record_batches.is_empty(), "Empty record batches");
-
-        // Combine record batches using the first batch's schema
-        let first_batch = record_batches.first().unwrap();
-        let record_batch =
-            arrow_select::concat::concat_batches(&first_batch.schema(), &record_batches).unwrap();
-
-        let mut columns = record_batch.columns().to_vec();
-        if let Some(sort_column) = sort_column {
-            let column = record_batch.column_by_name(sort_column).unwrap();
-            let indices = arrow_ord::sort::sort_to_indices(column, None, None).unwrap();
-            columns = columns
-                .iter()
-                .map(|column| arrow_select::take::take(column.as_ref(), &indices, None).unwrap())
-                .collect_vec();
-        }
-
-        expected_schema.assert_eq(&format!(
-            "{}",
-            record_batch.schema().fields().iter().format(",\n")
-        ));
-        expected_data.assert_eq(&format!(
-            "{}",
-            record_batch
-                .schema()
-                .fields()
-                .iter()
-                .zip_eq(columns)
-                .map(|(field, column)| {
-                    if ignore_check_columns.contains(&field.name().as_str()) {
-                        format!("{}: (skipped)", field.name())
-                    } else {
-                        format!("{}: {:?}", field.name(), column)
-                    }
-                })
-                .format(",\n")
-        ));
-    }
-}
```
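A quick sketch of how the new enum is meant to be used for name resolution; the `iceberg::inspect` re-export path is an assumption:

```rust
// Sketch: round-trip metadata table names and enumerate the supported set.
use iceberg::inspect::MetadataTableType; // assumed re-export path

fn main() {
    // String -> enum, with a descriptive error for unknown names.
    let t = MetadataTableType::try_from("snapshots").unwrap();
    assert_eq!(t.as_str(), "snapshots");
    assert!(MetadataTableType::try_from("partitions").is_err());

    // all_types() iterates every variant via strum's derived EnumIter.
    let names: Vec<String> = MetadataTableType::all_types()
        .map(|t| t.as_str().to_string())
        .collect();
    assert_eq!(names, ["snapshots", "manifests"]);
}
```

This gives the DataFusion integration a single source of truth for which metadata tables exist and how they are spelled, instead of scattering string matches across the schema provider.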

crates/iceberg/src/inspect/snapshots.rs

+4 −3
```diff
@@ -110,9 +110,10 @@ impl<'a> SnapshotsTable<'a> {
 #[cfg(test)]
 mod tests {
     use expect_test::expect;
+    use futures::TryStreamExt;

-    use crate::inspect::metadata_table::tests::check_record_batches;
     use crate::scan::tests::TableTestFixture;
+    use crate::test_utils::check_record_batches;

     #[tokio::test]
     async fn test_snapshots_table() {
@@ -121,7 +122,7 @@ mod tests {
         let batch_stream = table.inspect().snapshots().scan().await.unwrap();

         check_record_batches(
-            batch_stream,
+            batch_stream.try_collect::<Vec<_>>().await.unwrap(),
             expect![[r#"
                 Field { name: "committed_at", data_type: Timestamp(Millisecond, Some("+00:00")), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
                 Field { name: "snapshot_id", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
@@ -184,6 +185,6 @@ mod tests {
             ]"#]],
             &["manifest_list"],
             Some("committed_at"),
-        ).await;
+        );
    }
 }
```

crates/iceberg/src/lib.rs

+1
```diff
@@ -84,6 +84,7 @@ mod runtime;

 pub mod arrow;
 pub(crate) mod delete_file_index;
+pub mod test_utils;
 mod utils;
 pub mod writer;

```
crates/iceberg/src/test_utils.rs

+78 (new file)
```rust
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Test utilities.
//! This module is pub just for internal testing.
//! It is subject to change and is not intended to be used by external users.

use arrow_array::RecordBatch;
use expect_test::Expect;
use itertools::Itertools;

/// Snapshot testing to check the resulting record batch.
///
/// - `expected_schema/data`: put `expect![[""]]` as a placeholder,
///   and then run test with `UPDATE_EXPECT=1 cargo test` to automatically update the result,
///   or use rust-analyzer (see [video](https://github.com/rust-analyzer/expect-test)).
///   Check the doc of [`expect_test`] for more details.
/// - `ignore_check_columns`: Some columns are not stable, so we can skip them.
/// - `sort_column`: The order of the data might be non-deterministic, so we can sort it by a column.
pub fn check_record_batches(
    record_batches: Vec<RecordBatch>,
    expected_schema: Expect,
    expected_data: Expect,
    ignore_check_columns: &[&str],
    sort_column: Option<&str>,
) {
    assert!(!record_batches.is_empty(), "Empty record batches");

    // Combine record batches using the first batch's schema
    let first_batch = record_batches.first().unwrap();
    let record_batch =
        arrow_select::concat::concat_batches(&first_batch.schema(), &record_batches).unwrap();

    let mut columns = record_batch.columns().to_vec();
    if let Some(sort_column) = sort_column {
        let column = record_batch.column_by_name(sort_column).unwrap();
        let indices = arrow_ord::sort::sort_to_indices(column, None, None).unwrap();
        columns = columns
            .iter()
            .map(|column| arrow_select::take::take(column.as_ref(), &indices, None).unwrap())
            .collect_vec();
    }

    expected_schema.assert_eq(&format!(
        "{}",
        record_batch.schema().fields().iter().format(",\n")
    ));
    expected_data.assert_eq(&format!(
        "{}",
        record_batch
            .schema()
            .fields()
            .iter()
            .zip_eq(columns)
            .map(|(field, column)| {
                if ignore_check_columns.contains(&field.name().as_str()) {
                    format!("{}: (skipped)", field.name())
                } else {
                    format!("{}: {:?}", field.name(), column)
                }
            })
            .format(",\n")
    ));
}
```

crates/integrations/datafusion/Cargo.toml

+1
```diff
@@ -40,6 +40,7 @@ iceberg = { workspace = true }
 tokio = { workspace = true }

 [dev-dependencies]
+expect-test = { workspace = true }
 iceberg-catalog-memory = { workspace = true }
 parquet = { workspace = true }
 tempfile = { workspace = true }
```
