@@ -4086,3 +4086,277 @@ async fn test_filter_with_projection_pushdown() {
40864086 ] ;
40874087 assert_batches_eq ! ( expected, & result) ;
40884088}
4089+
4090+ #[ tokio:: test]
4091+ async fn test_hashjoin_dynamic_filter_pushdown_left_join ( ) {
4092+ use datafusion_common:: JoinType ;
4093+ use datafusion_physical_plan:: joins:: { HashJoinExec , PartitionMode } ;
4094+
4095+ // Create build side with limited values
4096+ let build_batches = vec ! [
4097+ record_batch!(
4098+ ( "a" , Utf8 , [ "aa" , "ab" ] ) ,
4099+ ( "b" , Utf8 , [ "ba" , "bb" ] ) ,
4100+ ( "c" , Float64 , [ 1.0 , 2.0 ] )
4101+ )
4102+ . unwrap( ) ,
4103+ ] ;
4104+ let build_side_schema = Arc :: new ( Schema :: new ( vec ! [
4105+ Field :: new( "a" , DataType :: Utf8 , false ) ,
4106+ Field :: new( "b" , DataType :: Utf8 , false ) ,
4107+ Field :: new( "c" , DataType :: Float64 , false ) ,
4108+ ] ) ) ;
4109+ let build_scan = TestScanBuilder :: new ( Arc :: clone ( & build_side_schema) )
4110+ . with_support ( true )
4111+ . with_batches ( build_batches)
4112+ . build ( ) ;
4113+
4114+ // Create probe side with more values (some won't match)
4115+ let probe_batches = vec ! [
4116+ record_batch!(
4117+ ( "a" , Utf8 , [ "aa" , "ab" , "ac" , "ad" ] ) ,
4118+ ( "b" , Utf8 , [ "ba" , "bb" , "bc" , "bd" ] ) ,
4119+ ( "e" , Float64 , [ 1.0 , 2.0 , 3.0 , 4.0 ] )
4120+ )
4121+ . unwrap( ) ,
4122+ ] ;
4123+ let probe_side_schema = Arc :: new ( Schema :: new ( vec ! [
4124+ Field :: new( "a" , DataType :: Utf8 , false ) ,
4125+ Field :: new( "b" , DataType :: Utf8 , false ) ,
4126+ Field :: new( "e" , DataType :: Float64 , false ) ,
4127+ ] ) ) ;
4128+ let probe_scan = TestScanBuilder :: new ( Arc :: clone ( & probe_side_schema) )
4129+ . with_support ( true )
4130+ . with_batches ( probe_batches)
4131+ . build ( ) ;
4132+
4133+ // Create HashJoinExec with Left join and CollectLeft mode
4134+ let on = vec ! [
4135+ (
4136+ col( "a" , & build_side_schema) . unwrap( ) ,
4137+ col( "a" , & probe_side_schema) . unwrap( ) ,
4138+ ) ,
4139+ (
4140+ col( "b" , & build_side_schema) . unwrap( ) ,
4141+ col( "b" , & probe_side_schema) . unwrap( ) ,
4142+ ) ,
4143+ ] ;
4144+ let plan = Arc :: new (
4145+ HashJoinExec :: try_new (
4146+ build_scan,
4147+ Arc :: clone ( & probe_scan) ,
4148+ on,
4149+ None ,
4150+ & JoinType :: Left ,
4151+ None ,
4152+ PartitionMode :: CollectLeft ,
4153+ datafusion_common:: NullEquality :: NullEqualsNothing ,
4154+ false ,
4155+ )
4156+ . unwrap ( ) ,
4157+ ) as Arc < dyn ExecutionPlan > ;
4158+
4159+ // Expect the dynamic filter predicate to be pushed down into the probe side DataSource
4160+ insta:: assert_snapshot!(
4161+ OptimizationTest :: new( Arc :: clone( & plan) , FilterPushdown :: new_post_optimization( ) , true ) ,
4162+ @r"
4163+ OptimizationTest:
4164+ input:
4165+ - HashJoinExec: mode=CollectLeft, join_type=Left, on=[(a@0, a@0), (b@1, b@1)]
4166+ - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
4167+ - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true
4168+ output:
4169+ Ok:
4170+ - HashJoinExec: mode=CollectLeft, join_type=Left, on=[(a@0, a@0), (b@1, b@1)]
4171+ - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
4172+ - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ empty ]
4173+ " ,
4174+ ) ;
4175+
4176+ // Actually apply the optimization and execute the plan
4177+ let mut config = ConfigOptions :: default ( ) ;
4178+ config. execution . parquet . pushdown_filters = true ;
4179+ config. optimizer . enable_dynamic_filter_pushdown = true ;
4180+ let plan = FilterPushdown :: new_post_optimization ( )
4181+ . optimize ( plan, & config)
4182+ . unwrap ( ) ;
4183+
4184+ // Test that dynamic filter linking survives with_new_children
4185+ let children = plan. children ( ) . into_iter ( ) . map ( Arc :: clone) . collect ( ) ;
4186+ let plan = plan. with_new_children ( children) . unwrap ( ) ;
4187+
4188+ let config = SessionConfig :: new ( ) . with_batch_size ( 10 ) ;
4189+ let session_ctx = SessionContext :: new_with_config ( config) ;
4190+ session_ctx. register_object_store (
4191+ ObjectStoreUrl :: parse ( "test://" ) . unwrap ( ) . as_ref ( ) ,
4192+ Arc :: new ( InMemory :: new ( ) ) ,
4193+ ) ;
4194+ let state = session_ctx. state ( ) ;
4195+ let task_ctx = state. task_ctx ( ) ;
4196+ let batches = collect ( Arc :: clone ( & plan) , Arc :: clone ( & task_ctx) )
4197+ . await
4198+ . unwrap ( ) ;
4199+
4200+ // After execution, verify the dynamic filter was populated with bounds and IN-list
4201+ insta:: assert_snapshot!(
4202+ format!( "{}" , format_plan_for_test( & plan) ) ,
4203+ @r"
4204+ - HashJoinExec: mode=CollectLeft, join_type=Left, on=[(a@0, a@0), (b@1, b@1)]
4205+ - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
4206+ - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ a@0 >= aa AND a@0 <= ab AND b@1 >= ba AND b@1 <= bb AND struct(a@0, b@1) IN (SET) ([{c0:aa,c1:ba}, {c0:ab,c1:bb}]) ]
4207+ "
4208+ ) ;
4209+
4210+ // Verify result correctness: left join preserves all build (left) rows.
4211+ // All build rows match probe rows here, so we get 2 matched rows.
4212+ // The dynamic filter pruned unmatched probe rows (ac, ad) at scan time,
4213+ // which is safe because those probe rows would never match any build row.
4214+ let result = format ! ( "{}" , pretty_format_batches( & batches) . unwrap( ) ) ;
4215+ insta:: assert_snapshot!(
4216+ result,
4217+ @r"
4218+ +----+----+-----+----+----+-----+
4219+ | a | b | c | a | b | e |
4220+ +----+----+-----+----+----+-----+
4221+ | aa | ba | 1.0 | aa | ba | 1.0 |
4222+ | ab | bb | 2.0 | ab | bb | 2.0 |
4223+ +----+----+-----+----+----+-----+
4224+ "
4225+ ) ;
4226+ }
4227+
4228+ #[ tokio:: test]
4229+ async fn test_hashjoin_dynamic_filter_pushdown_left_semi_join ( ) {
4230+ use datafusion_common:: JoinType ;
4231+ use datafusion_physical_plan:: joins:: { HashJoinExec , PartitionMode } ;
4232+
4233+ // Create build side with limited values
4234+ let build_batches = vec ! [
4235+ record_batch!(
4236+ ( "a" , Utf8 , [ "aa" , "ab" ] ) ,
4237+ ( "b" , Utf8 , [ "ba" , "bb" ] ) ,
4238+ ( "c" , Float64 , [ 1.0 , 2.0 ] )
4239+ )
4240+ . unwrap( ) ,
4241+ ] ;
4242+ let build_side_schema = Arc :: new ( Schema :: new ( vec ! [
4243+ Field :: new( "a" , DataType :: Utf8 , false ) ,
4244+ Field :: new( "b" , DataType :: Utf8 , false ) ,
4245+ Field :: new( "c" , DataType :: Float64 , false ) ,
4246+ ] ) ) ;
4247+ let build_scan = TestScanBuilder :: new ( Arc :: clone ( & build_side_schema) )
4248+ . with_support ( true )
4249+ . with_batches ( build_batches)
4250+ . build ( ) ;
4251+
4252+ // Create probe side with more values (some won't match)
4253+ let probe_batches = vec ! [
4254+ record_batch!(
4255+ ( "a" , Utf8 , [ "aa" , "ab" , "ac" , "ad" ] ) ,
4256+ ( "b" , Utf8 , [ "ba" , "bb" , "bc" , "bd" ] ) ,
4257+ ( "e" , Float64 , [ 1.0 , 2.0 , 3.0 , 4.0 ] )
4258+ )
4259+ . unwrap( ) ,
4260+ ] ;
4261+ let probe_side_schema = Arc :: new ( Schema :: new ( vec ! [
4262+ Field :: new( "a" , DataType :: Utf8 , false ) ,
4263+ Field :: new( "b" , DataType :: Utf8 , false ) ,
4264+ Field :: new( "e" , DataType :: Float64 , false ) ,
4265+ ] ) ) ;
4266+ let probe_scan = TestScanBuilder :: new ( Arc :: clone ( & probe_side_schema) )
4267+ . with_support ( true )
4268+ . with_batches ( probe_batches)
4269+ . build ( ) ;
4270+
4271+ // Create HashJoinExec with LeftSemi join and CollectLeft mode
4272+ let on = vec ! [
4273+ (
4274+ col( "a" , & build_side_schema) . unwrap( ) ,
4275+ col( "a" , & probe_side_schema) . unwrap( ) ,
4276+ ) ,
4277+ (
4278+ col( "b" , & build_side_schema) . unwrap( ) ,
4279+ col( "b" , & probe_side_schema) . unwrap( ) ,
4280+ ) ,
4281+ ] ;
4282+ let plan = Arc :: new (
4283+ HashJoinExec :: try_new (
4284+ build_scan,
4285+ Arc :: clone ( & probe_scan) ,
4286+ on,
4287+ None ,
4288+ & JoinType :: LeftSemi ,
4289+ None ,
4290+ PartitionMode :: CollectLeft ,
4291+ datafusion_common:: NullEquality :: NullEqualsNothing ,
4292+ false ,
4293+ )
4294+ . unwrap ( ) ,
4295+ ) as Arc < dyn ExecutionPlan > ;
4296+
4297+ // Expect the dynamic filter predicate to be pushed down into the probe side DataSource
4298+ insta:: assert_snapshot!(
4299+ OptimizationTest :: new( Arc :: clone( & plan) , FilterPushdown :: new_post_optimization( ) , true ) ,
4300+ @r"
4301+ OptimizationTest:
4302+ input:
4303+ - HashJoinExec: mode=CollectLeft, join_type=LeftSemi, on=[(a@0, a@0), (b@1, b@1)]
4304+ - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
4305+ - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true
4306+ output:
4307+ Ok:
4308+ - HashJoinExec: mode=CollectLeft, join_type=LeftSemi, on=[(a@0, a@0), (b@1, b@1)]
4309+ - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
4310+ - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ empty ]
4311+ " ,
4312+ ) ;
4313+
4314+ // Actually apply the optimization and execute the plan
4315+ let mut config = ConfigOptions :: default ( ) ;
4316+ config. execution . parquet . pushdown_filters = true ;
4317+ config. optimizer . enable_dynamic_filter_pushdown = true ;
4318+ let plan = FilterPushdown :: new_post_optimization ( )
4319+ . optimize ( plan, & config)
4320+ . unwrap ( ) ;
4321+
4322+ // Test that dynamic filter linking survives with_new_children
4323+ let children = plan. children ( ) . into_iter ( ) . map ( Arc :: clone) . collect ( ) ;
4324+ let plan = plan. with_new_children ( children) . unwrap ( ) ;
4325+
4326+ let config = SessionConfig :: new ( ) . with_batch_size ( 10 ) ;
4327+ let session_ctx = SessionContext :: new_with_config ( config) ;
4328+ session_ctx. register_object_store (
4329+ ObjectStoreUrl :: parse ( "test://" ) . unwrap ( ) . as_ref ( ) ,
4330+ Arc :: new ( InMemory :: new ( ) ) ,
4331+ ) ;
4332+ let state = session_ctx. state ( ) ;
4333+ let task_ctx = state. task_ctx ( ) ;
4334+ let batches = collect ( Arc :: clone ( & plan) , Arc :: clone ( & task_ctx) )
4335+ . await
4336+ . unwrap ( ) ;
4337+
4338+ // After execution, verify the dynamic filter was populated with bounds and IN-list
4339+ insta:: assert_snapshot!(
4340+ format!( "{}" , format_plan_for_test( & plan) ) ,
4341+ @r"
4342+ - HashJoinExec: mode=CollectLeft, join_type=LeftSemi, on=[(a@0, a@0), (b@1, b@1)]
4343+ - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
4344+ - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ a@0 >= aa AND a@0 <= ab AND b@1 >= ba AND b@1 <= bb AND struct(a@0, b@1) IN (SET) ([{c0:aa,c1:ba}, {c0:ab,c1:bb}]) ]
4345+ "
4346+ ) ;
4347+
4348+ // Verify result correctness: left semi join returns only build (left) rows
4349+ // that have at least one matching probe row. Output schema is build-side columns only.
4350+ let result = format ! ( "{}" , pretty_format_batches( & batches) . unwrap( ) ) ;
4351+ insta:: assert_snapshot!(
4352+ result,
4353+ @r"
4354+ +----+----+-----+
4355+ | a | b | c |
4356+ +----+----+-----+
4357+ | aa | ba | 1.0 |
4358+ | ab | bb | 2.0 |
4359+ +----+----+-----+
4360+ "
4361+ ) ;
4362+ }
0 commit comments