Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
274 changes: 274 additions & 0 deletions datafusion/core/tests/physical_optimizer/filter_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4086,3 +4086,277 @@ async fn test_filter_with_projection_pushdown() {
];
assert_batches_eq!(expected, &result);
}

#[tokio::test]
async fn test_hashjoin_dynamic_filter_pushdown_left_join() {
use datafusion_common::JoinType;
use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode};

// Create build side with limited values
let build_batches = vec![
record_batch!(
("a", Utf8, ["aa", "ab"]),
("b", Utf8, ["ba", "bb"]),
("c", Float64, [1.0, 2.0])
)
.unwrap(),
];
let build_side_schema = Arc::new(Schema::new(vec![
Field::new("a", DataType::Utf8, false),
Field::new("b", DataType::Utf8, false),
Field::new("c", DataType::Float64, false),
]));
let build_scan = TestScanBuilder::new(Arc::clone(&build_side_schema))
.with_support(true)
.with_batches(build_batches)
.build();

// Create probe side with more values (some won't match)
let probe_batches = vec![
record_batch!(
("a", Utf8, ["aa", "ab", "ac", "ad"]),
("b", Utf8, ["ba", "bb", "bc", "bd"]),
("e", Float64, [1.0, 2.0, 3.0, 4.0])
)
.unwrap(),
];
let probe_side_schema = Arc::new(Schema::new(vec![
Field::new("a", DataType::Utf8, false),
Field::new("b", DataType::Utf8, false),
Field::new("e", DataType::Float64, false),
]));
let probe_scan = TestScanBuilder::new(Arc::clone(&probe_side_schema))
.with_support(true)
.with_batches(probe_batches)
.build();

// Create HashJoinExec with Left join and CollectLeft mode
let on = vec![
(
col("a", &build_side_schema).unwrap(),
col("a", &probe_side_schema).unwrap(),
),
(
col("b", &build_side_schema).unwrap(),
col("b", &probe_side_schema).unwrap(),
),
];
let plan = Arc::new(
HashJoinExec::try_new(
build_scan,
Arc::clone(&probe_scan),
on,
None,
&JoinType::Left,
None,
PartitionMode::CollectLeft,
datafusion_common::NullEquality::NullEqualsNothing,
false,
)
.unwrap(),
) as Arc<dyn ExecutionPlan>;

// Expect the dynamic filter predicate to be pushed down into the probe side DataSource
insta::assert_snapshot!(
OptimizationTest::new(Arc::clone(&plan), FilterPushdown::new_post_optimization(), true),
@r"
OptimizationTest:
input:
- HashJoinExec: mode=CollectLeft, join_type=Left, on=[(a@0, a@0), (b@1, b@1)]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true
output:
Ok:
- HashJoinExec: mode=CollectLeft, join_type=Left, on=[(a@0, a@0), (b@1, b@1)]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ empty ]
",
);

// Actually apply the optimization and execute the plan
let mut config = ConfigOptions::default();
config.execution.parquet.pushdown_filters = true;
config.optimizer.enable_dynamic_filter_pushdown = true;
let plan = FilterPushdown::new_post_optimization()
.optimize(plan, &config)
.unwrap();

// Test that dynamic filter linking survives with_new_children
let children = plan.children().into_iter().map(Arc::clone).collect();
let plan = plan.with_new_children(children).unwrap();

let config = SessionConfig::new().with_batch_size(10);
let session_ctx = SessionContext::new_with_config(config);
session_ctx.register_object_store(
ObjectStoreUrl::parse("test://").unwrap().as_ref(),
Arc::new(InMemory::new()),
);
let state = session_ctx.state();
let task_ctx = state.task_ctx();
let batches = collect(Arc::clone(&plan), Arc::clone(&task_ctx))
.await
.unwrap();

// After execution, verify the dynamic filter was populated with bounds and IN-list
insta::assert_snapshot!(
format!("{}", format_plan_for_test(&plan)),
@r"
- HashJoinExec: mode=CollectLeft, join_type=Left, on=[(a@0, a@0), (b@1, b@1)]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ a@0 >= aa AND a@0 <= ab AND b@1 >= ba AND b@1 <= bb AND struct(a@0, b@1) IN (SET) ([{c0:aa,c1:ba}, {c0:ab,c1:bb}]) ]
"
);

// Verify result correctness: left join preserves all build (left) rows.
// All build rows match probe rows here, so we get 2 matched rows.
// The dynamic filter pruned unmatched probe rows (ac, ad) at scan time,
// which is safe because those probe rows would never match any build row.
let result = format!("{}", pretty_format_batches(&batches).unwrap());
insta::assert_snapshot!(
result,
@r"
+----+----+-----+----+----+-----+
| a | b | c | a | b | e |
+----+----+-----+----+----+-----+
| aa | ba | 1.0 | aa | ba | 1.0 |
| ab | bb | 2.0 | ab | bb | 2.0 |
+----+----+-----+----+----+-----+
"
);
}

#[tokio::test]
async fn test_hashjoin_dynamic_filter_pushdown_left_semi_join() {
use datafusion_common::JoinType;
use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode};

// Create build side with limited values
let build_batches = vec![
record_batch!(
("a", Utf8, ["aa", "ab"]),
("b", Utf8, ["ba", "bb"]),
("c", Float64, [1.0, 2.0])
)
.unwrap(),
];
let build_side_schema = Arc::new(Schema::new(vec![
Field::new("a", DataType::Utf8, false),
Field::new("b", DataType::Utf8, false),
Field::new("c", DataType::Float64, false),
]));
let build_scan = TestScanBuilder::new(Arc::clone(&build_side_schema))
.with_support(true)
.with_batches(build_batches)
.build();

// Create probe side with more values (some won't match)
let probe_batches = vec![
record_batch!(
("a", Utf8, ["aa", "ab", "ac", "ad"]),
("b", Utf8, ["ba", "bb", "bc", "bd"]),
("e", Float64, [1.0, 2.0, 3.0, 4.0])
)
.unwrap(),
];
let probe_side_schema = Arc::new(Schema::new(vec![
Field::new("a", DataType::Utf8, false),
Field::new("b", DataType::Utf8, false),
Field::new("e", DataType::Float64, false),
]));
let probe_scan = TestScanBuilder::new(Arc::clone(&probe_side_schema))
.with_support(true)
.with_batches(probe_batches)
.build();

// Create HashJoinExec with LeftSemi join and CollectLeft mode
let on = vec![
(
col("a", &build_side_schema).unwrap(),
col("a", &probe_side_schema).unwrap(),
),
(
col("b", &build_side_schema).unwrap(),
col("b", &probe_side_schema).unwrap(),
),
];
let plan = Arc::new(
HashJoinExec::try_new(
build_scan,
Arc::clone(&probe_scan),
on,
None,
&JoinType::LeftSemi,
None,
PartitionMode::CollectLeft,
datafusion_common::NullEquality::NullEqualsNothing,
false,
)
.unwrap(),
) as Arc<dyn ExecutionPlan>;

// Expect the dynamic filter predicate to be pushed down into the probe side DataSource
insta::assert_snapshot!(
OptimizationTest::new(Arc::clone(&plan), FilterPushdown::new_post_optimization(), true),
@r"
OptimizationTest:
input:
- HashJoinExec: mode=CollectLeft, join_type=LeftSemi, on=[(a@0, a@0), (b@1, b@1)]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true
output:
Ok:
- HashJoinExec: mode=CollectLeft, join_type=LeftSemi, on=[(a@0, a@0), (b@1, b@1)]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ empty ]
",
);

// Actually apply the optimization and execute the plan
let mut config = ConfigOptions::default();
config.execution.parquet.pushdown_filters = true;
config.optimizer.enable_dynamic_filter_pushdown = true;
let plan = FilterPushdown::new_post_optimization()
.optimize(plan, &config)
.unwrap();

// Test that dynamic filter linking survives with_new_children
let children = plan.children().into_iter().map(Arc::clone).collect();
let plan = plan.with_new_children(children).unwrap();

let config = SessionConfig::new().with_batch_size(10);
let session_ctx = SessionContext::new_with_config(config);
session_ctx.register_object_store(
ObjectStoreUrl::parse("test://").unwrap().as_ref(),
Arc::new(InMemory::new()),
);
let state = session_ctx.state();
let task_ctx = state.task_ctx();
let batches = collect(Arc::clone(&plan), Arc::clone(&task_ctx))
.await
.unwrap();

// After execution, verify the dynamic filter was populated with bounds and IN-list
insta::assert_snapshot!(
format!("{}", format_plan_for_test(&plan)),
@r"
- HashJoinExec: mode=CollectLeft, join_type=LeftSemi, on=[(a@0, a@0), (b@1, b@1)]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ a@0 >= aa AND a@0 <= ab AND b@1 >= ba AND b@1 <= bb AND struct(a@0, b@1) IN (SET) ([{c0:aa,c1:ba}, {c0:ab,c1:bb}]) ]
"
);

// Verify result correctness: left semi join returns only build (left) rows
// that have at least one matching probe row. Output schema is build-side columns only.
let result = format!("{}", pretty_format_batches(&batches).unwrap());
insta::assert_snapshot!(
result,
@r"
+----+----+-----+
| a | b | c |
+----+----+-----+
| aa | ba | 1.0 |
| ab | bb | 2.0 |
+----+----+-----+
"
);
}
2 changes: 1 addition & 1 deletion datafusion/physical-plan/src/joins/hash_join/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -738,7 +738,7 @@ impl HashJoinExec {
}

fn allow_join_dynamic_filter_pushdown(&self, config: &ConfigOptions) -> bool {
if self.join_type != JoinType::Inner
if !matches!(self.join_type, JoinType::Inner | JoinType::Left | JoinType::LeftSemi)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can JoinType::Right and JoinType::RightSemi also be applied based on the same rationale?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think so. The build side is always the left side and the probe side is always the right side. The filters are therefore always pushed to the right side. We need to retain all rows from the right side for a Right join, so there's nothing we can push down.

A right join query can still take advantage of this if the optimizer decides to swap the sides so that the right side becomes the build side of the join; in that case it would become a left join here.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually I think it might be safe to add RightSemi and LeftAnti to this list.

The dynamic filter only removes probe rows that can't match any build row, and neither RightSemi (which outputs only matched probe rows) nor LeftAnti (which outputs only unmatched build rows) includes unmatched probe rows in its output. So I think it would be safe to add those here as well. Right and RightAnti would not be.

|| !config.optimizer.enable_join_dynamic_filter_pushdown
{
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,8 +222,7 @@ ORDER BY l.id;
5 left5 right5

# RIGHT JOIN: optimizer swaps to physical Left join (build=right_parquet, probe=left_parquet).
# No self-generated dynamic filter (only Inner joins get that), but parent filters
# on the preserved (build) side can still push down.
# Physical Left join generates a self-generated dynamic filter on the probe side.
query TT
EXPLAIN SELECT l.*, r.info
FROM left_parquet l
Expand All @@ -240,7 +239,7 @@ physical_plan
01)ProjectionExec: expr=[id@1 as id, data@2 as data, info@0 as info]
02)--HashJoinExec: mode=CollectLeft, join_type=Left, on=[(id@0, id@0)], projection=[info@1, id@2, data@3]
03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_right.parquet]]}, projection=[id, info], file_type=parquet
04)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_left.parquet]]}, projection=[id, data], file_type=parquet
04)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_left.parquet]]}, projection=[id, data], file_type=parquet, predicate=DynamicFilter [ empty ]

# RIGHT JOIN correctness: all right rows appear, unmatched left rows produce NULLs
query ITT
Expand Down Expand Up @@ -292,6 +291,36 @@ physical_plan
02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_right.parquet]]}, projection=[id], file_type=parquet
03)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_left.parquet]]}, projection=[id, data], file_type=parquet

# LEFT SEMI JOIN (physical LeftSemi): reverse table roles so optimizer keeps LeftSemi
# (right_parquet has 3 rows, fewer than left_parquet's 5 rows, so no swap occurs).
# Physical LeftSemi generates a self-generated dynamic filter on the probe side.
query TT
EXPLAIN SELECT r.*
FROM right_parquet r
WHERE r.id IN (SELECT l.id FROM left_parquet l);
----
logical_plan
01)LeftSemi Join: r.id = __correlated_sq_1.id
02)--SubqueryAlias: r
03)----TableScan: right_parquet projection=[id, info]
04)--SubqueryAlias: __correlated_sq_1
05)----SubqueryAlias: l
06)------TableScan: left_parquet projection=[id]
physical_plan
01)HashJoinExec: mode=CollectLeft, join_type=LeftSemi, on=[(id@0, id@0)]
02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_right.parquet]]}, projection=[id, info], file_type=parquet
03)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_left.parquet]]}, projection=[id], file_type=parquet, predicate=DynamicFilter [ empty ]

# LEFT SEMI (physical LeftSemi) correctness: only right rows with matching left ids
query IT rowsort
SELECT r.*
FROM right_parquet r
WHERE r.id IN (SELECT l.id FROM left_parquet l);
----
1 right1
3 right3
5 right5

# LEFT ANTI JOIN: no self-generated dynamic filter, but parent filters can push
# to the preserved (left/build) side.
query TT
Expand Down