Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 56 additions & 1 deletion datafusion/optimizer/src/push_down_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -978,7 +978,11 @@ impl OptimizerRule for PushDownFilter {
let group_expr_columns = agg
.group_expr
.iter()
.map(|e| Ok(Column::from_qualified_name(e.schema_name().to_string())))
.map(|e| {
Ok(Column::from_qualified_name_ignore_case(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I need to study this more carefully, but this change to ignore case seems inconsistent with the rest of this file and also potentially incorrect

For example:

  • In for dialects where case is not important, universally ignoring the case could push down the wrong fields
  • For dialects where case is important, there seem to be many other places in this module that do not compare the case 🤔

So it seems:

  1. We should be checking the case sensitivity config setting before comparing
  2. Are there other places that should be changed to take name into account 🤔

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@alamb thanks for taking a look. I agree the fix looks fishy, and would love to better understand the mechanics of case comparison. We can keep the tests and come up with a better fix that addresses your concerns.

It boils down to the ignore_case flag which is not very intuitive, as setting to true ends up keeping the UPPERCASE, while setting to false will lowercase the column name from the gby expression:
https://github.com/apache/datafusion/blob/main/datafusion/common/src/utils/mod.rs#L298-L299

The problem is introduced as we're roundtripping the correctly parsed column name

Column { relation: Some(Bare { table: "test" }), name: "A" }

through

Column::from_qualified_name(e.schema_name().to_string())

Which will lowercase it, as now there are no more double quotes around the field name. That's why I switched to Column::from_qualified_name_ignore_case, to avoid lowercasing as we're already starting from parsed/cased names.

Very open to suggestions

e.schema_name().to_string(),
))
})
.collect::<Result<HashSet<_>>>()?;

let predicates = split_conjunction_owned(filter.predicate);
Expand Down Expand Up @@ -4160,4 +4164,55 @@ mod tests {
"
)
}

/// Create a test table scan with uppercase column names for case sensitivity testing
fn test_table_scan_with_uppercase_columns() -> Result<LogicalPlan> {
let schema = Schema::new(vec![
Field::new("a", DataType::UInt32, false),
Field::new("A", DataType::UInt32, false),
Field::new("B", DataType::UInt32, false),
Field::new("C", DataType::UInt32, false),
]);
table_scan(Some("test"), &schema, None)?.build()
}

#[test]
fn filter_agg_case_insensitive() -> Result<()> {
let table_scan = test_table_scan_with_uppercase_columns()?;
Copy link
Member

@xudong963 xudong963 Jun 23, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the table also has a column named 'a', what'll happen?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great question, just tried this and it works as expected for both uppercase and lower case col, even if both are present in the schema at the same time. I added another test, lmk if we should keep it or it's overkill.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also happy to see a related sqllogictest

let plan = LogicalPlanBuilder::from(table_scan)
.aggregate(
vec![col(r#""A""#)],
vec![sum(col(r#""B""#)).alias("total_salary")],
)?
.filter(col(r#""A""#).gt(lit(10i64)))?
.build()?;

assert_optimized_plan_equal!(
plan,
@r"
Aggregate: groupBy=[[test.A]], aggr=[[sum(test.B) AS total_salary]]
TableScan: test, full_filters=[test.A > Int64(10)]
"
)
}

#[test]
fn filter_agg_mix_case_insensitive() -> Result<()> {
let table_scan = test_table_scan_with_uppercase_columns()?;
let plan = LogicalPlanBuilder::from(table_scan)
.aggregate(
vec![col("a")],
vec![sum(col(r#""B""#)).alias("total_salary")],
)?
.filter(col("a").gt(lit(10i64)))?
.build()?;

assert_optimized_plan_equal!(
plan,
@r"
Aggregate: groupBy=[[test.a]], aggr=[[sum(test.B) AS total_salary]]
TableScan: test, full_filters=[test.a > Int64(10)]
"
)
}
}
46 changes: 46 additions & 0 deletions datafusion/sqllogictest/test_files/push_down_filter.slt
Original file line number Diff line number Diff line change
Expand Up @@ -288,3 +288,49 @@ physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/

statement ok
drop table t;

statement ok
create table test_uppercase_cols (a int, "A" int, "B" int, "C" int);

# test push down through aggregate for uppercase column name
query TT
explain
select "A", total_salary
from (
select "A", sum("B") as total_salary from test_uppercase_cols group by "A"
)
where "A" > 10;
----
physical_plan
01)ProjectionExec: expr=[A@0 as A, sum(test_uppercase_cols.B)@1 as total_salary]
02)--AggregateExec: mode=FinalPartitioned, gby=[A@0 as A], aggr=[sum(test_uppercase_cols.B)]
03)----CoalesceBatchesExec: target_batch_size=8192
04)------RepartitionExec: partitioning=Hash([A@0], 4), input_partitions=4
05)--------AggregateExec: mode=Partial, gby=[A@0 as A], aggr=[sum(test_uppercase_cols.B)]
06)----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
07)------------CoalesceBatchesExec: target_batch_size=8192
08)--------------FilterExec: A@0 > 10
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the key fix, seeing the FilterExec pushed right on top of the datasource exec.
Without the fix, the plan would look like this (with the filter executed AFTER the aggregation):

+   02)--CoalesceBatchesExec: target_batch_size=8192
+   03)----FilterExec: A@0 > 10
+   04)------AggregateExec: mode=SinglePartitioned, gby=[A@0 as A], aggr=[sum(test_uppercase_cols.B)]
+   05)--------DataSourceExec: partitions=1, partition_sizes=[0]

09)----------------DataSourceExec: partitions=1, partition_sizes=[0]

# test push down through aggregate for mix of lowercase and uppercase column names
query TT
explain
select a, total_salary
from (
select a, sum("B") as total_salary from test_uppercase_cols group by a
)
where a > 10;
----
physical_plan
01)ProjectionExec: expr=[a@0 as a, sum(test_uppercase_cols.B)@1 as total_salary]
02)--AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[sum(test_uppercase_cols.B)]
03)----CoalesceBatchesExec: target_batch_size=8192
04)------RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=4
05)--------AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[sum(test_uppercase_cols.B)]
06)----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
07)------------CoalesceBatchesExec: target_batch_size=8192
08)--------------FilterExec: a@0 > 10
09)----------------DataSourceExec: partitions=1, partition_sizes=[0]

statement ok
drop table test_uppercase_cols;