From 7202ae6c9b4830acd334c588e955de118d61e3c0 Mon Sep 17 00:00:00 2001 From: Qi Zhu <821684824@qq.com> Date: Wed, 1 Apr 2026 17:17:34 +0800 Subject: [PATCH] Relax OneOf schema comparison to ignore nullability differences Different table types (FileScanTable, LiveTable, MV rollups) may expose different nullability for the same column. For example: - A partition column in one table is non-nullable - The same column in a rollup MV is a file column, forced nullable for DF 52 RecordBatch validation compatibility Previously, OneOf rejected candidates with any schema difference including nullability. Now it only checks field names and data types, allowing nullable/non-nullable variants of the same column. --- src/rewrite/exploitation.rs | 77 ++++++++++++++++++++++++++++++++++++- 1 file changed, 75 insertions(+), 2 deletions(-) diff --git a/src/rewrite/exploitation.rs b/src/rewrite/exploitation.rs index 8bc2cd8..159d84b 100644 --- a/src/rewrite/exploitation.rs +++ b/src/rewrite/exploitation.rs @@ -245,10 +245,21 @@ impl ExtensionPlanner for ViewExploitationPlanner { return Ok(None); } + // Compare schemas ignoring nullability differences. + // Different table types (FileScanTable, LiveTable, MV) may expose + // different nullability for the same column. For example, a partition + // column in one table is non-nullable, but the same column is a file + // column in a rollup MV (forced nullable for DF 52 RecordBatch + // validation compatibility). Field names and data types must match. if logical_inputs .iter() .map(|plan| plan.schema()) - .any(|schema| schema != logical_inputs[0].schema()) + .any(|schema| { + !schemas_equal_ignoring_nullability( + schema.as_arrow(), + logical_inputs[0].schema().as_arrow(), + ) + }) { return Err(DataFusionError::Plan( "candidate logical plans should have the same schema".to_string(), @@ -258,7 +269,9 @@ impl ExtensionPlanner for ViewExploitationPlanner { if physical_inputs .iter() .map(|plan| plan.schema()) - .any(|schema| schema != physical_inputs[0].schema()) + .any(|schema| { + !schemas_equal_ignoring_nullability(&schema, &physical_inputs[0].schema()) + }) { return Err(DataFusionError::Plan( "candidate physical plans should have the same schema".to_string(), @@ -522,3 +535,63 @@ impl PhysicalOptimizerRule for PruneCandidates { true } } + +/// Compare two Arrow schemas ignoring field nullability. +/// +/// Returns true if field count, field names, and data types all match. +/// Nullability differences are ignored because different table types +/// (e.g. FileScanTable with forced-nullable file columns vs a table +/// where the same column is a non-nullable partition column) can +/// legitimately differ in nullability while being semantically equivalent. +fn schemas_equal_ignoring_nullability(a: &arrow_schema::Schema, b: &arrow_schema::Schema) -> bool { + a.fields().len() == b.fields().len() + && a.fields() + .iter() + .zip(b.fields().iter()) + .all(|(f1, f2)| f1.name() == f2.name() && f1.data_type() == f2.data_type()) +} + +#[cfg(test)] +mod tests_nullability { + use super::*; + use arrow_schema::{DataType, Field, Schema}; + + #[test] + fn schemas_equal_when_only_nullability_differs() { + let a = Schema::new(vec![ + Field::new("ticker", DataType::Utf8, false), + Field::new("date", DataType::Utf8, false), + Field::new("price", DataType::Float64, false), + ]); + let b = Schema::new(vec![ + Field::new("ticker", DataType::Utf8, true), + Field::new("date", DataType::Utf8, true), + Field::new("price", DataType::Float64, true), + ]); + assert!(schemas_equal_ignoring_nullability(&a, &b)); + } + + #[test] + fn schemas_not_equal_when_types_differ() { + let a = Schema::new(vec![Field::new("x", DataType::Int32, false)]); + let b = Schema::new(vec![Field::new("x", DataType::Int64, false)]); + assert!(!schemas_equal_ignoring_nullability(&a, &b)); + } + + #[test] + fn schemas_not_equal_when_names_differ() { + let a = Schema::new(vec![Field::new("ticker", DataType::Utf8, true)]); + let b = Schema::new(vec![Field::new("symbol", DataType::Utf8, true)]); + assert!(!schemas_equal_ignoring_nullability(&a, &b)); + } + + #[test] + fn schemas_not_equal_when_field_count_differs() { + let a = Schema::new(vec![ + Field::new("x", DataType::Int32, false), + Field::new("y", DataType::Int32, false), + ]); + let b = Schema::new(vec![Field::new("x", DataType::Int32, false)]); + assert!(!schemas_equal_ignoring_nullability(&a, &b)); + } +}