Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 75 additions & 2 deletions src/rewrite/exploitation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,10 +245,21 @@ impl ExtensionPlanner for ViewExploitationPlanner {
return Ok(None);
}

// Compare schemas ignoring nullability differences.
// Different table types (FileScanTable, LiveTable, MV) may expose
// different nullability for the same column. For example, a partition
// column in one table is non-nullable, but the same column is a file
// column in a rollup MV (forced nullable for DF 52 RecordBatch
// validation compatibility). Field names and data types must match.
if logical_inputs
.iter()
.map(|plan| plan.schema())
.any(|schema| schema != logical_inputs[0].schema())
.any(|schema| {
!schemas_equal_ignoring_nullability(
Comment on lines 254 to +258
Copy link

Copilot AI Apr 1, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor: logical_inputs[0].schema().as_arrow() is recomputed inside the .any(...) closure for every input. Consider binding the baseline schema once before the iterator (and similarly for the physical schema check) to reduce repeated work and make the comparison a bit easier to read.

Copilot uses AI. Check for mistakes.
schema.as_arrow(),
logical_inputs[0].schema().as_arrow(),
)
Comment on lines +257 to +261
Copy link

Copilot AI Apr 1, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By relaxing the logical schema compatibility check to ignore nullability, OneOf can now contain branches whose DFSchema differs from branches[0].schema() (which is what OneOf::schema() reports). This makes the OneOf node’s output schema potentially stricter (non-nullable) than some candidate branches, which can violate DataFusion’s expectation that a logical node’s schema matches all its inputs and can enable incorrect optimizations based on non-nullability. Consider normalizing the OneOf schema to a merged/most-permissive schema (nullable if any branch is nullable) or otherwise ensuring the OneOf-reported schema is compatible with every branch when nullability differs.

Copilot uses AI. Check for mistakes.
})
{
return Err(DataFusionError::Plan(
"candidate logical plans should have the same schema".to_string(),
Expand All @@ -258,7 +269,9 @@ impl ExtensionPlanner for ViewExploitationPlanner {
if physical_inputs
.iter()
.map(|plan| plan.schema())
.any(|schema| schema != physical_inputs[0].schema())
.any(|schema| {
!schemas_equal_ignoring_nullability(&schema, &physical_inputs[0].schema())
})
{
return Err(DataFusionError::Plan(
"candidate physical plans should have the same schema".to_string(),
Expand Down Expand Up @@ -522,3 +535,63 @@ impl PhysicalOptimizerRule for PruneCandidates {
true
}
}

/// Compare two Arrow schemas ignoring field nullability.
///
/// Returns true if field count, field names, and data types all match.
/// Nullability differences are ignored because different table types
/// (e.g. FileScanTable with forced-nullable file columns vs a table
/// where the same column is a non-nullable partition column) can
/// legitimately differ in nullability while being semantically equivalent.
fn schemas_equal_ignoring_nullability(a: &arrow_schema::Schema, b: &arrow_schema::Schema) -> bool {
a.fields().len() == b.fields().len()
&& a.fields()
.iter()
.zip(b.fields().iter())
.all(|(f1, f2)| f1.name() == f2.name() && f1.data_type() == f2.data_type())
}

#[cfg(test)]
mod tests_nullability {
use super::*;
use arrow_schema::{DataType, Field, Schema};

#[test]
fn schemas_equal_when_only_nullability_differs() {
let a = Schema::new(vec![
Field::new("ticker", DataType::Utf8, false),
Field::new("date", DataType::Utf8, false),
Field::new("price", DataType::Float64, false),
]);
let b = Schema::new(vec![
Field::new("ticker", DataType::Utf8, true),
Field::new("date", DataType::Utf8, true),
Field::new("price", DataType::Float64, true),
]);
assert!(schemas_equal_ignoring_nullability(&a, &b));
}

#[test]
fn schemas_not_equal_when_types_differ() {
let a = Schema::new(vec![Field::new("x", DataType::Int32, false)]);
let b = Schema::new(vec![Field::new("x", DataType::Int64, false)]);
assert!(!schemas_equal_ignoring_nullability(&a, &b));
}

#[test]
fn schemas_not_equal_when_names_differ() {
let a = Schema::new(vec![Field::new("ticker", DataType::Utf8, true)]);
let b = Schema::new(vec![Field::new("symbol", DataType::Utf8, true)]);
assert!(!schemas_equal_ignoring_nullability(&a, &b));
}

#[test]
fn schemas_not_equal_when_field_count_differs() {
let a = Schema::new(vec![
Field::new("x", DataType::Int32, false),
Field::new("y", DataType::Int32, false),
]);
let b = Schema::new(vec![Field::new("x", DataType::Int32, false)]);
assert!(!schemas_equal_ignoring_nullability(&a, &b));
}
}
Loading