Changes from all commits (29 commits)
707e1a6
feat: wrong version which did weird stuff
discord9 Dec 19, 2025
c3df555
feat: gather filter support alias
discord9 Dec 25, 2025
3c68a63
feat: add support for detecting unknown columns in filter pushdown & …
discord9 Dec 25, 2025
337e32a
feat: update projection alias handling and enhance PhysicalColumnRewr…
discord9 Dec 25, 2025
86577a8
feat: update deeply nested expression helper function and enhance tes…
discord9 Dec 25, 2025
b64c871
chore: clippy
discord9 Dec 25, 2025
aa0f088
typo
discord9 Dec 25, 2025
39373e1
feat: update test assertions for filter pushdown to reflect expected …
discord9 Dec 25, 2025
5b1b9d9
c
discord9 Dec 25, 2025
c06c51b
c
discord9 Dec 25, 2025
667dc31
clippy
discord9 Dec 26, 2025
70460e6
test: update sqllogic test result
discord9 Dec 26, 2025
e3198d1
test: more complex dyn filter
discord9 Dec 26, 2025
36faeaf
c
discord9 Dec 26, 2025
1e29b36
refactor: rename function have_unknown_columns to has_unknown_columns…
discord9 Jan 4, 2026
8ca9e2d
test: topk with projection
discord9 Jan 5, 2026
c7e8ef7
test: slt test for projection dyn filter
discord9 Jan 5, 2026
f77763b
chore
discord9 Jan 5, 2026
05e0d78
test: ignore time
discord9 Jan 5, 2026
127f5cb
chore: fmt
discord9 Jan 5, 2026
cb04bc5
test: more slt test
discord9 Jan 5, 2026
aca4d8b
test: fix
discord9 Jan 7, 2026
a3208dc
test: more ignore
discord9 Jan 7, 2026
11cf32d
test: more ignore&proper sql
discord9 Jan 7, 2026
accad54
feat: unmap column not pushdown
discord9 Jan 9, 2026
697098e
clippy
discord9 Jan 9, 2026
6c69aec
chore
discord9 Jan 9, 2026
8adee95
test: add pushdown assert
discord9 Jan 9, 2026
01c783c
refactor: ref column map
discord9 Jan 9, 2026
1 change: 1 addition & 0 deletions Cargo.lock


1 change: 1 addition & 0 deletions datafusion/core/Cargo.toml
@@ -175,6 +175,7 @@
env_logger = { workspace = true }
glob = { workspace = true }
insta = { workspace = true }
paste = { workspace = true }
pretty_assertions = "1.0"
Contributor comment: This appears to already be used elsewhere (this is not a net new dependency), so I think it is ok to add

rand = { workspace = true, features = ["small_rng"] }
rand_distr = "0.5"
regex = { workspace = true }
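For context on the dev-dependency discussed in the comment above: pretty_assertions provides a drop-in assert_eq! macro that prints a line-oriented colored diff on failure, which helps when comparing multi-line plan strings the way the tests in this PR do. A minimal, illustrative sketch (the test name and strings are hypothetical, not from this PR):

use pretty_assertions::assert_eq;

#[test]
fn multiline_diff_example() {
    // On mismatch this prints a per-line diff instead of the flat
    // Debug dump produced by the standard library's assert_eq!.
    let expected = "- SortExec\n  - ProjectionExec";
    let actual = "- SortExec\n  - ProjectionExec";
    assert_eq!(actual, expected);
}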
229 changes: 229 additions & 0 deletions datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs
@@ -59,6 +59,7 @@
use datafusion_physical_plan::{
coalesce_partitions::CoalescePartitionsExec,
collect,
filter::FilterExec,
projection::ProjectionExec,
repartition::RepartitionExec,
sorts::sort::SortExec,
};
@@ -1815,6 +1816,234 @@
fn schema() -> SchemaRef {
Arc::clone(&TEST_SCHEMA)
}

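/// Inputs for one projection-under-TopK dynamic filter case, plus the expected
/// formatted plans: the first entry is checked after optimization but before
/// execution, the remaining entries after each produced batch.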
struct ProjectionDynFilterTestCase {
schema: SchemaRef,
batches: Vec<RecordBatch>,
projection: Vec<(Arc<dyn PhysicalExpr>, String)>,
sort_expr: PhysicalSortExpr,
expected_plans: Vec<String>,
}

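/// Builds a scan -> projection -> TopK(SortExec) plan, runs the
/// post-optimization FilterPushdown pass, and verifies the formatted plan
/// before execution and again as the dynamic filter is populated while
/// batches stream through.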
async fn run_projection_dyn_filter_case(case: ProjectionDynFilterTestCase) {
let ProjectionDynFilterTestCase {
schema,
batches,
projection,
sort_expr,
expected_plans,
} = case;

let scan = TestScanBuilder::new(Arc::clone(&schema))
.with_support(true)
.with_batches(batches)
.build();

let projection_exec = Arc::new(ProjectionExec::try_new(projection, scan).unwrap());

let sort = Arc::new(
SortExec::new(LexOrdering::new(vec![sort_expr]).unwrap(), projection_exec)
.with_fetch(Some(2)),
) as Arc<dyn ExecutionPlan>;

let mut config = ConfigOptions::default();
config.execution.parquet.pushdown_filters = true;
config.optimizer.enable_dynamic_filter_pushdown = true;

let optimized_plan = FilterPushdown::new_post_optimization()
.optimize(Arc::clone(&sort), &config)
.unwrap();

pretty_assertions::assert_eq!(
format_plan_for_test(&optimized_plan).trim(),
expected_plans[0].trim()
);

let config = SessionConfig::new().with_batch_size(2);
let session_ctx = SessionContext::new_with_config(config);
session_ctx.register_object_store(
ObjectStoreUrl::parse("test://").unwrap().as_ref(),
Arc::new(InMemory::new()),
);
let state = session_ctx.state();
let task_ctx = state.task_ctx();
let mut stream = optimized_plan.execute(0, Arc::clone(&task_ctx)).unwrap();
for (idx, expected_plan) in expected_plans.iter().enumerate().skip(1) {
stream.next().await.unwrap().unwrap();
let formatted_plan = format_plan_for_test(&optimized_plan);
pretty_assertions::assert_eq!(
formatted_plan.trim(),
expected_plan.trim(),
"Mismatch at iteration {}",
idx
);
}
}

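// Exercises the projection shapes a dynamic filter must be rewritten through:
// reordering, pruning, identity, computed expressions, swapped aliases, and an
// alias that shadows a source column.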
#[tokio::test]
async fn test_topk_with_projection_transformation_on_dyn_filter() {
let schema = Arc::new(Schema::new(vec![
Field::new("a", DataType::Int32, false),
Field::new("b", DataType::Utf8, false),
Field::new("c", DataType::Float64, false),
]));
let simple_abc = vec![
record_batch!(
("a", Int32, [1, 2, 3]),
("b", Utf8, ["x", "y", "z"]),
("c", Float64, [1.0, 2.0, 3.0])
)
.unwrap(),
];

// Case 1: Reordering [b, a]
run_projection_dyn_filter_case(ProjectionDynFilterTestCase {
schema: Arc::clone(&schema),
batches: simple_abc.clone(),
projection: vec![
(col("b", &schema).unwrap(), "b".to_string()),
(col("a", &schema).unwrap(), "a".to_string()),
],
sort_expr: PhysicalSortExpr::new(
Arc::new(Column::new("a", 1)),
SortOptions::default(),
),
expected_plans: vec![
r#" - SortExec: TopK(fetch=2), expr=[a@1 ASC], preserve_partitioning=[false]
- ProjectionExec: expr=[b@1 as b, a@0 as a]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ empty ]"#.to_string(),
r#" - SortExec: TopK(fetch=2), expr=[a@1 ASC], preserve_partitioning=[false], filter=[a@1 IS NULL OR a@1 < 2]
- ProjectionExec: expr=[b@1 as b, a@0 as a]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ a@0 IS NULL OR a@0 < 2 ]"#.to_string()]
})
.await;

// Case 2: Pruning [a]
run_projection_dyn_filter_case(ProjectionDynFilterTestCase {
schema: Arc::clone(&schema),
batches: simple_abc.clone(),
projection: vec![(col("a", &schema).unwrap(), "a".to_string())],
sort_expr: PhysicalSortExpr::new(
Arc::new(Column::new("a", 0)),
SortOptions::default(),
),
expected_plans: vec![
r#" - SortExec: TopK(fetch=2), expr=[a@0 ASC], preserve_partitioning=[false]
- ProjectionExec: expr=[a@0 as a]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ empty ]"#.to_string(),
r#" - SortExec: TopK(fetch=2), expr=[a@0 ASC], preserve_partitioning=[false], filter=[a@0 IS NULL OR a@0 < 2]
- ProjectionExec: expr=[a@0 as a]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ a@0 IS NULL OR a@0 < 2 ]"#.to_string(),
],
})
.await;

// Case 3: Identity [a, b]
run_projection_dyn_filter_case(ProjectionDynFilterTestCase {
schema: Arc::clone(&schema),
batches: simple_abc.clone(),
projection: vec![
(col("a", &schema).unwrap(), "a".to_string()),
(col("b", &schema).unwrap(), "b".to_string()),
],
sort_expr: PhysicalSortExpr::new(
Arc::new(Column::new("a", 0)),
SortOptions::default(),
),
expected_plans: vec![
r#" - SortExec: TopK(fetch=2), expr=[a@0 ASC], preserve_partitioning=[false]
- ProjectionExec: expr=[a@0 as a, b@1 as b]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ empty ]"#.to_string(),
r#" - SortExec: TopK(fetch=2), expr=[a@0 ASC], preserve_partitioning=[false], filter=[a@0 IS NULL OR a@0 < 2]
- ProjectionExec: expr=[a@0 as a, b@1 as b]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ a@0 IS NULL OR a@0 < 2 ]"#.to_string(),
],
})
.await;

// Case 4: Expressions [a + 1, b]
run_projection_dyn_filter_case(ProjectionDynFilterTestCase {
schema: Arc::clone(&schema),
batches: simple_abc.clone(),
projection: vec![
(
Arc::new(BinaryExpr::new(
col("a", &schema).unwrap(),
Operator::Plus,
Arc::new(Literal::new(ScalarValue::Int32(Some(1)))),
)),
"a_plus_1".to_string(),
),
(col("b", &schema).unwrap(), "b".to_string()),
],
sort_expr: PhysicalSortExpr::new(
Arc::new(Column::new("a_plus_1", 0)),
SortOptions::default(),
),
expected_plans: vec![
r#" - SortExec: TopK(fetch=2), expr=[a_plus_1@0 ASC], preserve_partitioning=[false]
- ProjectionExec: expr=[a@0 + 1 as a_plus_1, b@1 as b]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ empty ]"#.to_string(),
r#" - SortExec: TopK(fetch=2), expr=[a_plus_1@0 ASC], preserve_partitioning=[false], filter=[a_plus_1@0 IS NULL OR a_plus_1@0 < 3]
- ProjectionExec: expr=[a@0 + 1 as a_plus_1, b@1 as b]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ a@0 + 1 IS NULL OR a@0 + 1 < 3 ]"#.to_string(),
],
})
.await;

// Case 5: [a as b, b as a] (swapped columns)
run_projection_dyn_filter_case(ProjectionDynFilterTestCase {
schema: Arc::clone(&schema),
batches: simple_abc.clone(),
projection: vec![
(col("a", &schema).unwrap(), "b".to_string()),
(col("b", &schema).unwrap(), "a".to_string()),
],
sort_expr: PhysicalSortExpr::new(
Arc::new(Column::new("b", 0)),
SortOptions::default(),
),
expected_plans: vec![
r#" - SortExec: TopK(fetch=2), expr=[b@0 ASC], preserve_partitioning=[false]
- ProjectionExec: expr=[a@0 as b, b@1 as a]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ empty ]"#.to_string(),
r#" - SortExec: TopK(fetch=2), expr=[b@0 ASC], preserve_partitioning=[false], filter=[b@0 IS NULL OR b@0 < 2]
- ProjectionExec: expr=[a@0 as b, b@1 as a]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ a@0 IS NULL OR a@0 < 2 ]"#.to_string(),
],
})
.await;

// Case 6: Confusing expr [a + 1 as a, b]
run_projection_dyn_filter_case(ProjectionDynFilterTestCase {
schema: Arc::clone(&schema),
batches: simple_abc.clone(),
projection: vec![
(
Arc::new(BinaryExpr::new(
col("a", &schema).unwrap(),
Operator::Plus,
Arc::new(Literal::new(ScalarValue::Int32(Some(1)))),
)),
"a".to_string(),
),
(col("b", &schema).unwrap(), "b".to_string()),
],
sort_expr: PhysicalSortExpr::new(
Arc::new(Column::new("a", 0)),
SortOptions::default(),
),
expected_plans: vec![
r#" - SortExec: TopK(fetch=2), expr=[a@0 ASC], preserve_partitioning=[false]
- ProjectionExec: expr=[a@0 + 1 as a, b@1 as b]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ empty ]"#.to_string(),
r#" - SortExec: TopK(fetch=2), expr=[a@0 ASC], preserve_partitioning=[false], filter=[a@0 IS NULL OR a@0 < 3]
- ProjectionExec: expr=[a@0 + 1 as a, b@1 as b]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ a@0 + 1 IS NULL OR a@0 + 1 < 3 ]"#.to_string(),
],
})
.await;
}

/// Returns a predicate that is a binary expression `col = lit`
fn col_lit_predicate(
column_name: &str,
1 change: 1 addition & 0 deletions datafusion/physical-plan/src/lib.rs
@@ -92,6 +92,7 @@
pub mod streaming;
pub mod tree_node;
pub mod union;
pub mod unnest;
pub mod util;
pub mod windows;
pub mod work_table;
pub mod udaf {