From cb05d64404f2e5212f3e9e3dcda7ed7ee9194773 Mon Sep 17 00:00:00 2001 From: Matt Butrovich Date: Mon, 12 May 2025 16:23:13 -0400 Subject: [PATCH 01/25] Add test generated from schema in Comet. --- .../datasource-parquet/src/file_format.rs | 72 +++++++++++++++++++ 1 file changed, 72 insertions(+) diff --git a/datafusion/datasource-parquet/src/file_format.rs b/datafusion/datasource-parquet/src/file_format.rs index bc8f84b87454..960dddf2c2d7 100644 --- a/datafusion/datasource-parquet/src/file_format.rs +++ b/datafusion/datasource-parquet/src/file_format.rs @@ -1577,3 +1577,75 @@ fn create_max_min_accs( .collect(); (max_values, min_values) } + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use super::*; + + use arrow::datatypes::DataType; + use parquet::schema::parser::parse_message_type; + + #[test] + fn coerce_int96_to_resolution_with_complex_types() { + let spark_schema = " + message spark_schema { + optional int96 c0; + optional group c1 { + optional int96 c0; + } + optional group c2 { + optional group c0 (LIST) { + repeated group list { + optional int96 element; + } + } + } + optional group c3 (LIST) { + repeated group list { + optional int96 element; + } + } + optional group c4 (LIST) { + repeated group list { + optional group element { + optional int96 c0; + } + } + } + } + "; + + let schema = parse_message_type(spark_schema).expect("should parse schema"); + let descr = SchemaDescriptor::new(Arc::new(schema)); + + let arrow_schema = parquet_to_arrow_schema(&descr, None).unwrap(); + + let result = + coerce_int96_to_resolution(&descr, &arrow_schema, &TimeUnit::Microsecond) + .unwrap(); + + result + .flattened_fields() + .iter() + .for_each(|field| match field.data_type() { + DataType::Timestamp(TimeUnit::Microsecond, None) => {} + DataType::Struct(fields) => { + assert_eq!( + fields[0].data_type(), + &DataType::Timestamp(TimeUnit::Microsecond, None) + ) + } + DataType::List(field) => { + assert_eq!( + field.data_type(), + &DataType::Timestamp(TimeUnit::Microsecond, None) + ) + } + _ => { + assert!(false); + } + }) + } +} From 2cd5942692bed33a49da011b8777412eba06187f Mon Sep 17 00:00:00 2001 From: Matt Butrovich Date: Wed, 14 May 2025 14:37:18 -0400 Subject: [PATCH 02/25] Checkpoint DFS. --- .../datasource-parquet/src/file_format.rs | 172 ++++++++++++++---- 1 file changed, 136 insertions(+), 36 deletions(-) diff --git a/datafusion/datasource-parquet/src/file_format.rs b/datafusion/datasource-parquet/src/file_format.rs index 960dddf2c2d7..3a0cd6dd88ed 100644 --- a/datafusion/datasource-parquet/src/file_format.rs +++ b/datafusion/datasource-parquet/src/file_format.rs @@ -18,9 +18,11 @@ //! 
[`ParquetFormat`]: Parquet [`FileFormat`] abstractions use std::any::Any; +use std::cell::RefCell; use std::fmt; use std::fmt::Debug; -use std::ops::Range; +use std::ops::{Deref, Range}; +use std::rc::Rc; use std::sync::Arc; use arrow::array::RecordBatch; @@ -561,12 +563,14 @@ pub fn coerce_int96_to_resolution( let parquet_fields: HashMap<_, _> = parquet_schema .columns() .iter() - .map(|f| { + .enumerate() + .map(|(idx, f)| { let dt = f.physical_type(); if dt.eq(&Type::INT96) { transform = true; } - (f.name(), dt) + println!("{:?}, {:?}", f.path().string(), dt); + ((idx, f.path().string()), dt) }) .collect(); @@ -574,21 +578,129 @@ pub fn coerce_int96_to_resolution( return None; } - let transformed_fields: Vec> = file_schema + type NestedFields = Rc>>; + + let max_level = None; + + let fields = Rc::new(RefCell::new(Vec::with_capacity(file_schema.fields.len()))); + + let normalized_schema = { + let max_level = match max_level.unwrap_or(usize::MAX) { + 0 => usize::MAX, + val => val, + }; + // TODO: Only DFS fields that need it. + let mut stack: Vec<(usize, &FieldRef, NestedFields, Option)> = + file_schema + .fields() + .iter() + .rev() + .map(|f| (0, f, fields.clone(), None)) + .collect(); + + while let Some((depth, field_ref, parent_fields, child_fields)) = stack.pop() { + match field_ref.data_type() { + DataType::Struct(ff) if depth < max_level => { + if let Some(child_fields) = child_fields { + // This is the second time popping off this struct. + assert_eq!(child_fields.borrow().len(), ff.len()); + let updated_field = Field::new_struct( + field_ref.name(), + child_fields.borrow().as_slice(), + field_ref.is_nullable(), + ); + parent_fields.borrow_mut().push(Arc::new(updated_field)); + } else { + let child_fields = + Rc::new(RefCell::new(Vec::with_capacity(ff.len()))); + stack.push(( + depth, + field_ref, + parent_fields, + Some(child_fields.clone()), + )); + // Need to zip these in reverse to maintain original order + for fff in ff.into_iter().rev() { + stack.push((depth + 1, fff, child_fields.clone(), None)); + } + } + } + DataType::List(ff) if depth < max_level => { + if let Some(child_fields) = child_fields { + // This is the second time popping off this list. 
+ assert_eq!(child_fields.borrow().len(), 1); + let updated_field = Field::new_list( + field_ref.name(), + child_fields.borrow()[0].clone(), + field_ref.is_nullable(), + ); + parent_fields.borrow_mut().push(Arc::new(updated_field)); + } else { + let child_fields = Rc::new(RefCell::new(Vec::with_capacity(1))); + stack.push(( + depth, + field_ref, + parent_fields, + Some(child_fields.clone()), + )); + stack.push((depth + 1, ff, child_fields.clone(), None)); + } + } + _ => { + let updated_field = Field::new( + field_ref.name(), + field_ref.data_type().clone(), + field_ref.is_nullable(), + ); + parent_fields.borrow_mut().push(Arc::new(updated_field)); + } + } + } + assert_eq!(fields.borrow().len(), file_schema.fields.len()); + Schema::new(fields.borrow_mut().deref().as_slice()) + }; + + // let normalized_schema = file_schema.normalize(".", None).unwrap(); + println!("normalized fields"); + normalized_schema .fields .iter() - .map(|field| match parquet_fields.get(field.name().as_str()) { - Some(Type::INT96) => { - field_with_new_type(field, DataType::Timestamp(*time_unit, None)) - } - _ => Arc::clone(field), - }) - .collect(); + .for_each(|field| println!("{:?}", field)); - Some(Schema::new_with_metadata( - transformed_fields, - file_schema.metadata.clone(), - )) + println!("file_schema fields"); + file_schema + .fields + .iter() + .for_each(|field| println!("{:?}", field)); + + // file_schema + // .fields + // .iter() + // .zip(normalized_schema.fields.iter()) + // .enumerate() + // .for_each(|(idx, (field, normalized_field))| { + // println!("idx: {:?}", idx); + // println!("field: {:?}", field); + // println!("normalized field: {:?}", normalized_field); + // }); + + // let transformed_fields: Vec> = file_schema + // .fields + // .iter() + // .map(|field| match parquet_fields.get(field.name().as_str()) { + // Some(Type::INT96) => { + // field_with_new_type(field, DataType::Timestamp(*time_unit, None)) + // } + // _ => Arc::clone(field), + // }) + // .collect(); + + // Some(Schema::new_with_metadata( + // transformed_fields, + // file_schema.metadata.clone(), + // )) + + return None; } /// Coerces the file schema if the table schema uses a view type. @@ -1611,6 +1723,7 @@ mod tests { repeated group list { optional group element { optional int96 c0; + optional int96 c1; } } } @@ -1626,26 +1739,13 @@ mod tests { coerce_int96_to_resolution(&descr, &arrow_schema, &TimeUnit::Microsecond) .unwrap(); - result - .flattened_fields() - .iter() - .for_each(|field| match field.data_type() { - DataType::Timestamp(TimeUnit::Microsecond, None) => {} - DataType::Struct(fields) => { - assert_eq!( - fields[0].data_type(), - &DataType::Timestamp(TimeUnit::Microsecond, None) - ) - } - DataType::List(field) => { - assert_eq!( - field.data_type(), - &DataType::Timestamp(TimeUnit::Microsecond, None) - ) - } - _ => { - assert!(false); - } - }) + result.flattened_fields().iter().for_each(|field| { + if field.data_type().is_primitive() { + assert_eq!( + field.data_type(), + &DataType::Timestamp(TimeUnit::Microsecond, None) + ); + } + }) } } From a9cc08eed1edf2d6e51701d7d3560b50776e9c43 Mon Sep 17 00:00:00 2001 From: Matt Butrovich Date: Wed, 14 May 2025 15:18:26 -0400 Subject: [PATCH 03/25] Checkpoint with working transformation. 
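This checkpoint keys the DFS off dotted Parquet column paths: while descending, the stack context accumulates name segments in a Vec<&str>, and at a leaf the segments are concatenated and looked up against the paths reported by the Parquet schema descriptor. A minimal sketch of just that path-key idea (names here are illustrative, not the patch's locals):

    fn main() {
        // Segments are pushed as the DFS descends; note the separator
        // segments, matching how the patch pushes "." and ".list".
        let mut path: Vec<&str> = vec!["c2"];
        path.push(".");
        path.push("c0");
        path.push(".list");
        path.push(".");
        path.push("element");
        // concat() joins with no extra separator, yielding the same dotted
        // form that ColumnDescriptor::path().string() reports.
        assert_eq!(path.concat(), "c2.c0.list.element");
    }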
--- .../datasource-parquet/src/file_format.rs | 121 ++++++++++-------- 1 file changed, 66 insertions(+), 55 deletions(-) diff --git a/datafusion/datasource-parquet/src/file_format.rs b/datafusion/datasource-parquet/src/file_format.rs index 3a0cd6dd88ed..be72178aa56f 100644 --- a/datafusion/datasource-parquet/src/file_format.rs +++ b/datafusion/datasource-parquet/src/file_format.rs @@ -563,14 +563,13 @@ pub fn coerce_int96_to_resolution( let parquet_fields: HashMap<_, _> = parquet_schema .columns() .iter() - .enumerate() - .map(|(idx, f)| { + .map(|f| { let dt = f.physical_type(); if dt.eq(&Type::INT96) { transform = true; } println!("{:?}, {:?}", f.path().string(), dt); - ((idx, f.path().string()), dt) + (f.path().string(), dt) }) .collect(); @@ -584,29 +583,40 @@ pub fn coerce_int96_to_resolution( let fields = Rc::new(RefCell::new(Vec::with_capacity(file_schema.fields.len()))); - let normalized_schema = { + let transformed_schema = { let max_level = match max_level.unwrap_or(usize::MAX) { 0 => usize::MAX, val => val, }; // TODO: Only DFS fields that need it. - let mut stack: Vec<(usize, &FieldRef, NestedFields, Option)> = - file_schema - .fields() - .iter() - .rev() - .map(|f| (0, f, fields.clone(), None)) - .collect(); + let mut stack: Vec<( + usize, + Vec<&str>, + &FieldRef, + NestedFields, + Option, + )> = file_schema + .fields() + .iter() + .rev() + .map(|f| { + let name_vec: Vec<&str> = vec![f.name()]; + (0, name_vec, f, fields.clone(), None) + }) + .collect(); - while let Some((depth, field_ref, parent_fields, child_fields)) = stack.pop() { + while let Some((depth, parquet_path, field_ref, parent_fields, child_fields)) = + stack.pop() + { match field_ref.data_type() { DataType::Struct(ff) if depth < max_level => { if let Some(child_fields) = child_fields { // This is the second time popping off this struct. 
- assert_eq!(child_fields.borrow().len(), ff.len()); + let child_fields = child_fields.borrow(); + assert_eq!(child_fields.len(), ff.len()); let updated_field = Field::new_struct( field_ref.name(), - child_fields.borrow().as_slice(), + child_fields.as_slice(), field_ref.is_nullable(), ); parent_fields.borrow_mut().push(Arc::new(updated_field)); @@ -615,13 +625,23 @@ pub fn coerce_int96_to_resolution( Rc::new(RefCell::new(Vec::with_capacity(ff.len()))); stack.push(( depth, + parquet_path.clone(), field_ref, parent_fields, Some(child_fields.clone()), )); // Need to zip these in reverse to maintain original order for fff in ff.into_iter().rev() { - stack.push((depth + 1, fff, child_fields.clone(), None)); + let mut parquet_path = parquet_path.clone(); + parquet_path.push("."); + parquet_path.push(fff.name()); + stack.push(( + depth + 1, + parquet_path, + fff, + child_fields.clone(), + None, + )); } } } @@ -637,32 +657,50 @@ pub fn coerce_int96_to_resolution( parent_fields.borrow_mut().push(Arc::new(updated_field)); } else { let child_fields = Rc::new(RefCell::new(Vec::with_capacity(1))); + let mut parquet_path = parquet_path.clone(); + parquet_path.push(".list"); stack.push(( depth, + parquet_path.clone(), field_ref, parent_fields, Some(child_fields.clone()), )); - stack.push((depth + 1, ff, child_fields.clone(), None)); + let mut parquet_path = parquet_path.clone(); + parquet_path.push("."); + parquet_path.push(ff.name()); + stack.push(( + depth + 1, + parquet_path.clone(), + ff, + child_fields.clone(), + None, + )); } } _ => { - let updated_field = Field::new( - field_ref.name(), - field_ref.data_type().clone(), - field_ref.is_nullable(), - ); - parent_fields.borrow_mut().push(Arc::new(updated_field)); + let parquet_path = parquet_path.concat(); + if let Some(&Type::INT96) = parquet_fields.get(parquet_path.as_str()) + { + parent_fields.borrow_mut().push(field_with_new_type( + field_ref, + DataType::Timestamp(*time_unit, None), + )); + } else { + parent_fields.borrow_mut().push(field_ref.clone()); + } } } } assert_eq!(fields.borrow().len(), file_schema.fields.len()); - Schema::new(fields.borrow_mut().deref().as_slice()) + Schema::new_with_metadata( + fields.borrow_mut().clone(), + file_schema.metadata.clone(), + ) }; - // let normalized_schema = file_schema.normalize(".", None).unwrap(); - println!("normalized fields"); - normalized_schema + println!("transformed_schema fields"); + transformed_schema .fields .iter() .for_each(|field| println!("{:?}", field)); @@ -672,35 +710,8 @@ pub fn coerce_int96_to_resolution( .fields .iter() .for_each(|field| println!("{:?}", field)); - - // file_schema - // .fields - // .iter() - // .zip(normalized_schema.fields.iter()) - // .enumerate() - // .for_each(|(idx, (field, normalized_field))| { - // println!("idx: {:?}", idx); - // println!("field: {:?}", field); - // println!("normalized field: {:?}", normalized_field); - // }); - - // let transformed_fields: Vec> = file_schema - // .fields - // .iter() - // .map(|field| match parquet_fields.get(field.name().as_str()) { - // Some(Type::INT96) => { - // field_with_new_type(field, DataType::Timestamp(*time_unit, None)) - // } - // _ => Arc::clone(field), - // }) - // .collect(); - - // Some(Schema::new_with_metadata( - // transformed_fields, - // file_schema.metadata.clone(), - // )) - - return None; + + Some(transformed_schema) } /// Coerces the file schema if the table schema uses a view type. 
From 6eecac4528dc619595a329920229648615ba436f Mon Sep 17 00:00:00 2001 From: Matt Butrovich Date: Wed, 14 May 2025 15:35:20 -0400 Subject: [PATCH 04/25] fmt, clippy fixes. --- .../datasource-parquet/src/file_format.rs | 44 +++++++++---------- 1 file changed, 22 insertions(+), 22 deletions(-) diff --git a/datafusion/datasource-parquet/src/file_format.rs b/datafusion/datasource-parquet/src/file_format.rs index be72178aa56f..9ca783f4e8c5 100644 --- a/datafusion/datasource-parquet/src/file_format.rs +++ b/datafusion/datasource-parquet/src/file_format.rs @@ -21,7 +21,7 @@ use std::any::Any; use std::cell::RefCell; use std::fmt; use std::fmt::Debug; -use std::ops::{Deref, Range}; +use std::ops::Range; use std::rc::Rc; use std::sync::Arc; @@ -578,30 +578,30 @@ pub fn coerce_int96_to_resolution( } type NestedFields = Rc>>; - - let max_level = None; + type StackContext<'a> = ( + usize, + Vec<&'a str>, + &'a FieldRef, + NestedFields, + Option, + ); let fields = Rc::new(RefCell::new(Vec::with_capacity(file_schema.fields.len()))); let transformed_schema = { - let max_level = match max_level.unwrap_or(usize::MAX) { - 0 => usize::MAX, - val => val, - }; + // let max_level = match max_level.unwrap_or(usize::MAX) { + // 0 => usize::MAX, + // val => val, + // }; + let max_level = usize::MAX; // TODO: Only DFS fields that need it. - let mut stack: Vec<( - usize, - Vec<&str>, - &FieldRef, - NestedFields, - Option, - )> = file_schema + let mut stack: Vec = file_schema .fields() .iter() .rev() .map(|f| { let name_vec: Vec<&str> = vec![f.name()]; - (0, name_vec, f, fields.clone(), None) + (0, name_vec, f, Rc::clone(&fields), None) }) .collect(); @@ -628,7 +628,7 @@ pub fn coerce_int96_to_resolution( parquet_path.clone(), field_ref, parent_fields, - Some(child_fields.clone()), + Some(Rc::clone(&child_fields)), )); // Need to zip these in reverse to maintain original order for fff in ff.into_iter().rev() { @@ -639,7 +639,7 @@ pub fn coerce_int96_to_resolution( depth + 1, parquet_path, fff, - child_fields.clone(), + Rc::clone(&child_fields), None, )); } @@ -651,7 +651,7 @@ pub fn coerce_int96_to_resolution( assert_eq!(child_fields.borrow().len(), 1); let updated_field = Field::new_list( field_ref.name(), - child_fields.borrow()[0].clone(), + Arc::clone(&child_fields.borrow()[0]), field_ref.is_nullable(), ); parent_fields.borrow_mut().push(Arc::new(updated_field)); @@ -664,7 +664,7 @@ pub fn coerce_int96_to_resolution( parquet_path.clone(), field_ref, parent_fields, - Some(child_fields.clone()), + Some(Rc::clone(&child_fields)), )); let mut parquet_path = parquet_path.clone(); parquet_path.push("."); @@ -673,7 +673,7 @@ pub fn coerce_int96_to_resolution( depth + 1, parquet_path.clone(), ff, - child_fields.clone(), + Rc::clone(&child_fields), None, )); } @@ -687,7 +687,7 @@ pub fn coerce_int96_to_resolution( DataType::Timestamp(*time_unit, None), )); } else { - parent_fields.borrow_mut().push(field_ref.clone()); + parent_fields.borrow_mut().push(Arc::clone(field_ref)); } } } @@ -710,7 +710,7 @@ pub fn coerce_int96_to_resolution( .fields .iter() .for_each(|field| println!("{:?}", field)); - + Some(transformed_schema) } From b33a95a8ee3ac61a5d503c657c1b3bcf09351051 Mon Sep 17 00:00:00 2001 From: Matt Butrovich Date: Wed, 14 May 2025 17:13:15 -0400 Subject: [PATCH 05/25] Remove maximum stack depth. 
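The depth counter was a holdover from the max_level experiment in the earlier checkpoints. With an explicit work stack the traversal state lives on the heap rather than the call stack, so arbitrarily deep schemas cannot overflow it and the guard adds nothing. A minimal, self-contained sketch of the same control flow (hypothetical Node type, not the patch's):

    enum Node {
        Leaf(i32),
        Branch(Vec<Node>),
    }

    fn sum(root: &Node) -> i32 {
        let mut total = 0;
        let mut stack = vec![root];
        // No depth tracking: pop until the work list is exhausted.
        while let Some(node) = stack.pop() {
            match node {
                Node::Leaf(v) => total += v,
                Node::Branch(children) => stack.extend(children.iter()),
            }
        }
        total
    }

    fn main() {
        let tree = Node::Branch(vec![Node::Leaf(1), Node::Branch(vec![Node::Leaf(2)])]);
        assert_eq!(sum(&tree), 3);
    }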
--- .../datasource-parquet/src/file_format.rs | 35 +++---------------- 1 file changed, 4 insertions(+), 31 deletions(-) diff --git a/datafusion/datasource-parquet/src/file_format.rs b/datafusion/datasource-parquet/src/file_format.rs index 9ca783f4e8c5..d49e1c7512d2 100644 --- a/datafusion/datasource-parquet/src/file_format.rs +++ b/datafusion/datasource-parquet/src/file_format.rs @@ -568,7 +568,6 @@ pub fn coerce_int96_to_resolution( if dt.eq(&Type::INT96) { transform = true; } - println!("{:?}, {:?}", f.path().string(), dt); (f.path().string(), dt) }) .collect(); @@ -579,7 +578,6 @@ pub fn coerce_int96_to_resolution( type NestedFields = Rc>>; type StackContext<'a> = ( - usize, Vec<&'a str>, &'a FieldRef, NestedFields, @@ -589,27 +587,18 @@ pub fn coerce_int96_to_resolution( let fields = Rc::new(RefCell::new(Vec::with_capacity(file_schema.fields.len()))); let transformed_schema = { - // let max_level = match max_level.unwrap_or(usize::MAX) { - // 0 => usize::MAX, - // val => val, - // }; - let max_level = usize::MAX; - // TODO: Only DFS fields that need it. let mut stack: Vec = file_schema .fields() .iter() .rev() - .map(|f| { - let name_vec: Vec<&str> = vec![f.name()]; - (0, name_vec, f, Rc::clone(&fields), None) - }) + .map(|f| (vec![f.name().as_str()], f, Rc::clone(&fields), None)) .collect(); - while let Some((depth, parquet_path, field_ref, parent_fields, child_fields)) = + while let Some((parquet_path, field_ref, parent_fields, child_fields)) = stack.pop() { match field_ref.data_type() { - DataType::Struct(ff) if depth < max_level => { + DataType::Struct(ff) => { if let Some(child_fields) = child_fields { // This is the second time popping off this struct. let child_fields = child_fields.borrow(); @@ -624,7 +613,6 @@ pub fn coerce_int96_to_resolution( let child_fields = Rc::new(RefCell::new(Vec::with_capacity(ff.len()))); stack.push(( - depth, parquet_path.clone(), field_ref, parent_fields, @@ -636,7 +624,6 @@ pub fn coerce_int96_to_resolution( parquet_path.push("."); parquet_path.push(fff.name()); stack.push(( - depth + 1, parquet_path, fff, Rc::clone(&child_fields), @@ -645,7 +632,7 @@ pub fn coerce_int96_to_resolution( } } } - DataType::List(ff) if depth < max_level => { + DataType::List(ff) => { if let Some(child_fields) = child_fields { // This is the second time popping off this list. assert_eq!(child_fields.borrow().len(), 1); @@ -660,7 +647,6 @@ pub fn coerce_int96_to_resolution( let mut parquet_path = parquet_path.clone(); parquet_path.push(".list"); stack.push(( - depth, parquet_path.clone(), field_ref, parent_fields, @@ -670,7 +656,6 @@ pub fn coerce_int96_to_resolution( parquet_path.push("."); parquet_path.push(ff.name()); stack.push(( - depth + 1, parquet_path.clone(), ff, Rc::clone(&child_fields), @@ -699,18 +684,6 @@ pub fn coerce_int96_to_resolution( ) }; - println!("transformed_schema fields"); - transformed_schema - .fields - .iter() - .for_each(|field| println!("{:?}", field)); - - println!("file_schema fields"); - file_schema - .fields - .iter() - .for_each(|field| println!("{:?}", field)); - Some(transformed_schema) } From 221627b65bc890f68154f0daa6e7c0025ae3e9eb Mon Sep 17 00:00:00 2001 From: Matt Butrovich Date: Wed, 14 May 2025 17:50:30 -0400 Subject: [PATCH 06/25] More testing. 
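The premise behind this test: the Parquet-to-Arrow conversion maps a bare INT96 column to Timestamp(Nanosecond, None), while an annotated INT64 timestamp keeps its declared unit and timezone, so only the former may be coerced. A small standalone check of that premise, using the same APIs as the test (a sketch, not part of the patch):

    use std::sync::Arc;

    use arrow::datatypes::{DataType, TimeUnit};
    use parquet::arrow::parquet_to_arrow_schema;
    use parquet::schema::parser::parse_message_type;
    use parquet::schema::types::SchemaDescriptor;

    fn main() {
        let message = "
        message spark_schema {
            optional int96 c0;
            optional int64 c1 (TIMESTAMP(MICROS,true));
        }
        ";
        let schema = parse_message_type(message).expect("should parse schema");
        let descr = SchemaDescriptor::new(Arc::new(schema));
        let arrow_schema = parquet_to_arrow_schema(&descr, None).unwrap();
        // INT96 comes through as nanoseconds with no timezone...
        assert_eq!(
            arrow_schema.field(0).data_type(),
            &DataType::Timestamp(TimeUnit::Nanosecond, None)
        );
        // ...while the annotated column keeps micros plus UTC.
        assert_eq!(
            arrow_schema.field(1).data_type(),
            &DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into()))
        );
    }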
--- .../datasource-parquet/src/file_format.rs | 65 +++++++++++++++++-- 1 file changed, 58 insertions(+), 7 deletions(-) diff --git a/datafusion/datasource-parquet/src/file_format.rs b/datafusion/datasource-parquet/src/file_format.rs index d49e1c7512d2..088187f5cb8d 100644 --- a/datafusion/datasource-parquet/src/file_format.rs +++ b/datafusion/datasource-parquet/src/file_format.rs @@ -663,18 +663,18 @@ pub fn coerce_int96_to_resolution( )); } } - _ => { - let parquet_path = parquet_path.concat(); - if let Some(&Type::INT96) = parquet_fields.get(parquet_path.as_str()) + _ => match field_ref.data_type() { + DataType::Timestamp(TimeUnit::Nanosecond, None) + if parquet_fields.get(parquet_path.concat().as_str()) + == Some(&Type::INT96) => { parent_fields.borrow_mut().push(field_with_new_type( field_ref, DataType::Timestamp(*time_unit, None), )); - } else { - parent_fields.borrow_mut().push(Arc::clone(field_ref)); } - } + _ => parent_fields.borrow_mut().push(Arc::clone(field_ref)), + }, } } assert_eq!(fields.borrow().len(), file_schema.fields.len()); @@ -1684,7 +1684,58 @@ mod tests { use parquet::schema::parser::parse_message_type; #[test] - fn coerce_int96_to_resolution_with_complex_types() { + fn coerce_int96_to_resolution_with_mixed_timestamps() { + // Unclear if Spark (or other writer) could generate a file with mixed timestamps like this, + // but we want to test the scenario just in case. + let spark_schema = " + message spark_schema { + optional int96 c0; + optional int64 c1 (TIMESTAMP(NANOS,true)); + optional int64 c2 (TIMESTAMP(NANOS,false)); + optional int64 c3 (TIMESTAMP(MILLIS,true)); + optional int64 c4 (TIMESTAMP(MILLIS,false)); + optional int64 c5 (TIMESTAMP(MICROS,true)); + optional int64 c6 (TIMESTAMP(MICROS,false)); + } + "; + + let schema = parse_message_type(spark_schema).expect("should parse schema"); + let descr = SchemaDescriptor::new(Arc::new(schema)); + + let arrow_schema = parquet_to_arrow_schema(&descr, None).unwrap(); + + let result = + coerce_int96_to_resolution(&descr, &arrow_schema, &TimeUnit::Microsecond) + .unwrap(); + + // Only the first field (c0) should be converted to a microsecond timestamp because + let expected_schema = Schema::new(vec![ + Field::new("c0", DataType::Timestamp(TimeUnit::Microsecond, None), true), + Field::new( + "c1", + DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into())), + true, + ), + Field::new("c2", DataType::Timestamp(TimeUnit::Nanosecond, None), true), + Field::new( + "c3", + DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into())), + true, + ), + Field::new("c4", DataType::Timestamp(TimeUnit::Millisecond, None), true), + Field::new( + "c5", + DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())), + true, + ), + Field::new("c6", DataType::Timestamp(TimeUnit::Microsecond, None), true), + ]); + + assert_eq!(result, expected_schema); + } + + #[test] + fn coerce_int96_to_resolution_with_nested_types() { let spark_schema = " message spark_schema { optional int96 c0; From aad4ce6bed20d844c7e153421f06adafb0a7a291 Mon Sep 17 00:00:00 2001 From: Matt Butrovich Date: Thu, 15 May 2025 09:29:41 -0400 Subject: [PATCH 07/25] Improve tests. 
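Comparing against a fully spelled-out expected Schema is stronger than the earlier per-leaf scan: Schema equality checks names, types, nullability, and ordering all the way down, so a regression in how the nested wrappers are rebuilt fails loudly. A tiny illustration of that sensitivity (standalone sketch, not part of the patch):

    use arrow::datatypes::{DataType, Field, Schema};

    fn main() {
        let a = Schema::new(vec![Field::new("c0", DataType::Int32, true)]);
        let b = Schema::new(vec![Field::new("c0", DataType::Int32, false)]);
        // Same name and type; the schemas still differ because the
        // nullability differs, which a leaf-type-only check would miss.
        assert_ne!(a, b);
    }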
--- .../datasource-parquet/src/file_format.rs | 70 ++++++++++++++++--- 1 file changed, 60 insertions(+), 10 deletions(-) diff --git a/datafusion/datasource-parquet/src/file_format.rs b/datafusion/datasource-parquet/src/file_format.rs index 088187f5cb8d..198ff2363ba7 100644 --- a/datafusion/datasource-parquet/src/file_format.rs +++ b/datafusion/datasource-parquet/src/file_format.rs @@ -1686,7 +1686,8 @@ mod tests { #[test] fn coerce_int96_to_resolution_with_mixed_timestamps() { // Unclear if Spark (or other writer) could generate a file with mixed timestamps like this, - // but we want to test the scenario just in case. + // but we want to test the scenario just in case since it's at least a valid schema as far + // as the Parquet spec is concerned. let spark_schema = " message spark_schema { optional int96 c0; @@ -1708,7 +1709,8 @@ mod tests { coerce_int96_to_resolution(&descr, &arrow_schema, &TimeUnit::Microsecond) .unwrap(); - // Only the first field (c0) should be converted to a microsecond timestamp because + // Only the first field (c0) should be converted to a microsecond timestamp because it's the + // only timestamp that originated from an INT96. let expected_schema = Schema::new(vec![ Field::new("c0", DataType::Timestamp(TimeUnit::Microsecond, None), true), Field::new( @@ -1774,13 +1776,61 @@ mod tests { coerce_int96_to_resolution(&descr, &arrow_schema, &TimeUnit::Microsecond) .unwrap(); - result.flattened_fields().iter().for_each(|field| { - if field.data_type().is_primitive() { - assert_eq!( - field.data_type(), - &DataType::Timestamp(TimeUnit::Microsecond, None) - ); - } - }) + let expected_schema = Schema::new(vec![ + Field::new("c0", DataType::Timestamp(TimeUnit::Microsecond, None), true), + Field::new_struct( + "c1", + vec![Field::new( + "c0", + DataType::Timestamp(TimeUnit::Microsecond, None), + true, + )], + true, + ), + Field::new_struct( + "c2", + vec![Field::new_list( + "c0", + Field::new( + "element", + DataType::Timestamp(TimeUnit::Microsecond, None), + true, + ), + true, + )], + true, + ), + Field::new_list( + "c3", + Field::new( + "element", + DataType::Timestamp(TimeUnit::Microsecond, None), + true, + ), + true, + ), + Field::new_list( + "c4", + Field::new_struct( + "element", + vec![ + Field::new( + "c0", + DataType::Timestamp(TimeUnit::Microsecond, None), + true, + ), + Field::new( + "c1", + DataType::Timestamp(TimeUnit::Microsecond, None), + true, + ), + ], + true, + ), + true, + ), + ]); + + assert_eq!(result, expected_schema); } } From 9129c55e4fc98f98d0cc3c9c1609efe150e894fe Mon Sep 17 00:00:00 2001 From: Matt Butrovich Date: Thu, 15 May 2025 10:00:45 -0400 Subject: [PATCH 08/25] Improve docs. --- .../datasource-parquet/src/file_format.rs | 58 ++++++++++++++----- 1 file changed, 43 insertions(+), 15 deletions(-) diff --git a/datafusion/datasource-parquet/src/file_format.rs b/datafusion/datasource-parquet/src/file_format.rs index 198ff2363ba7..1570f527e584 100644 --- a/datafusion/datasource-parquet/src/file_format.rs +++ b/datafusion/datasource-parquet/src/file_format.rs @@ -573,6 +573,7 @@ pub fn coerce_int96_to_resolution( .collect(); if !transform { + // The schema doesn't contain any int96 fields, so skip the remaining logic. return None; } @@ -586,7 +587,11 @@ pub fn coerce_int96_to_resolution( let fields = Rc::new(RefCell::new(Vec::with_capacity(file_schema.fields.len()))); + // TODO: It might be possible to only DFS into nested fields that we know contain an int96 if we + // keep track of the column index above and use that information below. 
That can be a future + // optimization for large schemas. let transformed_schema = { + // Populate the stack with our top-level fields. let mut stack: Vec = file_schema .fields() .iter() @@ -594,13 +599,16 @@ pub fn coerce_int96_to_resolution( .map(|f| (vec![f.name().as_str()], f, Rc::clone(&fields), None)) .collect(); + // Pop fields to DFS into until we have exhausted the stack. while let Some((parquet_path, field_ref, parent_fields, child_fields)) = stack.pop() { match field_ref.data_type() { DataType::Struct(ff) => { if let Some(child_fields) = child_fields { - // This is the second time popping off this struct. + // This is the second time popping off this struct. The child_fields vector + // now contains each field that has been DFS'd into, and we can construct + // the resulting struct with correct child types. let child_fields = child_fields.borrow(); assert_eq!(child_fields.len(), ff.len()); let updated_field = Field::new_struct( @@ -610,6 +618,10 @@ pub fn coerce_int96_to_resolution( ); parent_fields.borrow_mut().push(Arc::new(updated_field)); } else { + // This is the first time popping off this struct. We don't yet know the + // correct types of its children (i.e., if they need coercing) so we create + // a vector for child_fields, push the struct node back onto the stack to be + // processed again (see above) after all of its children. let child_fields = Rc::new(RefCell::new(Vec::with_capacity(ff.len()))); stack.push(( @@ -618,9 +630,12 @@ pub fn coerce_int96_to_resolution( parent_fields, Some(Rc::clone(&child_fields)), )); - // Need to zip these in reverse to maintain original order + // Push all of the children in reverse to maintain original schema order due + // to stack processing. for fff in ff.into_iter().rev() { let mut parquet_path = parquet_path.clone(); + // Build up a normalized path that we'll use as a key into the original + // parquet_fields map above to test if this originated as int96. parquet_path.push("."); parquet_path.push(fff.name()); stack.push(( @@ -634,7 +649,9 @@ pub fn coerce_int96_to_resolution( } DataType::List(ff) => { if let Some(child_fields) = child_fields { - // This is the second time popping off this list. + // This is the second time popping off this list. The child_fields vector + // now contains one field that has been DFS'd into, and we can construct + // the resulting list with correct child type. assert_eq!(child_fields.borrow().len(), 1); let updated_field = Field::new_list( field_ref.name(), @@ -643,8 +660,15 @@ pub fn coerce_int96_to_resolution( ); parent_fields.borrow_mut().push(Arc::new(updated_field)); } else { + // This is the first time popping off this list. We don't yet know the + // correct types of its child (i.e., if they need coercing) so we create + // a vector for child_fields, push the list node back onto the stack to be + // processed again (see above) after its child. let child_fields = Rc::new(RefCell::new(Vec::with_capacity(1))); let mut parquet_path = parquet_path.clone(); + // Spark uses a 3-tier definition for arrays/lists that result in a group + // named "list" that is not maintained when parsing to Arrow. We just push + // this name into the path. parquet_path.push(".list"); stack.push(( parquet_path.clone(), @@ -652,6 +676,8 @@ pub fn coerce_int96_to_resolution( parent_fields, Some(Rc::clone(&child_fields)), )); + // Build up a normalized path that we'll use as a key into the original + // parquet_fields map above to test if this originated as int96. 
let mut parquet_path = parquet_path.clone(); parquet_path.push("."); parquet_path.push(ff.name()); @@ -663,18 +689,20 @@ pub fn coerce_int96_to_resolution( )); } } - _ => match field_ref.data_type() { - DataType::Timestamp(TimeUnit::Nanosecond, None) - if parquet_fields.get(parquet_path.concat().as_str()) - == Some(&Type::INT96) => - { - parent_fields.borrow_mut().push(field_with_new_type( - field_ref, - DataType::Timestamp(*time_unit, None), - )); - } - _ => parent_fields.borrow_mut().push(Arc::clone(field_ref)), - }, + DataType::Timestamp(TimeUnit::Nanosecond, None) + if parquet_fields.get(parquet_path.concat().as_str()) + == Some(&Type::INT96) => + // We found a timestamp(nanos) and it originated as int96. Coerce it to the correct + // time_unit. + { + parent_fields.borrow_mut().push(field_with_new_type( + field_ref, + DataType::Timestamp(*time_unit, None), + )); + } + // Other types can be cloned as they are. + // TODO: Other nested types like map. + _ => parent_fields.borrow_mut().push(Arc::clone(field_ref)), } } assert_eq!(fields.borrow().len(), file_schema.fields.len()); From 6372f1fa7d747c3a7e1291afe6008ea52686823c Mon Sep 17 00:00:00 2001 From: Matt Butrovich Date: Thu, 15 May 2025 10:25:08 -0400 Subject: [PATCH 09/25] Use a smaller HashSet instead of HashMap with every field in it. More docs. --- .../datasource-parquet/src/file_format.rs | 41 ++++++++++--------- 1 file changed, 22 insertions(+), 19 deletions(-) diff --git a/datafusion/datasource-parquet/src/file_format.rs b/datafusion/datasource-parquet/src/file_format.rs index 1570f527e584..32afa9dd688e 100644 --- a/datafusion/datasource-parquet/src/file_format.rs +++ b/datafusion/datasource-parquet/src/file_format.rs @@ -43,7 +43,7 @@ use datafusion_common::parsers::CompressionTypeVariant; use datafusion_common::stats::Precision; use datafusion_common::{ internal_datafusion_err, internal_err, not_impl_err, ColumnStatistics, - DataFusionError, GetExt, Result, DEFAULT_PARQUET_EXTENSION, + DataFusionError, GetExt, HashSet, Result, DEFAULT_PARQUET_EXTENSION, }; use datafusion_common::{HashMap, Statistics}; use datafusion_common_runtime::{JoinSet, SpawnedTask}; @@ -559,30 +559,26 @@ pub fn coerce_int96_to_resolution( file_schema: &Schema, time_unit: &TimeUnit, ) -> Option { - let mut transform = false; - let parquet_fields: HashMap<_, _> = parquet_schema + // Traverse the parquet_schema columns looking for int96 physical types. If encountered, insert + // the field's full path into a set. + let int96_fields: HashSet<_> = parquet_schema .columns() .iter() - .map(|f| { - let dt = f.physical_type(); - if dt.eq(&Type::INT96) { - transform = true; - } - (f.path().string(), dt) - }) + .filter(|f| f.physical_type().eq(&Type::INT96)) + .map(|f| f.path().string()) .collect(); - if !transform { + if int96_fields.is_empty() { // The schema doesn't contain any int96 fields, so skip the remaining logic. return None; } type NestedFields = Rc>>; type StackContext<'a> = ( - Vec<&'a str>, - &'a FieldRef, - NestedFields, - Option, + Vec<&'a str>, // The path to the field referenced below + &'a FieldRef, // The field currently being processed + NestedFields, // The parent's fields that this field will be possibly coerced and inserted into + Option, // Nested types need to create their own vector of fields for their children ); let fields = Rc::new(RefCell::new(Vec::with_capacity(file_schema.fields.len()))); @@ -624,6 +620,8 @@ pub fn coerce_int96_to_resolution( // processed again (see above) after all of its children. 
let child_fields = Rc::new(RefCell::new(Vec::with_capacity(ff.len()))); + // Note that here we push the struct back onto the stack with its + // parent_fields in the same position, now with Some(child_fields). stack.push(( parquet_path.clone(), field_ref, @@ -635,9 +633,11 @@ pub fn coerce_int96_to_resolution( for fff in ff.into_iter().rev() { let mut parquet_path = parquet_path.clone(); // Build up a normalized path that we'll use as a key into the original - // parquet_fields map above to test if this originated as int96. + // int96_fields set above to test if this originated as int96. parquet_path.push("."); parquet_path.push(fff.name()); + // Note that here we push the field onto the stack using the struct's + // new child_fields vector as the field's parent_fields. stack.push(( parquet_path, fff, @@ -670,6 +670,8 @@ pub fn coerce_int96_to_resolution( // named "list" that is not maintained when parsing to Arrow. We just push // this name into the path. parquet_path.push(".list"); + // Note that here we push the list back onto the stack with its + // parent_fields in the same position, now with Some(child_fields). stack.push(( parquet_path.clone(), field_ref, @@ -677,10 +679,12 @@ pub fn coerce_int96_to_resolution( Some(Rc::clone(&child_fields)), )); // Build up a normalized path that we'll use as a key into the original - // parquet_fields map above to test if this originated as int96. + // int96_fields set above to test if this originated as int96. let mut parquet_path = parquet_path.clone(); parquet_path.push("."); parquet_path.push(ff.name()); + // Note that here we push the field onto the stack using the list's + // new child_fields vector as the field's parent_fields. stack.push(( parquet_path.clone(), ff, @@ -690,8 +694,7 @@ pub fn coerce_int96_to_resolution( } } DataType::Timestamp(TimeUnit::Nanosecond, None) - if parquet_fields.get(parquet_path.concat().as_str()) - == Some(&Type::INT96) => + if int96_fields.contains(parquet_path.concat().as_str()) => // We found a timestamp(nanos) and it originated as int96. Coerce it to the correct // time_unit. { From ea38af55914195156541a8d9141ce8d08fcde0dc Mon Sep 17 00:00:00 2001 From: Matt Butrovich Date: Thu, 15 May 2025 10:26:26 -0400 Subject: [PATCH 10/25] Use a smaller HashSet instead of HashMap with every field in it. More docs. --- datafusion/datasource-parquet/src/file_format.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/datafusion/datasource-parquet/src/file_format.rs b/datafusion/datasource-parquet/src/file_format.rs index 32afa9dd688e..b14a842461ce 100644 --- a/datafusion/datasource-parquet/src/file_format.rs +++ b/datafusion/datasource-parquet/src/file_format.rs @@ -573,6 +573,9 @@ pub fn coerce_int96_to_resolution( return None; } + // Do a DFS into the schema using a stack, looking for timestamp(nanos) fields that originated + // as int96 to coerce to the provided time_unit. + type NestedFields = Rc>>; type StackContext<'a> = ( Vec<&'a str>, // The path to the field referenced below From 3005a15406aa397d0010960541f69a281a7ca748 Mon Sep 17 00:00:00 2001 From: Matt Butrovich Date: Thu, 15 May 2025 10:33:37 -0400 Subject: [PATCH 11/25] More docs. 
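The comments added here describe the two-visit stack discipline: a nested field is pushed once with None (children not yet processed), pushed back with Some(child_fields) once a buffer for its children exists, and finalized on the second visit. A minimal, self-contained sketch of that pattern with a single root and leaf (illustrative names, not the patch's types):

    use std::cell::RefCell;
    use std::rc::Rc;

    type Buf = Rc<RefCell<Vec<String>>>;

    fn main() {
        let out: Buf = Rc::new(RefCell::new(Vec::new()));
        // (label, parent buffer, Option<own child buffer>)
        let mut stack: Vec<(&str, Buf, Option<Buf>)> =
            vec![("root", Rc::clone(&out), None)];
        while let Some((label, parent, children)) = stack.pop() {
            match (label, children) {
                ("root", None) => {
                    // First visit: allocate a child buffer, push the node
                    // back for a second visit, then push its children.
                    let buf: Buf = Rc::new(RefCell::new(Vec::new()));
                    stack.push(("root", parent, Some(Rc::clone(&buf))));
                    stack.push(("leaf", buf, None));
                }
                ("root", Some(buf)) => {
                    // Second visit: children are done; finalize the node.
                    parent
                        .borrow_mut()
                        .push(format!("root[{}]", buf.borrow().join(",")));
                }
                (leaf, _) => parent.borrow_mut().push(leaf.to_string()),
            }
        }
        assert_eq!(out.borrow()[0], "root[leaf]");
    }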
--- datafusion/datasource-parquet/src/file_format.rs | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/datafusion/datasource-parquet/src/file_format.rs b/datafusion/datasource-parquet/src/file_format.rs index b14a842461ce..c849a1dc24d5 100644 --- a/datafusion/datasource-parquet/src/file_format.rs +++ b/datafusion/datasource-parquet/src/file_format.rs @@ -578,10 +578,15 @@ pub fn coerce_int96_to_resolution( type NestedFields = Rc<RefCell<Vec<FieldRef>>>; type StackContext<'a> = ( - Vec<&'a str>, // The path to the field referenced below - &'a FieldRef, // The field currently being processed - NestedFields, // The parent's fields that this field will be possibly coerced and inserted into - Option<NestedFields>, // Nested types need to create their own vector of fields for their children + Vec<&'a str>, // The pull parquet path to the field currently being processed. + &'a FieldRef, // The field currently being processed.
NestedFields, // The parent's fields that this field will be (possibly) type-coerced and // inserted into. All fields have a parent, so this is not an Option type. Option, // Nested types need to create their own vector of fields for their - // children. For primitive types this will remain None. For nested types it is None the - // first time they are processed. Then, we instantiate a vector for its children, push the - // field back onto the stack to be processed again, and DFS into its children. The next time - // we process the field, we know we have DFS'd into the children because this field is Some. + // children. For primitive types this will remain None. For nested + // types it is None the first time they are processed. Then, we + // instantiate a vector for its children, push the field back onto the + // stack to be processed again, and DFS into its children. The next + // time we process the field, we know we have DFS'd into the children + // because this field is Some. ); // This is our top-level fields from which we will construct our schema. We pass this into our From 1f96786fd9fcb48e9607eb1053d627e43607dab6 Mon Sep 17 00:00:00 2001 From: Matt Butrovich Date: Thu, 15 May 2025 12:49:05 -0400 Subject: [PATCH 14/25] Refactor match with nested if lets to make it more readable. --- .../datasource-parquet/src/file_format.rs | 171 +++++++++--------- 1 file changed, 82 insertions(+), 89 deletions(-) diff --git a/datafusion/datasource-parquet/src/file_format.rs b/datafusion/datasource-parquet/src/file_format.rs index cfcf8f7c8b46..c5060c99b23d 100644 --- a/datafusion/datasource-parquet/src/file_format.rs +++ b/datafusion/datasource-parquet/src/file_format.rs @@ -564,7 +564,7 @@ pub fn coerce_int96_to_resolution( let int96_fields: HashSet<_> = parquet_schema .columns() .iter() - .filter(|f| f.physical_type().eq(&Type::INT96)) + .filter(|f| f.physical_type() == Type::INT96) .map(|f| f.path().string()) .collect(); @@ -611,101 +611,94 @@ pub fn coerce_int96_to_resolution( while let Some((parquet_path, field_ref, parent_fields, child_fields)) = stack.pop() { - match field_ref.data_type() { - DataType::Struct(ff) => { - if let Some(child_fields) = child_fields { - // This is the second time popping off this struct. The child_fields vector - // now contains each field that has been DFS'd into, and we can construct - // the resulting struct with correct child types. - let child_fields = child_fields.borrow(); - assert_eq!(child_fields.len(), ff.len()); - let updated_field = Field::new_struct( - field_ref.name(), - child_fields.as_slice(), - field_ref.is_nullable(), - ); - parent_fields.borrow_mut().push(Arc::new(updated_field)); - } else { - // This is the first time popping off this struct. We don't yet know the - // correct types of its children (i.e., if they need coercing) so we create - // a vector for child_fields, push the struct node back onto the stack to be - // processed again (see above) after all of its children. - let child_fields = - Rc::new(RefCell::new(Vec::with_capacity(ff.len()))); - // Note that here we push the struct back onto the stack with its - // parent_fields in the same position, now with Some(child_fields). - stack.push(( - parquet_path.clone(), - field_ref, - parent_fields, - Some(Rc::clone(&child_fields)), - )); - // Push all of the children in reverse to maintain original schema order due - // to stack processing. 
- for fff in ff.into_iter().rev() { - let mut parquet_path = parquet_path.clone(); - // Build up a normalized path that we'll use as a key into the original - // int96_fields set above to test if this originated as int96. - parquet_path.push("."); - parquet_path.push(fff.name()); - // Note that here we push the field onto the stack using the struct's - // new child_fields vector as the field's parent_fields. - stack.push(( - parquet_path, - fff, - Rc::clone(&child_fields), - None, - )); - } - } - } - DataType::List(ff) => { - if let Some(child_fields) = child_fields { - // This is the second time popping off this list. The child_fields vector - // now contains one field that has been DFS'd into, and we can construct - // the resulting list with correct child type. - assert_eq!(child_fields.borrow().len(), 1); - let updated_field = Field::new_list( - field_ref.name(), - Arc::clone(&child_fields.borrow()[0]), - field_ref.is_nullable(), - ); - parent_fields.borrow_mut().push(Arc::new(updated_field)); - } else { - // This is the first time popping off this list. We don't yet know the - // correct types of its child (i.e., if they need coercing) so we create - // a vector for child_fields, push the list node back onto the stack to be - // processed again (see above) after its child. - let child_fields = Rc::new(RefCell::new(Vec::with_capacity(1))); + match (field_ref.data_type(), child_fields) { + (DataType::Struct(ff), None) => { + // This is the first time popping off this struct. We don't yet know the + // correct types of its children (i.e., if they need coercing) so we create + // a vector for child_fields, push the struct node back onto the stack to be + // processed again (see below) after all of its children. + let child_fields = + Rc::new(RefCell::new(Vec::with_capacity(ff.len()))); + // Note that here we push the struct back onto the stack with its + // parent_fields in the same position, now with Some(child_fields). + stack.push(( + parquet_path.clone(), + field_ref, + parent_fields, + Some(Rc::clone(&child_fields)), + )); + // Push all of the children in reverse to maintain original schema order due + // to stack processing. + for fff in ff.into_iter().rev() { let mut parquet_path = parquet_path.clone(); - // Spark uses a 3-tier definition for arrays/lists that result in a group - // named "list" that is not maintained when parsing to Arrow. We just push - // this name into the path. - parquet_path.push(".list"); - // Note that here we push the list back onto the stack with its - // parent_fields in the same position, now with Some(child_fields). - stack.push(( - parquet_path.clone(), - field_ref, - parent_fields, - Some(Rc::clone(&child_fields)), - )); // Build up a normalized path that we'll use as a key into the original // int96_fields set above to test if this originated as int96. - let mut parquet_path = parquet_path.clone(); parquet_path.push("."); - parquet_path.push(ff.name()); - // Note that here we push the field onto the stack using the list's + parquet_path.push(fff.name()); + // Note that here we push the field onto the stack using the struct's // new child_fields vector as the field's parent_fields. - stack.push(( - parquet_path.clone(), - ff, - Rc::clone(&child_fields), - None, - )); + stack.push((parquet_path, fff, Rc::clone(&child_fields), None)); } } - DataType::Timestamp(TimeUnit::Nanosecond, None) + (DataType::Struct(ff), Some(child_fields)) => { + // This is the second time popping off this struct. 
The child_fields vector + // now contains each field that has been DFS'd into, and we can construct + // the resulting struct with correct child types. + let child_fields = child_fields.borrow(); + assert_eq!(child_fields.len(), ff.len()); + let updated_field = Field::new_struct( + field_ref.name(), + child_fields.as_slice(), + field_ref.is_nullable(), + ); + parent_fields.borrow_mut().push(Arc::new(updated_field)); + } + (DataType::List(ff), None) => { + // This is the first time popping off this list. We don't yet know the + // correct types of its child (i.e., if they need coercing) so we create + // a vector for child_fields, push the list node back onto the stack to be + // processed again (see below) after its child. + let child_fields = Rc::new(RefCell::new(Vec::with_capacity(1))); + let mut parquet_path = parquet_path.clone(); + // Spark uses a 3-tier definition for arrays/lists that result in a group + // named "list" that is not maintained when parsing to Arrow. We just push + // this name into the path. + parquet_path.push(".list"); + // Note that here we push the list back onto the stack with its + // parent_fields in the same position, now with Some(child_fields). + stack.push(( + parquet_path.clone(), + field_ref, + parent_fields, + Some(Rc::clone(&child_fields)), + )); + // Build up a normalized path that we'll use as a key into the original + // int96_fields set above to test if this originated as int96. + let mut parquet_path = parquet_path.clone(); + parquet_path.push("."); + parquet_path.push(ff.name()); + // Note that here we push the field onto the stack using the list's + // new child_fields vector as the field's parent_fields. + stack.push(( + parquet_path.clone(), + ff, + Rc::clone(&child_fields), + None, + )); + } + (DataType::List(_), Some(child_fields)) => { + // This is the second time popping off this list. The child_fields vector + // now contains one field that has been DFS'd into, and we can construct + // the resulting list with correct child type. + assert_eq!(child_fields.borrow().len(), 1); + let updated_field = Field::new_list( + field_ref.name(), + Arc::clone(&child_fields.borrow()[0]), + field_ref.is_nullable(), + ); + parent_fields.borrow_mut().push(Arc::new(updated_field)); + } + (DataType::Timestamp(TimeUnit::Nanosecond, None), None) if int96_fields.contains(parquet_path.concat().as_str()) => // We found a timestamp(nanos) and it originated as int96. Coerce it to the correct // time_unit. From 957ff634a1a92df86005eb0a468a49b388475106 Mon Sep 17 00:00:00 2001 From: Matt Butrovich Date: Thu, 15 May 2025 12:59:27 -0400 Subject: [PATCH 15/25] Address some PR feedback. --- datafusion/datasource-parquet/src/file_format.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/datafusion/datasource-parquet/src/file_format.rs b/datafusion/datasource-parquet/src/file_format.rs index c5060c99b23d..df34d0edb23b 100644 --- a/datafusion/datasource-parquet/src/file_format.rs +++ b/datafusion/datasource-parquet/src/file_format.rs @@ -578,8 +578,8 @@ pub fn coerce_int96_to_resolution( type NestedFields = Rc>>; type StackContext<'a> = ( - Vec<&'a str>, // The full parquet path to the field currently being processed. - &'a FieldRef, // The field currently being processed. + Vec<&'a str>, // The Parquet column path (e.g., "c0.list.element.c1") for the current field. + &'a FieldRef, // The current field to be processed. NestedFields, // The parent's fields that this field will be (possibly) type-coerced and // inserted into. 
All fields have a parent, so this is not an Option type. Option, // Nested types need to create their own vector of fields for their @@ -596,8 +596,8 @@ pub fn coerce_int96_to_resolution( let fields = Rc::new(RefCell::new(Vec::with_capacity(file_schema.fields.len()))); // TODO: It might be possible to only DFS into nested fields that we know contain an int96 if we - // keep track of the column index above and use that information below. That can be a future - // optimization for large schemas. + // use some sort of LPM data structure to check if we're currently DFS'ing nested types that are + // in a column path that contains an int96. That can be a future optimization for large schemas. let transformed_schema = { // Populate the stack with our top-level fields. let mut stack: Vec = file_schema From 0e272f6874f6f74445bcc16fe177e511624d54fc Mon Sep 17 00:00:00 2001 From: Matt Butrovich Date: Thu, 15 May 2025 13:08:26 -0400 Subject: [PATCH 16/25] Rename variables in struct processing to address PR feedback. Do List next. --- .../datasource-parquet/src/file_format.rs | 53 ++++++++++--------- 1 file changed, 27 insertions(+), 26 deletions(-) diff --git a/datafusion/datasource-parquet/src/file_format.rs b/datafusion/datasource-parquet/src/file_format.rs index df34d0edb23b..9bc150075917 100644 --- a/datafusion/datasource-parquet/src/file_format.rs +++ b/datafusion/datasource-parquet/src/file_format.rs @@ -608,50 +608,51 @@ pub fn coerce_int96_to_resolution( .collect(); // Pop fields to DFS into until we have exhausted the stack. - while let Some((parquet_path, field_ref, parent_fields, child_fields)) = + while let Some((parquet_path, current_field, parent_fields, child_fields)) = stack.pop() { - match (field_ref.data_type(), child_fields) { - (DataType::Struct(ff), None) => { + match (current_field.data_type(), child_fields) { + (DataType::Struct(unprocessed_children), None) => { // This is the first time popping off this struct. We don't yet know the // correct types of its children (i.e., if they need coercing) so we create // a vector for child_fields, push the struct node back onto the stack to be // processed again (see below) after all of its children. - let child_fields = - Rc::new(RefCell::new(Vec::with_capacity(ff.len()))); + let child_fields = Rc::new(RefCell::new(Vec::with_capacity( + unprocessed_children.len(), + ))); // Note that here we push the struct back onto the stack with its // parent_fields in the same position, now with Some(child_fields). stack.push(( parquet_path.clone(), - field_ref, + current_field, parent_fields, Some(Rc::clone(&child_fields)), )); - // Push all of the children in reverse to maintain original schema order due - // to stack processing. - for fff in ff.into_iter().rev() { - let mut parquet_path = parquet_path.clone(); + // Push all the children in reverse to maintain original schema order due to + // stack processing. + for child in unprocessed_children.into_iter().rev() { + let mut child_path = parquet_path.clone(); // Build up a normalized path that we'll use as a key into the original // int96_fields set above to test if this originated as int96. - parquet_path.push("."); - parquet_path.push(fff.name()); + child_path.push("."); + child_path.push(child.name()); // Note that here we push the field onto the stack using the struct's // new child_fields vector as the field's parent_fields. 
- stack.push((parquet_path, fff, Rc::clone(&child_fields), None)); + stack.push((child_path, child, Rc::clone(&child_fields), None)); } } - (DataType::Struct(ff), Some(child_fields)) => { + (DataType::Struct(unprocessed_children), Some(processed_children)) => { // This is the second time popping off this struct. The child_fields vector // now contains each field that has been DFS'd into, and we can construct // the resulting struct with correct child types. - let child_fields = child_fields.borrow(); - assert_eq!(child_fields.len(), ff.len()); - let updated_field = Field::new_struct( - field_ref.name(), - child_fields.as_slice(), - field_ref.is_nullable(), + let processed_children = processed_children.borrow(); + assert_eq!(processed_children.len(), unprocessed_children.len()); + let processed_struct = Field::new_struct( + current_field.name(), + processed_children.as_slice(), + current_field.is_nullable(), ); - parent_fields.borrow_mut().push(Arc::new(updated_field)); + parent_fields.borrow_mut().push(Arc::new(processed_struct)); } (DataType::List(ff), None) => { // This is the first time popping off this list. We don't yet know the @@ -668,7 +669,7 @@ pub fn coerce_int96_to_resolution( // parent_fields in the same position, now with Some(child_fields). stack.push(( parquet_path.clone(), - field_ref, + current_field, parent_fields, Some(Rc::clone(&child_fields)), )); @@ -692,9 +693,9 @@ pub fn coerce_int96_to_resolution( // the resulting list with correct child type. assert_eq!(child_fields.borrow().len(), 1); let updated_field = Field::new_list( - field_ref.name(), + current_field.name(), Arc::clone(&child_fields.borrow()[0]), - field_ref.is_nullable(), + current_field.is_nullable(), ); parent_fields.borrow_mut().push(Arc::new(updated_field)); } @@ -704,13 +705,13 @@ pub fn coerce_int96_to_resolution( // time_unit. { parent_fields.borrow_mut().push(field_with_new_type( - field_ref, + current_field, DataType::Timestamp(*time_unit, None), )); } // Other types can be cloned as they are. // TODO: Other nested types like map. - _ => parent_fields.borrow_mut().push(Arc::clone(field_ref)), + _ => parent_fields.borrow_mut().push(Arc::clone(current_field)), } } assert_eq!(fields.borrow().len(), file_schema.fields.len()); From 5fbe4581665205d9e1c18c03e3fab0edc4617e2d Mon Sep 17 00:00:00 2001 From: Matt Butrovich Date: Thu, 15 May 2025 13:15:05 -0400 Subject: [PATCH 17/25] Rename variables in list processing to address PR feedback. --- .../datasource-parquet/src/file_format.rs | 33 ++++++++++--------- 1 file changed, 17 insertions(+), 16 deletions(-) diff --git a/datafusion/datasource-parquet/src/file_format.rs b/datafusion/datasource-parquet/src/file_format.rs index 9bc150075917..1caabad120e4 100644 --- a/datafusion/datasource-parquet/src/file_format.rs +++ b/datafusion/datasource-parquet/src/file_format.rs @@ -616,7 +616,7 @@ pub fn coerce_int96_to_resolution( // This is the first time popping off this struct. We don't yet know the // correct types of its children (i.e., if they need coercing) so we create // a vector for child_fields, push the struct node back onto the stack to be - // processed again (see below) after all of its children. + // processed again (see below) after processing all its children. 
                let child_fields = Rc::new(RefCell::new(Vec::with_capacity(
                    unprocessed_children.len(),
                )));
@@ -654,17 +654,17 @@ pub fn coerce_int96_to_resolution(
                 );
                 parent_fields.borrow_mut().push(Arc::new(processed_struct));
             }
-            (DataType::List(ff), None) => {
+            (DataType::List(unprocessed_child), None) => {
                 // This is the first time popping off this list. We don't yet know the
                 // correct types of its child (i.e., if they need coercing) so we create
                 // a vector for child_fields, push the list node back onto the stack to be
-                // processed again (see below) after its child.
+                // processed again (see below) after processing its child.
                 let child_fields = Rc::new(RefCell::new(Vec::with_capacity(1)));
-                let mut parquet_path = parquet_path.clone();
+                let mut list_path = parquet_path.clone();
                 // Spark uses a 3-tier definition for arrays/lists that result in a group
                 // named "list" that is not maintained when parsing to Arrow. We just push
                 // this name into the path.
-                parquet_path.push(".list");
+                list_path.push(".list");
                 // Note that here we push the list back onto the stack with its
                 // parent_fields in the same position, now with Some(child_fields).
                 stack.push((
@@ -675,29 +675,30 @@ pub fn coerce_int96_to_resolution(
                 ));
                 // Build up a normalized path that we'll use as a key into the original
                 // int96_fields set above to test if this originated as int96.
-                let mut parquet_path = parquet_path.clone();
-                parquet_path.push(".");
-                parquet_path.push(ff.name());
+                let mut child_path = list_path.clone();
+                child_path.push(".");
+                child_path.push(unprocessed_child.name());
                 // Note that here we push the field onto the stack using the list's
                 // new child_fields vector as the field's parent_fields.
                 stack.push((
-                    parquet_path.clone(),
-                    ff,
+                    child_path.clone(),
+                    unprocessed_child,
                     Rc::clone(&child_fields),
                     None,
                 ));
             }
-            (DataType::List(_), Some(child_fields)) => {
+            (DataType::List(_), Some(processed_children)) => {
                 // This is the second time popping off this list. The child_fields vector
                 // now contains one field that has been DFS'd into, and we can construct
                 // the resulting list with correct child type.
-                assert_eq!(child_fields.borrow().len(), 1);
-                let updated_field = Field::new_list(
+                let processed_children = processed_children.borrow();
+                assert_eq!(processed_children.len(), 1);
+                let processed_list = Field::new_list(
                     current_field.name(),
-                    Arc::clone(&child_fields.borrow()[0]),
+                    Arc::clone(&processed_children[0]),
                     current_field.is_nullable(),
                 );
-                parent_fields.borrow_mut().push(Arc::new(updated_field));
+                parent_fields.borrow_mut().push(Arc::new(processed_list));
             }
             (DataType::Timestamp(TimeUnit::Nanosecond, None), None)
                 if int96_fields.contains(parquet_path.concat().as_str()) =>
@@ -710,7 +711,7 @@ pub fn coerce_int96_to_resolution(
                 ));
             }
             // Other types can be cloned as they are.
-            // TODO: Other nested types like map.
+            // TODO: Other nested types like Map.
             _ => parent_fields.borrow_mut().push(Arc::clone(current_field)),
         }
     }

From 1ddb8c165e35432ff40bdcfd07be7bb5c57097d3 Mon Sep 17 00:00:00 2001
From: Matt Butrovich
Date: Thu, 15 May 2025 13:55:01 -0400
Subject: [PATCH 18/25] Update docs.
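Patch 17's ".list" handling is easier to see from the Parquet side. The dotted keys stored in int96_fields are parquet-level column paths, which keep the synthetic list group (and a map's key_value group) even though Arrow drops the list group's name. A quick standalone way to print those paths, reusing the same parquet APIs the tests in this series use (the sample message is ours):

use std::sync::Arc;

use parquet::schema::parser::parse_message_type;
use parquet::schema::types::SchemaDescriptor;

fn main() {
    let message = "
    message spark_schema {
        optional int96 c0;
        optional group c1 (LIST) {
            repeated group list {
                optional int96 element;
            }
        }
        optional group c2 (MAP) {
            repeated group key_value {
                required int96 key;
                optional int96 value;
            }
        }
    }
    ";
    let schema = parse_message_type(message).expect("sample schema should parse");
    let descr = SchemaDescriptor::new(Arc::new(schema));
    for column in descr.columns() {
        // Prints c0, c1.list.element, c2.key_value.key, and c2.key_value.value.
        // The ".list" segment is exactly what the DFS above must splice back
        // into child_path, while "key_value" survives as the map child's name.
        println!("{}", column.path().string());
    }
}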
---
 datafusion/datasource-parquet/src/file_format.rs | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/datafusion/datasource-parquet/src/file_format.rs b/datafusion/datasource-parquet/src/file_format.rs
index 1caabad120e4..a6d3307702b2 100644
--- a/datafusion/datasource-parquet/src/file_format.rs
+++ b/datafusion/datasource-parquet/src/file_format.rs
@@ -1775,6 +1775,10 @@ mod tests {

     #[test]
     fn coerce_int96_to_resolution_with_nested_types() {
+        // This schema originates from Comet's CometFuzzTestSuite ParquetGenerator only using int96
+        // primitive types with generateStruct and generateArray set to true, with one additional
+        // field added to make sure all fields in a struct get modified.
+        // https://github.com/apache/datafusion-comet/blob/main/spark/src/main/scala/org/apache/comet/testing/ParquetGenerator.scala
         let spark_schema = "
         message spark_schema {
             optional int96 c0;

From 10d378fafac9c9c0545f3965b207b468382f0939 Mon Sep 17 00:00:00 2001
From: Matt Butrovich
Date: Thu, 15 May 2025 16:53:15 -0400
Subject: [PATCH 19/25] Simplify list parquet path generation.

---
 datafusion/datasource-parquet/src/file_format.rs | 12 +++++-------
 1 file changed, 5 insertions(+), 7 deletions(-)

diff --git a/datafusion/datasource-parquet/src/file_format.rs b/datafusion/datasource-parquet/src/file_format.rs
index a6d3307702b2..4b0260825b78 100644
--- a/datafusion/datasource-parquet/src/file_format.rs
+++ b/datafusion/datasource-parquet/src/file_format.rs
@@ -660,11 +660,6 @@ pub fn coerce_int96_to_resolution(
                 // a vector for child_fields, push the list node back onto the stack to be
                 // processed again (see below) after processing its child.
                 let child_fields = Rc::new(RefCell::new(Vec::with_capacity(1)));
-                let mut list_path = parquet_path.clone();
-                // Spark uses a 3-tier definition for arrays/lists that result in a group
-                // named "list" that is not maintained when parsing to Arrow. We just push
-                // this name into the path.
-                list_path.push(".list");
                 // Note that here we push the list back onto the stack with its
                 // parent_fields in the same position, now with Some(child_fields).
                 stack.push((
@@ -675,8 +670,11 @@ pub fn coerce_int96_to_resolution(
                 ));
                 // Build up a normalized path that we'll use as a key into the original
                 // int96_fields set above to test if this originated as int96.
-                let mut child_path = list_path.clone();
-                child_path.push(".");
+                let mut child_path = parquet_path.clone();
+                // Spark uses a definition for arrays/lists that results in a group
+                // named "list" that is not maintained when parsing to Arrow. We just push
+                // this name into the path.
+                child_path.push(".list.");
                 child_path.push(unprocessed_child.name());
                 // Note that here we push the field onto the stack using the list's
                 // new child_fields vector as the field's parent_fields.

From 247866d1255549bfecfe82911339ef1455260038 Mon Sep 17 00:00:00 2001
From: Matt Butrovich
Date: Thu, 15 May 2025 17:22:26 -0400
Subject: [PATCH 20/25] Map support.
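Before the map arms land in the next diff, note that the now-simplified child_path strings are also what the earlier TODO's "LPM data structure" would key on: if no int96 leaf path starts with the current prefix, the whole subtree could be cloned without a DFS. A minimal sketch of that future optimization, using a sorted Vec in place of a real prefix trie (the names and approach are hypothetical, not part of this series):

/// Returns true if any int96 leaf path begins with `prefix`, meaning the
/// subtree rooted at that prefix still needs to be walked.
fn subtree_may_contain_int96(sorted_int96_paths: &[String], prefix: &str) -> bool {
    // In a sorted list, the first path >= prefix is the only candidate that
    // can start with it; everything earlier compares less than the prefix.
    let idx = sorted_int96_paths.partition_point(|p| p.as_str() < prefix);
    sorted_int96_paths
        .get(idx)
        .map_or(false, |p| p.starts_with(prefix))
}

fn main() {
    let mut paths = vec![
        "c2.c0.list.element".to_string(),
        "c5.key_value.key".to_string(),
    ];
    paths.sort();
    assert!(subtree_may_contain_int96(&paths, "c2."));
    assert!(!subtree_may_contain_int96(&paths, "c3."));
}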
---
 .../datasource-parquet/src/file_format.rs | 99 ++++++++++++++++++-
 1 file changed, 96 insertions(+), 3 deletions(-)

diff --git a/datafusion/datasource-parquet/src/file_format.rs b/datafusion/datasource-parquet/src/file_format.rs
index 4b0260825b78..043ade249cc6 100644
--- a/datafusion/datasource-parquet/src/file_format.rs
+++ b/datafusion/datasource-parquet/src/file_format.rs
@@ -698,6 +698,47 @@ pub fn coerce_int96_to_resolution(
                 );
                 parent_fields.borrow_mut().push(Arc::new(processed_list));
             }
+            (DataType::Map(unprocessed_child, _), None) => {
+                // This is the first time popping off this map. We don't yet know the
+                // correct types of its child (i.e., if they need coercing) so we create
+                // a vector for child_fields, push the map node back onto the stack to be
+                // processed again (see below) after processing its child.
+                let child_fields = Rc::new(RefCell::new(Vec::with_capacity(1)));
+                // Note that here we push the map back onto the stack with its
+                // parent_fields in the same position, now with Some(child_fields).
+                stack.push((
+                    parquet_path.clone(),
+                    current_field,
+                    parent_fields,
+                    Some(Rc::clone(&child_fields)),
+                ));
+                // Build up a normalized path that we'll use as a key into the original
+                // int96_fields set above to test if this originated as int96.
+                let mut child_path = parquet_path.clone();
+                child_path.push(".");
+                child_path.push(unprocessed_child.name());
+                // Note that here we push the field onto the stack using the map's
+                // new child_fields vector as the field's parent_fields.
+                stack.push((
+                    child_path.clone(),
+                    unprocessed_child,
+                    Rc::clone(&child_fields),
+                    None,
+                ));
+            }
+            (DataType::Map(_, sorted), Some(processed_children)) => {
+                // This is the second time popping off this map. The child_fields vector
+                // now contains one field that has been DFS'd into, and we can construct
+                // the resulting map with correct child type.
+                let processed_children = processed_children.borrow();
+                assert_eq!(processed_children.len(), 1);
+                let processed_map = Field::new(
+                    current_field.name(),
+                    DataType::Map(Arc::clone(&processed_children[0]), *sorted),
+                    current_field.is_nullable(),
+                );
+                parent_fields.borrow_mut().push(Arc::new(processed_map));
+            }
             (DataType::Timestamp(TimeUnit::Nanosecond, None), None)
                 if int96_fields.contains(parquet_path.concat().as_str()) =>
             // We found a timestamp(nanos) and it originated as int96. Coerce it to the correct
@@ -1773,9 +1814,9 @@ mod tests {

     #[test]
     fn coerce_int96_to_resolution_with_nested_types() {
-        // This schema originates from Comet's CometFuzzTestSuite ParquetGenerator only using int96
-        // primitive types with generateStruct and generateArray set to true, with one additional
-        // field added to make sure all fields in a struct get modified.
+        // This schema is derived from Comet's CometFuzzTestSuite ParquetGenerator only using int96
+        // primitive types with generateStruct, generateArray, and generateMap set to true, with one
+        // additional field added to c4's struct to make sure all fields in a struct get modified.
        // https://github.com/apache/datafusion-comet/blob/main/spark/src/main/scala/org/apache/comet/testing/ParquetGenerator.scala
         let spark_schema = "
         message spark_schema {
             optional int96 c0;
@@ -1803,6 +1844,22 @@ mod tests {
                     }
                 }
             }
+            optional group c5 (MAP) {
+                repeated group key_value {
+                    required int96 key;
+                    optional int96 value;
+                }
+            }
+            optional group c6 (LIST) {
+                repeated group list {
+                    optional group element (MAP) {
+                        repeated group key_value {
+                            required int96 key;
+                            optional int96 value;
+                        }
+                    }
+                }
+            }
         }
         ";
@@ -1868,6 +1925,42 @@ mod tests {
                 ),
                 true,
             ),
+            Field::new_map(
+                "c5",
+                "key_value",
+                Field::new(
+                    "key",
+                    DataType::Timestamp(TimeUnit::Microsecond, None),
+                    false,
+                ),
+                Field::new(
+                    "value",
+                    DataType::Timestamp(TimeUnit::Microsecond, None),
+                    true,
+                ),
+                false,
+                true,
+            ),
+            Field::new_list(
+                "c6",
+                Field::new_map(
+                    "element",
+                    "key_value",
+                    Field::new(
+                        "key",
+                        DataType::Timestamp(TimeUnit::Microsecond, None),
+                        false,
+                    ),
+                    Field::new(
+                        "value",
+                        DataType::Timestamp(TimeUnit::Microsecond, None),
+                        true,
+                    ),
+                    false,
+                    true,
+                ),
+                true,
+            ),
         ]);

         assert_eq!(result, expected_schema);

From 74019a54f6a63e164bbe245bf4b3d899330debc9 Mon Sep 17 00:00:00 2001
From: Matt Butrovich
Date: Thu, 15 May 2025 17:30:07 -0400
Subject: [PATCH 21/25] Remove old TODO.

---
 datafusion/datasource-parquet/src/file_format.rs | 1 -
 1 file changed, 1 deletion(-)

diff --git a/datafusion/datasource-parquet/src/file_format.rs b/datafusion/datasource-parquet/src/file_format.rs
index 043ade249cc6..d03d1ca349b8 100644
--- a/datafusion/datasource-parquet/src/file_format.rs
+++ b/datafusion/datasource-parquet/src/file_format.rs
@@ -750,7 +750,6 @@ pub fn coerce_int96_to_resolution(
                 ));
             }
             // Other types can be cloned as they are.
-            // TODO: Other nested types like Map.
             _ => parent_fields.borrow_mut().push(Arc::clone(current_field)),
         }
     }

From 76777af8005a5e45318e5f53cb890f9a59952e1b Mon Sep 17 00:00:00 2001
From: Matt Butrovich
Date: Fri, 16 May 2025 12:15:44 -0400
Subject: [PATCH 22/25] Reduce redundant docs by referring to docs above.

---
 .../datasource-parquet/src/file_format.rs | 30 +++----------------
 1 file changed, 4 insertions(+), 26 deletions(-)

diff --git a/datafusion/datasource-parquet/src/file_format.rs b/datafusion/datasource-parquet/src/file_format.rs
index d03d1ca349b8..6a6078ec700a 100644
--- a/datafusion/datasource-parquet/src/file_format.rs
+++ b/datafusion/datasource-parquet/src/file_format.rs
@@ -655,29 +655,20 @@ pub fn coerce_int96_to_resolution(
                 parent_fields.borrow_mut().push(Arc::new(processed_struct));
             }
             (DataType::List(unprocessed_child), None) => {
-                // This is the first time popping off this list. We don't yet know the
-                // correct types of its child (i.e., if they need coercing) so we create
-                // a vector for child_fields, push the list node back onto the stack to be
-                // processed again (see below) after processing its child.
+                // This is the first time popping off this list. See struct docs above.
                 let child_fields = Rc::new(RefCell::new(Vec::with_capacity(1)));
                 stack.push((
                     parquet_path.clone(),
                     current_field,
                     parent_fields,
                     Some(Rc::clone(&child_fields)),
                 ));
-                // Build up a normalized path that we'll use as a key into the original
-                // int96_fields set above to test if this originated as int96.
                let mut child_path = parquet_path.clone();
                 // Spark uses a definition for arrays/lists that results in a group
                 // named "list" that is not maintained when parsing to Arrow. We just push
                 // this name into the path.
                 child_path.push(".list.");
                 child_path.push(unprocessed_child.name());
                 stack.push((
                     child_path.clone(),
                     unprocessed_child,
                     Rc::clone(&child_fields),
                     None,
                 ));
             }
             (DataType::List(_), Some(processed_children)) => {
-                // This is the second time popping off this list. The child_fields vector
-                // now contains one field that has been DFS'd into, and we can construct
-                // the resulting list with correct child type.
+                // This is the second time popping off this list. See struct docs above.
                 let processed_children = processed_children.borrow();
                 assert_eq!(processed_children.len(), 1);
                 let processed_list = Field::new_list(
                     current_field.name(),
                     Arc::clone(&processed_children[0]),
                     current_field.is_nullable(),
                 );
                 parent_fields.borrow_mut().push(Arc::new(processed_list));
             }
             (DataType::Map(unprocessed_child, _), None) => {
-                // This is the first time popping off this map. We don't yet know the
-                // correct types of its child (i.e., if they need coercing) so we create
-                // a vector for child_fields, push the map node back onto the stack to be
-                // processed again (see below) after processing its child.
+                // This is the first time popping off this map. See struct docs above.
                 let child_fields = Rc::new(RefCell::new(Vec::with_capacity(1)));
                 stack.push((
                     parquet_path.clone(),
                     current_field,
                     parent_fields,
                     Some(Rc::clone(&child_fields)),
                 ));
                 let mut child_path = parquet_path.clone();
                 child_path.push(".");
                 child_path.push(unprocessed_child.name());
                 stack.push((
                     child_path.clone(),
                     unprocessed_child,
                     Rc::clone(&child_fields),
                     None,
                 ));
             }
             (DataType::Map(_, sorted), Some(processed_children)) => {
-                // This is the second time popping off this map. The child_fields vector
-                // now contains one field that has been DFS'd into, and we can construct
-                // the resulting map with correct child type.
+                // This is the second time popping off this map. See struct docs above.
                 let processed_children = processed_children.borrow();
                 assert_eq!(processed_children.len(), 1);
                 let processed_map = Field::new(

From e8058589152f9276961aac8594bbc0c1ddf06bca Mon Sep 17 00:00:00 2001
From: Matt Butrovich
Date: Fri, 16 May 2025 12:21:13 -0400
Subject: [PATCH 23/25] Reduce redundant docs by referring to docs above.

---
 datafusion/datasource-parquet/src/file_format.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/datafusion/datasource-parquet/src/file_format.rs b/datafusion/datasource-parquet/src/file_format.rs
index 6a6078ec700a..253bd8872dee 100644
--- a/datafusion/datasource-parquet/src/file_format.rs
+++ b/datafusion/datasource-parquet/src/file_format.rs
@@ -707,7 +707,7 @@ pub fn coerce_int96_to_resolution(
             (DataType::Map(_, sorted), Some(processed_children)) => {
-                // This is the second time popping off this map. See struct docs above. 
+                // This is the second time popping off this map. See struct docs above.
                 let processed_children = processed_children.borrow();
                 assert_eq!(processed_children.len(), 1);
                 let processed_map = Field::new(

From 08ad98b3996ea479008af81548f810585dd0b8e8 Mon Sep 17 00:00:00 2001
From: Matt Butrovich
Date: Fri, 16 May 2025 13:29:01 -0400
Subject: [PATCH 24/25] Add parquet file generated from CometFuzzTestSuite
 ParquetGenerator (similar to schema in file_format tests) to exercise
 end-to-end support.

---
 .../src/datasource/physical_plan/parquet.rs | 120 +++++++++++++++++-
 .../core/tests/data/int96_nested.parquet    | Bin 0 -> 4004 bytes
 2 files changed, 119 insertions(+), 1 deletion(-)
 create mode 100644 datafusion/core/tests/data/int96_nested.parquet

diff --git a/datafusion/core/src/datasource/physical_plan/parquet.rs b/datafusion/core/src/datasource/physical_plan/parquet.rs
index e4d5060e065c..23d849ede2e6 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet.rs
@@ -44,7 +44,7 @@ mod tests {
     use arrow::datatypes::{DataType, Field, Fields, Schema, SchemaBuilder};
     use arrow::record_batch::RecordBatch;
     use arrow::util::pretty::pretty_format_batches;
-    use arrow_schema::SchemaRef;
+    use arrow_schema::{SchemaRef, TimeUnit};
     use bytes::{BufMut, BytesMut};
     use datafusion_common::config::TableParquetOptions;
     use datafusion_common::test_util::{batches_to_sort_string, batches_to_string};
@@ -1229,6 +1229,124 @@ mod tests {
         Ok(())
     }

+    #[tokio::test]
+    async fn parquet_exec_with_int96_nested() -> Result<()> {
+        // This test ensures that we maintain compatibility with coercing int96 to the desired
+        // resolution when they're within a nested type (e.g., struct, map, list). This file
+        // originates from a modified CometFuzzTestSuite ParquetGenerator to generate combinations
+        // of primitive and complex columns using int96. Other tests cover reading the data
+        // correctly with this coercion. Here we're only checking the coerced schema is correct.
+        let testdata = "../../datafusion/core/tests/data";
+        let filename = "int96_nested.parquet";
+        let session_ctx = SessionContext::new();
+        let state = session_ctx.state();
+        let task_ctx = state.task_ctx();
+
+        let parquet_exec = scan_format(
+            &state,
+            &ParquetFormat::default().with_coerce_int96(Some("us".to_string())),
+            None,
+            &testdata,
+            filename,
+            None,
+            None,
+        )
+        .await
+        .unwrap();
+        assert_eq!(parquet_exec.output_partitioning().partition_count(), 1);
+
+        let mut results = parquet_exec.execute(0, task_ctx.clone())?;
+        let batch = results.next().await.unwrap()?;
+
+        let expected_schema = Arc::new(Schema::new(vec![
+            Field::new("c0", DataType::Timestamp(TimeUnit::Microsecond, None), true),
+            Field::new_struct(
+                "c1",
+                vec![Field::new(
+                    "c0",
+                    DataType::Timestamp(TimeUnit::Microsecond, None),
+                    true,
+                )],
+                true,
+            ),
+            Field::new_struct(
+                "c2",
+                vec![Field::new_list(
+                    "c0",
+                    Field::new(
+                        "element",
+                        DataType::Timestamp(TimeUnit::Microsecond, None),
+                        true,
+                    ),
+                    true,
+                )],
+                true,
+            ),
+            Field::new_map(
+                "c3",
+                "key_value",
+                Field::new(
+                    "key",
+                    DataType::Timestamp(TimeUnit::Microsecond, None),
+                    false,
+                ),
+                Field::new(
+                    "value",
+                    DataType::Timestamp(TimeUnit::Microsecond, None),
+                    true,
+                ),
+                false,
+                true,
+            ),
+            Field::new_list(
+                "c4",
+                Field::new(
+                    "element",
+                    DataType::Timestamp(TimeUnit::Microsecond, None),
+                    true,
+                ),
+                true,
+            ),
+            Field::new_list(
+                "c5",
+                Field::new_struct(
+                    "element",
+                    vec![Field::new(
+                        "c0",
+                        DataType::Timestamp(TimeUnit::Microsecond, None),
+                        true,
+                    )],
+                    true,
+                ),
+                true,
+            ),
+            Field::new_list(
+                "c6",
+                Field::new_map(
+                    "element",
+                    "key_value",
+                    Field::new(
+                        "key",
+                        DataType::Timestamp(TimeUnit::Microsecond, None),
+                        false,
+                    ),
+                    Field::new(
+                        "value",
+                        DataType::Timestamp(TimeUnit::Microsecond, None),
+                        true,
+                    ),
+                    false,
+                    true,
+                ),
+                true,
+            ),
+        ]));
+
+        assert_eq!(batch.schema(), expected_schema);
+
+        Ok(())
+    }
+
     #[tokio::test]
     async fn parquet_exec_with_range() -> Result<()> {
         fn file_range(meta: &ObjectMeta, start: i64, end: i64) -> PartitionedFile {

diff --git a/datafusion/core/tests/data/int96_nested.parquet b/datafusion/core/tests/data/int96_nested.parquet
new file mode 100644
index 0000000000000000000000000000000000000000..708823ded6fafe9099dcc48a5738b4456f079665
GIT binary patch
literal 4004
[base85 payload of int96_nested.parquet corrupted during extraction; data not reproduced]
From: Matt Butrovich
Date: Fri, 16 May 2025 14:00:45 -0400
Subject: [PATCH 25/25] Fix clippy.

---
 datafusion/core/src/datasource/physical_plan/parquet.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/datafusion/core/src/datasource/physical_plan/parquet.rs b/datafusion/core/src/datasource/physical_plan/parquet.rs
index 23d849ede2e6..0da230682b19 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet.rs
@@ -1246,7 +1246,7 @@ mod tests {
             &state,
             &ParquetFormat::default().with_coerce_int96(Some("us".to_string())),
             None,
-            &testdata,
+            testdata,
             filename,
             None,
             None,
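The final fix is the usual clippy::needless_borrow pattern: testdata is already a &str, so &testdata produces a &&str that deref coercion immediately collapses back down. A standalone illustration (the scan helper is a stand-in for the test's scan_format call, not a real API):

fn scan(dir: &str) -> String {
    format!("scanning {dir}")
}

fn main() {
    let testdata = "../../datafusion/core/tests/data";
    // Clippy flags this call: `&testdata` is a `&&str` that the compiler
    // immediately dereferences back to `&str`.
    // let _ = scan(&testdata);
    // The lint-clean version, matching what PATCH 25 changes the call to:
    println!("{}", scan(testdata));
}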