Skip to content

fix: coerce int96 resolution inside of list, struct, and map types #16058

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 28 commits into from
May 20, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
cb05d64
Add test generated from schema in Comet.
mbutrovich May 12, 2025
2cd5942
Checkpoint DFS.
mbutrovich May 14, 2025
a9cc08e
Checkpoint with working transformation.
mbutrovich May 14, 2025
6eecac4
fmt, clippy fixes.
mbutrovich May 14, 2025
b33a95a
Remove maximum stack depth.
mbutrovich May 14, 2025
221627b
More testing.
mbutrovich May 14, 2025
aad4ce6
Improve tests.
mbutrovich May 15, 2025
9129c55
Improve docs.
mbutrovich May 15, 2025
6372f1f
Use a smaller HashSet instead of HashMap with every field in it. More…
mbutrovich May 15, 2025
ea38af5
Use a smaller HashSet instead of HashMap with every field in it. More…
mbutrovich May 15, 2025
3005a15
More docs.
mbutrovich May 15, 2025
c50e737
More docs.
mbutrovich May 15, 2025
034a776
Fix typo.
mbutrovich May 15, 2025
2b9d32a
Merge branch 'main' into int96_again_again
mbutrovich May 15, 2025
1f96786
Refactor match with nested if lets to make it more readable.
mbutrovich May 15, 2025
957ff63
Address some PR feedback.
mbutrovich May 15, 2025
0e272f6
Rename variables in struct processing to address PR feedback. Do List…
mbutrovich May 15, 2025
5fbe458
Rename variables in list processing to address PR feedback.
mbutrovich May 15, 2025
1ddb8c1
Update docs.
mbutrovich May 15, 2025
10d378f
Simplify list parquet path generation.
mbutrovich May 15, 2025
247866d
Map support.
mbutrovich May 15, 2025
74019a5
Remove old TODO.
mbutrovich May 15, 2025
9baad44
Merge branch 'main' into int96_again_again
mbutrovich May 15, 2025
591440a
Merge branch 'main' into int96_again_again
mbutrovich May 16, 2025
76777af
Reduce redundant docs by referring to docs above.
mbutrovich May 16, 2025
e805858
Reduce redundant docs by referring to docs above.
mbutrovich May 16, 2025
08ad98b
Add parquet file generated from CometFuzzTestSuite ParquetGenerator (…
mbutrovich May 16, 2025
be54b6a
Fix clippy.
mbutrovich May 16, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 119 additions & 1 deletion datafusion/core/src/datasource/physical_plan/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ mod tests {
use arrow::datatypes::{DataType, Field, Fields, Schema, SchemaBuilder};
use arrow::record_batch::RecordBatch;
use arrow::util::pretty::pretty_format_batches;
use arrow_schema::SchemaRef;
use arrow_schema::{SchemaRef, TimeUnit};
use bytes::{BufMut, BytesMut};
use datafusion_common::config::TableParquetOptions;
use datafusion_common::test_util::{batches_to_sort_string, batches_to_string};
Expand Down Expand Up @@ -1229,6 +1229,124 @@ mod tests {
Ok(())
}

#[tokio::test]
async fn parquet_exec_with_int96_nested() -> Result<()> {
// This test ensures that we maintain compatibility with coercing int96 to the desired
// resolution when they're within a nested type (e.g., struct, map, list). This file
// originates from a modified CometFuzzTestSuite ParquetGenerator to generate combinations
// of primitive and complex columns using int96. Other tests cover reading the data
// correctly with this coercion. Here we're only checking the coerced schema is correct.
let testdata = "../../datafusion/core/tests/data";
let filename = "int96_nested.parquet";
let session_ctx = SessionContext::new();
let state = session_ctx.state();
let task_ctx = state.task_ctx();

let parquet_exec = scan_format(
&state,
&ParquetFormat::default().with_coerce_int96(Some("us".to_string())),
None,
testdata,
filename,
None,
None,
)
.await
.unwrap();
assert_eq!(parquet_exec.output_partitioning().partition_count(), 1);

let mut results = parquet_exec.execute(0, task_ctx.clone())?;
let batch = results.next().await.unwrap()?;

let expected_schema = Arc::new(Schema::new(vec![
Field::new("c0", DataType::Timestamp(TimeUnit::Microsecond, None), true),
Field::new_struct(
"c1",
vec![Field::new(
"c0",
DataType::Timestamp(TimeUnit::Microsecond, None),
true,
)],
true,
),
Field::new_struct(
"c2",
vec![Field::new_list(
"c0",
Field::new(
"element",
DataType::Timestamp(TimeUnit::Microsecond, None),
true,
),
true,
)],
true,
),
Field::new_map(
"c3",
"key_value",
Field::new(
"key",
DataType::Timestamp(TimeUnit::Microsecond, None),
false,
),
Field::new(
"value",
DataType::Timestamp(TimeUnit::Microsecond, None),
true,
),
false,
true,
),
Field::new_list(
"c4",
Field::new(
"element",
DataType::Timestamp(TimeUnit::Microsecond, None),
true,
),
true,
),
Field::new_list(
"c5",
Field::new_struct(
"element",
vec![Field::new(
"c0",
DataType::Timestamp(TimeUnit::Microsecond, None),
true,
)],
true,
),
true,
),
Field::new_list(
"c6",
Field::new_map(
"element",
"key_value",
Field::new(
"key",
DataType::Timestamp(TimeUnit::Microsecond, None),
false,
),
Field::new(
"value",
DataType::Timestamp(TimeUnit::Microsecond, None),
true,
),
false,
true,
),
true,
),
]));

assert_eq!(batch.schema(), expected_schema);

Ok(())
}

#[tokio::test]
async fn parquet_exec_with_range() -> Result<()> {
fn file_range(meta: &ObjectMeta, start: i64, end: i64) -> PartitionedFile {
Expand Down
Binary file added datafusion/core/tests/data/int96_nested.parquet
Binary file not shown.
Loading