diff --git a/src/rewrite/exploitation.rs b/src/rewrite/exploitation.rs index 8bc2cd8..159d84b 100644 --- a/src/rewrite/exploitation.rs +++ b/src/rewrite/exploitation.rs @@ -245,10 +245,21 @@ impl ExtensionPlanner for ViewExploitationPlanner { return Ok(None); } + // Compare schemas ignoring nullability differences. + // Different table types (FileScanTable, LiveTable, MV) may expose + // different nullability for the same column. For example, a partition + // column in one table is non-nullable, but the same column is a file + // column in a rollup MV (forced nullable for DF 52 RecordBatch + // validation compatibility). Field names and data types must match. if logical_inputs .iter() .map(|plan| plan.schema()) - .any(|schema| schema != logical_inputs[0].schema()) + .any(|schema| { + !schemas_equal_ignoring_nullability( + schema.as_arrow(), + logical_inputs[0].schema().as_arrow(), + ) + }) { return Err(DataFusionError::Plan( "candidate logical plans should have the same schema".to_string(), @@ -258,7 +269,9 @@ impl ExtensionPlanner for ViewExploitationPlanner { if physical_inputs .iter() .map(|plan| plan.schema()) - .any(|schema| schema != physical_inputs[0].schema()) + .any(|schema| { + !schemas_equal_ignoring_nullability(&schema, &physical_inputs[0].schema()) + }) { return Err(DataFusionError::Plan( "candidate physical plans should have the same schema".to_string(), @@ -522,3 +535,63 @@ impl PhysicalOptimizerRule for PruneCandidates { true } } + +/// Compare two Arrow schemas ignoring field nullability. +/// +/// Returns true if field count, field names, and data types all match. +/// Nullability differences are ignored because different table types +/// (e.g. FileScanTable with forced-nullable file columns vs a table +/// where the same column is a non-nullable partition column) can +/// legitimately differ in nullability while being semantically equivalent. +fn schemas_equal_ignoring_nullability(a: &arrow_schema::Schema, b: &arrow_schema::Schema) -> bool { + a.fields().len() == b.fields().len() + && a.fields() + .iter() + .zip(b.fields().iter()) + .all(|(f1, f2)| f1.name() == f2.name() && f1.data_type() == f2.data_type()) +} + +#[cfg(test)] +mod tests_nullability { + use super::*; + use arrow_schema::{DataType, Field, Schema}; + + #[test] + fn schemas_equal_when_only_nullability_differs() { + let a = Schema::new(vec![ + Field::new("ticker", DataType::Utf8, false), + Field::new("date", DataType::Utf8, false), + Field::new("price", DataType::Float64, false), + ]); + let b = Schema::new(vec![ + Field::new("ticker", DataType::Utf8, true), + Field::new("date", DataType::Utf8, true), + Field::new("price", DataType::Float64, true), + ]); + assert!(schemas_equal_ignoring_nullability(&a, &b)); + } + + #[test] + fn schemas_not_equal_when_types_differ() { + let a = Schema::new(vec![Field::new("x", DataType::Int32, false)]); + let b = Schema::new(vec![Field::new("x", DataType::Int64, false)]); + assert!(!schemas_equal_ignoring_nullability(&a, &b)); + } + + #[test] + fn schemas_not_equal_when_names_differ() { + let a = Schema::new(vec![Field::new("ticker", DataType::Utf8, true)]); + let b = Schema::new(vec![Field::new("symbol", DataType::Utf8, true)]); + assert!(!schemas_equal_ignoring_nullability(&a, &b)); + } + + #[test] + fn schemas_not_equal_when_field_count_differs() { + let a = Schema::new(vec![ + Field::new("x", DataType::Int32, false), + Field::new("y", DataType::Int32, false), + ]); + let b = Schema::new(vec![Field::new("x", DataType::Int32, false)]); + assert!(!schemas_equal_ignoring_nullability(&a, &b)); + } +}