diff --git a/datafusion/core/tests/physical_optimizer/filter_pushdown.rs b/datafusion/core/tests/physical_optimizer/filter_pushdown.rs
index 99db81d34d8f..ac6472748114 100644
--- a/datafusion/core/tests/physical_optimizer/filter_pushdown.rs
+++ b/datafusion/core/tests/physical_optimizer/filter_pushdown.rs
@@ -4086,3 +4086,285 @@ async fn test_filter_with_projection_pushdown() {
     ];
     assert_batches_eq!(expected, &result);
 }
+
+/// A `Left` hash join in `CollectLeft` mode should push a dynamic filter into the
+/// probe-side scan: the snapshot shows it `empty` after optimization and populated
+/// with build-side bounds plus an IN-list after execution, with both build rows
+/// still present in the join output.
+#[tokio::test]
+async fn test_hashjoin_dynamic_filter_pushdown_left_join() {
+    use datafusion_common::JoinType;
+    use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode};
+
+    // Create build side with limited values
+    let build_batches = vec![
+        record_batch!(
+            ("a", Utf8, ["aa", "ab"]),
+            ("b", Utf8, ["ba", "bb"]),
+            ("c", Float64, [1.0, 2.0])
+        )
+        .unwrap(),
+    ];
+    let build_side_schema = Arc::new(Schema::new(vec![
+        Field::new("a", DataType::Utf8, false),
+        Field::new("b", DataType::Utf8, false),
+        Field::new("c", DataType::Float64, false),
+    ]));
+    let build_scan = TestScanBuilder::new(Arc::clone(&build_side_schema))
+        .with_support(true)
+        .with_batches(build_batches)
+        .build();
+
+    // Create probe side with more values (some won't match)
+    let probe_batches = vec![
+        record_batch!(
+            ("a", Utf8, ["aa", "ab", "ac", "ad"]),
+            ("b", Utf8, ["ba", "bb", "bc", "bd"]),
+            ("e", Float64, [1.0, 2.0, 3.0, 4.0])
+        )
+        .unwrap(),
+    ];
+    let probe_side_schema = Arc::new(Schema::new(vec![
+        Field::new("a", DataType::Utf8, false),
+        Field::new("b", DataType::Utf8, false),
+        Field::new("e", DataType::Float64, false),
+    ]));
+    let probe_scan = TestScanBuilder::new(Arc::clone(&probe_side_schema))
+        .with_support(true)
+        .with_batches(probe_batches)
+        .build();
+
+    // Create HashJoinExec with Left join and CollectLeft mode
+    let on = vec![
+        (
+            col("a", &build_side_schema).unwrap(),
+            col("a", &probe_side_schema).unwrap(),
+        ),
+        (
+            col("b", &build_side_schema).unwrap(),
+            col("b", &probe_side_schema).unwrap(),
+        ),
+    ];
+    let plan = Arc::new(
+        HashJoinExec::try_new(
+            build_scan,
+            Arc::clone(&probe_scan),
+            on,
+            None,
+            &JoinType::Left,
+            None,
+            PartitionMode::CollectLeft,
+            datafusion_common::NullEquality::NullEqualsNothing,
+            false,
+        )
+        .unwrap(),
+    ) as Arc<dyn ExecutionPlan>;
+
+    // Expect the dynamic filter predicate to be pushed down into the probe side DataSource
+    insta::assert_snapshot!(
+        OptimizationTest::new(Arc::clone(&plan), FilterPushdown::new_post_optimization(), true),
+        @r"
+    OptimizationTest:
+      input:
+        - HashJoinExec: mode=CollectLeft, join_type=Left, on=[(a@0, a@0), (b@1, b@1)]
+        -   DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
+        -   DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true
+      output:
+        Ok:
+          - HashJoinExec: mode=CollectLeft, join_type=Left, on=[(a@0, a@0), (b@1, b@1)]
+          -   DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
+          -   DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ empty ]
+    ",
+    );
+
+    // Actually apply the optimization and execute the plan
+    let mut config = ConfigOptions::default();
+    config.execution.parquet.pushdown_filters = true;
+    config.optimizer.enable_dynamic_filter_pushdown = true;
+    let plan = FilterPushdown::new_post_optimization()
+        .optimize(plan, &config)
+        .unwrap();
+
+    // Test that dynamic filter linking survives with_new_children
+    let children = plan.children().into_iter().map(Arc::clone).collect();
+    let plan = plan.with_new_children(children).unwrap();
+
+    let config = SessionConfig::new().with_batch_size(10);
+    let session_ctx = SessionContext::new_with_config(config);
+    session_ctx.register_object_store(
+        ObjectStoreUrl::parse("test://").unwrap().as_ref(),
+        Arc::new(InMemory::new()),
+    );
+    let state = session_ctx.state();
+    let task_ctx = state.task_ctx();
+    let batches = collect(Arc::clone(&plan), Arc::clone(&task_ctx))
+        .await
+        .unwrap();
+
+    // After execution, verify the dynamic filter was populated with bounds and IN-list
+    insta::assert_snapshot!(
+        format!("{}", format_plan_for_test(&plan)),
+        @r"
+    - HashJoinExec: mode=CollectLeft, join_type=Left, on=[(a@0, a@0), (b@1, b@1)]
+    -   DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
+    -   DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ a@0 >= aa AND a@0 <= ab AND b@1 >= ba AND b@1 <= bb AND struct(a@0, b@1) IN (SET) ([{c0:aa,c1:ba}, {c0:ab,c1:bb}]) ]
+    "
+    );
+
+    // Verify result correctness: left join preserves all build (left) rows.
+    // All build rows match probe rows here, so we get 2 matched rows.
+    // The dynamic filter pruned unmatched probe rows (ac, ad) at scan time,
+    // which is safe because those probe rows would never match any build row.
+    let result = format!("{}", pretty_format_batches(&batches).unwrap());
+    insta::assert_snapshot!(
+        result,
+        @r"
+    +----+----+-----+----+----+-----+
+    | a  | b  | c   | a  | b  | e   |
+    +----+----+-----+----+----+-----+
+    | aa | ba | 1.0 | aa | ba | 1.0 |
+    | ab | bb | 2.0 | ab | bb | 2.0 |
+    +----+----+-----+----+----+-----+
+    "
+    );
+}
+
+/// A `LeftSemi` hash join in `CollectLeft` mode should push a dynamic filter into the
+/// probe-side scan: the snapshot shows it `empty` after optimization and populated
+/// with build-side bounds plus an IN-list after execution, with the output holding
+/// only build-side columns for the matched rows.
+#[tokio::test]
+async fn test_hashjoin_dynamic_filter_pushdown_left_semi_join() {
+    use datafusion_common::JoinType;
+    use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode};
+
+    // Create build side with limited values
+    let build_batches = vec![
+        record_batch!(
+            ("a", Utf8, ["aa", "ab"]),
+            ("b", Utf8, ["ba", "bb"]),
+            ("c", Float64, [1.0, 2.0])
+        )
+        .unwrap(),
+    ];
+    let build_side_schema = Arc::new(Schema::new(vec![
+        Field::new("a", DataType::Utf8, false),
+        Field::new("b", DataType::Utf8, false),
+        Field::new("c", DataType::Float64, false),
+    ]));
+    let build_scan = TestScanBuilder::new(Arc::clone(&build_side_schema))
+        .with_support(true)
+        .with_batches(build_batches)
+        .build();
+
+    // Create probe side with more values (some won't match)
+    let probe_batches = vec![
+        record_batch!(
+            ("a", Utf8, ["aa", "ab", "ac", "ad"]),
+            ("b", Utf8, ["ba", "bb", "bc", "bd"]),
+            ("e", Float64, [1.0, 2.0, 3.0, 4.0])
+        )
+        .unwrap(),
+    ];
+    let probe_side_schema = Arc::new(Schema::new(vec![
+        Field::new("a", DataType::Utf8, false),
+        Field::new("b", DataType::Utf8, false),
+        Field::new("e", DataType::Float64, false),
+    ]));
+    let probe_scan = TestScanBuilder::new(Arc::clone(&probe_side_schema))
+        .with_support(true)
+        .with_batches(probe_batches)
+        .build();
+
+    // Create HashJoinExec with LeftSemi join and CollectLeft mode
+    let on = vec![
+        (
+            col("a", &build_side_schema).unwrap(),
+            col("a", &probe_side_schema).unwrap(),
+        ),
+        (
+            col("b", &build_side_schema).unwrap(),
+            col("b", &probe_side_schema).unwrap(),
+        ),
+    ];
+    let plan = Arc::new(
+        HashJoinExec::try_new(
+            build_scan,
+            Arc::clone(&probe_scan),
+            on,
+            None,
+            &JoinType::LeftSemi,
+            None,
+            PartitionMode::CollectLeft,
+            datafusion_common::NullEquality::NullEqualsNothing,
+            false,
+        )
+        .unwrap(),
+    ) as Arc<dyn ExecutionPlan>;
+
+    // Expect the dynamic filter predicate to be pushed down into the probe side DataSource
+    insta::assert_snapshot!(
+        OptimizationTest::new(Arc::clone(&plan), FilterPushdown::new_post_optimization(), true),
+        @r"
+    OptimizationTest:
+      input:
+        - HashJoinExec: mode=CollectLeft, join_type=LeftSemi, on=[(a@0, a@0), (b@1, b@1)]
+        -   DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
+        -   DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true
+      output:
+        Ok:
+          - HashJoinExec: mode=CollectLeft, join_type=LeftSemi, on=[(a@0, a@0), (b@1, b@1)]
+          -   DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
+          -   DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ empty ]
+    ",
+    );
+
+    // Actually apply the optimization and execute the plan
+    let mut config = ConfigOptions::default();
+    config.execution.parquet.pushdown_filters = true;
+    config.optimizer.enable_dynamic_filter_pushdown = true;
+    let plan = FilterPushdown::new_post_optimization()
+        .optimize(plan, &config)
+        .unwrap();
+
+    // Test that dynamic filter linking survives with_new_children
+    let children = plan.children().into_iter().map(Arc::clone).collect();
+    let plan = plan.with_new_children(children).unwrap();
+
+    let config = SessionConfig::new().with_batch_size(10);
+    let session_ctx = SessionContext::new_with_config(config);
+    session_ctx.register_object_store(
+        ObjectStoreUrl::parse("test://").unwrap().as_ref(),
+        Arc::new(InMemory::new()),
+    );
+    let state = session_ctx.state();
+    let task_ctx = state.task_ctx();
+    let batches = collect(Arc::clone(&plan), Arc::clone(&task_ctx))
+        .await
+        .unwrap();
+
+    // After execution, verify the dynamic filter was populated with bounds and IN-list
+    insta::assert_snapshot!(
+        format!("{}", format_plan_for_test(&plan)),
+        @r"
+    - HashJoinExec: mode=CollectLeft, join_type=LeftSemi, on=[(a@0, a@0), (b@1, b@1)]
+    -   DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
+    -   DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ a@0 >= aa AND a@0 <= ab AND b@1 >= ba AND b@1 <= bb AND struct(a@0, b@1) IN (SET) ([{c0:aa,c1:ba}, {c0:ab,c1:bb}]) ]
+    "
+    );
+
+    // Verify result correctness: left semi join returns only build (left) rows
+    // that have at least one matching probe row. Output schema is build-side columns only.
+    let result = format!("{}", pretty_format_batches(&batches).unwrap());
+    insta::assert_snapshot!(
+        result,
+        @r"
+    +----+----+-----+
+    | a  | b  | c   |
+    +----+----+-----+
+    | aa | ba | 1.0 |
+    | ab | bb | 2.0 |
+    +----+----+-----+
+    "
+    );
+}
diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs
index f39208bcb78d..854306280ea9 100644
--- a/datafusion/physical-plan/src/joins/hash_join/exec.rs
+++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs
@@ -738,7 +738,7 @@ impl HashJoinExec {
     }
 
     fn allow_join_dynamic_filter_pushdown(&self, config: &ConfigOptions) -> bool {
-        if self.join_type != JoinType::Inner
+        if !matches!(self.join_type, JoinType::Inner | JoinType::Left | JoinType::LeftSemi)
             || !config.optimizer.enable_join_dynamic_filter_pushdown
         {
             return false;
diff --git a/datafusion/sqllogictest/test_files/dynamic_filter_pushdown_config.slt b/datafusion/sqllogictest/test_files/dynamic_filter_pushdown_config.slt
index 275b0c9dd490..4bbdae7dee5c 100644
--- a/datafusion/sqllogictest/test_files/dynamic_filter_pushdown_config.slt
+++ b/datafusion/sqllogictest/test_files/dynamic_filter_pushdown_config.slt
@@ -222,8 +222,7 @@ ORDER BY l.id;
 5 left5 right5
 
 # RIGHT JOIN: optimizer swaps to physical Left join (build=right_parquet, probe=left_parquet).
-# No self-generated dynamic filter (only Inner joins get that), but parent filters
-# on the preserved (build) side can still push down.
+# Physical Left join generates a self-generated dynamic filter on the probe side.
 query TT
 EXPLAIN SELECT l.*, r.info
 FROM left_parquet l
@@ -240,7 +239,7 @@ physical_plan
 01)ProjectionExec: expr=[id@1 as id, data@2 as data, info@0 as info]
 02)--HashJoinExec: mode=CollectLeft, join_type=Left, on=[(id@0, id@0)], projection=[info@1, id@2, data@3]
 03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_right.parquet]]}, projection=[id, info], file_type=parquet
-04)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_left.parquet]]}, projection=[id, data], file_type=parquet
+04)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_left.parquet]]}, projection=[id, data], file_type=parquet, predicate=DynamicFilter [ empty ]
 
 # RIGHT JOIN correctness: all right rows appear, unmatched left rows produce NULLs
 query ITT
@@ -292,6 +291,36 @@ physical_plan
 02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_right.parquet]]}, projection=[id], file_type=parquet
 03)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_left.parquet]]}, projection=[id, data], file_type=parquet
 
+# LEFT SEMI JOIN (physical LeftSemi): reverse table roles so optimizer keeps LeftSemi
+# (right_parquet has 3 rows < left_parquet has 5 rows, so no swap occurs).
+# Physical LeftSemi generates a self-generated dynamic filter on the probe side.
+query TT
+EXPLAIN SELECT r.*
+FROM right_parquet r
+WHERE r.id IN (SELECT l.id FROM left_parquet l);
+----
+logical_plan
+01)LeftSemi Join: r.id = __correlated_sq_1.id
+02)--SubqueryAlias: r
+03)----TableScan: right_parquet projection=[id, info]
+04)--SubqueryAlias: __correlated_sq_1
+05)----SubqueryAlias: l
+06)------TableScan: left_parquet projection=[id]
+physical_plan
+01)HashJoinExec: mode=CollectLeft, join_type=LeftSemi, on=[(id@0, id@0)]
+02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_right.parquet]]}, projection=[id, info], file_type=parquet
+03)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_left.parquet]]}, projection=[id], file_type=parquet, predicate=DynamicFilter [ empty ]
+
+# LEFT SEMI (physical LeftSemi) correctness: only right rows with matching left ids
+query IT rowsort
+SELECT r.*
+FROM right_parquet r
+WHERE r.id IN (SELECT l.id FROM left_parquet l);
+----
+1 right1
+3 right3
+5 right5
+
 # LEFT ANTI JOIN: no self-generated dynamic filter, but parent filters can push
 # to the preserved (left/build) side.
 query TT