2727import  org .apache .flink .types .RowKind ;
2828
2929import  java .util .Arrays ;
30+ import  java .util .HashMap ;
3031import  java .util .Map ;
3132import  java .util .stream .Collectors ;
3233import  java .util .stream .Stream ;
@@ -168,6 +169,70 @@ public class DeltaJoinTestPrograms {
168169                                    + "on a1 = b1 and a2 <> b2" )
169170                    .build ();
170171
172+     public  static  final  TableTestProgram  DELTA_JOIN_WITH_CALC_ON_SOURCE  =
173+             TableTestProgram .of (
174+                             "delta-join-with-calc-on-source" ,
175+                             "validates delta join with calc on source" )
176+                     .setupConfig (
177+                             OptimizerConfigOptions .TABLE_OPTIMIZER_DELTA_JOIN_STRATEGY ,
178+                             OptimizerConfigOptions .DeltaJoinStrategy .FORCE )
179+                     .setupTableSources (
180+                             DELTA_JOIN_WITH_JOIN_KEY_EQUALS_INDEX .getSetupSourceTestSteps ())
181+                     .setupTableSink (
182+                             SinkTestStep .newBuilder ("snk" )
183+                                     .addSchema (addPk2Schema (SINK_TABLE_BASE_SCHEMA , "a0" , "b0" ))
184+                                     .addOptions (TABLE_BASE_OPTIONS )
185+                                     .testMaterializedData ()
186+                                     // deduplicate data by pk 
187+                                     .deduplicatedFieldIndices (new  int [] {1 , 3 })
188+                                     .consumedBeforeRestore (Row .of (6 , 5.0 , "l-5-1" , 5.0 , "r-5-1" , 6 ))
189+                                     .consumedAfterRestore (Row .of (6 , 5.0 , "l-5-2" , 5.0 , "r-5-1" , 6 ))
190+                                     .build ())
191+                     .runSql (
192+                             "insert into snk " 
193+                                     + "select new_a1, a0, a2, b0, b2, new_b1 from ( " 
194+                                     + "   select a0, a1, a2, a1 + 1 as new_a1 from leftSrc " 
195+                                     + "       where a1 = 1 or a1 = 5 " 
196+                                     + ") join (" 
197+                                     + "   select b0, b1, b1 + 1 as new_b1, b2 from rightSrc " 
198+                                     + "       where b0 = cast(3.0 as double) or b0 = cast(5.0 as double) " 
199+                                     + ") " 
200+                                     + "on a1 = b1 and a0 = b0" )
201+                     .build ();
202+ 
203+     public  static  final  TableTestProgram  DELTA_JOIN_WITH_CALC_ON_SOURCE_AND_FILTER_PUSHED_DOWN  =
204+             TableTestProgram .of (
205+                             "delta-join-with-calc-on-source-and-filter-pushed-down" ,
206+                             "validates delta join with calc on source and filter pushed down" )
207+                     .setupConfig (
208+                             OptimizerConfigOptions .TABLE_OPTIMIZER_DELTA_JOIN_STRATEGY ,
209+                             OptimizerConfigOptions .DeltaJoinStrategy .FORCE )
210+                     .setupConfig (ExecutionConfigOptions .TABLE_EXEC_DELTA_JOIN_CACHE_ENABLED , true )
211+                     .setupTableSources (
212+                             DELTA_JOIN_WITH_CALC_ON_SOURCE .getSetupSourceTestSteps ().stream ()
213+                                     .map (
214+                                             sourceTestStep  -> {
215+                                                 String  filterableFields ;
216+                                                 if  (sourceTestStep .name .equals ("leftSrc" )) {
217+                                                     filterableFields  = "a1" ;
218+                                                 } else  if  (sourceTestStep .name .equals ("rightSrc" )) {
219+                                                     filterableFields  = "b0" ;
220+                                                 } else  {
221+                                                     throw  new  IllegalStateException (
222+                                                             "Unknown test table name: " 
223+                                                                     + sourceTestStep .name );
224+                                                 }
225+                                                 Map <String , String > oldOptions  =
226+                                                         new  HashMap <>(sourceTestStep .options );
227+                                                 oldOptions .put (
228+                                                         "filterable-fields" , filterableFields );
229+                                                 return  sourceTestStep .withNewOptions (oldOptions );
230+                                             })
231+                                     .collect (Collectors .toList ()))
232+                     .setupTableSinks (DELTA_JOIN_WITH_CALC_ON_SOURCE .getSetupSinkTestSteps ())
233+                     .runSql (DELTA_JOIN_WITH_CALC_ON_SOURCE .getRunSqlTestStep ().sql )
234+                     .build ();
235+ 
171236    public  static  final  TableTestProgram  DELTA_JOIN_WITH_CACHE  =
172237            TableTestProgram .of ("delta-join-with-cache" , "validates delta join with cache" )
173238                    .setupConfig (
@@ -180,6 +245,19 @@ public class DeltaJoinTestPrograms {
180245                    .runSql (DELTA_JOIN_WITH_NON_EQUIV_CONDITION .getRunSqlTestStep ().sql )
181246                    .build ();
182247
248+     public  static  final  TableTestProgram  DELTA_JOIN_WITH_CACHE_AND_CALC_ON_SOURCE  =
249+             TableTestProgram .of (
250+                             "delta-join-with-cache-and-calc-on-source" ,
251+                             "validates delta join with cache and calc on source" )
252+                     .setupConfig (
253+                             OptimizerConfigOptions .TABLE_OPTIMIZER_DELTA_JOIN_STRATEGY ,
254+                             OptimizerConfigOptions .DeltaJoinStrategy .FORCE )
255+                     .setupConfig (ExecutionConfigOptions .TABLE_EXEC_DELTA_JOIN_CACHE_ENABLED , true )
256+                     .setupTableSources (DELTA_JOIN_WITH_CALC_ON_SOURCE .getSetupSourceTestSteps ())
257+                     .setupTableSinks (DELTA_JOIN_WITH_CALC_ON_SOURCE .getSetupSinkTestSteps ())
258+                     .runSql (DELTA_JOIN_WITH_CALC_ON_SOURCE .getRunSqlTestStep ().sql )
259+                     .build ();
260+ 
183261    public  static  final  TableTestProgram  DELTA_JOIN_WITH_CDC_SOURCE_WITHOUT_DELETE  =
184262            TableTestProgram .of (
185263                            "delta-join-with-cdc-source-without-delete" ,
@@ -259,6 +337,44 @@ public class DeltaJoinTestPrograms {
259337                                    + "on a1 = b1" )
260338                    .build ();
261339
340+     public  static  final  TableTestProgram  DELTA_JOIN_WITH_CALC_ON_CDC_SOURCE_WITHOUT_DELETE  =
341+             TableTestProgram .of (
342+                             "delta-join-with-calc-on-cdc-source-without-delete" ,
343+                             "validates delta join with calc on cdc source without delete " )
344+                     .setupConfig (
345+                             OptimizerConfigOptions .TABLE_OPTIMIZER_DELTA_JOIN_STRATEGY ,
346+                             OptimizerConfigOptions .DeltaJoinStrategy .FORCE )
347+                     .setupConfig (ExecutionConfigOptions .TABLE_EXEC_DELTA_JOIN_CACHE_ENABLED , true )
348+                     .setupTableSources (
349+                             DELTA_JOIN_WITH_CDC_SOURCE_WITHOUT_DELETE .getSetupSourceTestSteps ())
350+                     .setupTableSink (
351+                             SinkTestStep .newBuilder ("snk" )
352+                                     .addSchema (
353+                                             addPk2Schema (
354+                                                     SINK_TABLE_BASE_SCHEMA , "a0" , "b0" , "a1" , "b1" ))
355+                                     .addOptions (TABLE_BASE_OPTIONS )
356+                                     .testMaterializedData ()
357+                                     // deduplicate data by pk 
358+                                     .deduplicatedFieldIndices (new  int [] {0 , 1 , 3 , 5 })
359+                                     .consumedBeforeRestore (
360+                                             Row .of (1 , 1.0 , "left-pk1-2-s" , 2.0 , "right-pk1-1-s" , 1 ),
361+                                             Row .of (1 , 2.0 , "left-pk2-1-s" , 2.0 , "right-pk1-1-s" , 1 ))
362+                                     .consumedAfterRestore (
363+                                             Row .of (1 , 1.0 , "left-pk1-2-s" , 2.0 , "right-pk1-2-s" , 1 ),
364+                                             Row .of (1 , 2.0 , "left-pk2-2-s" , 2.0 , "right-pk1-2-s" , 1 ))
365+                                     .build ())
366+                     .runSql (
367+                             "insert into snk " 
368+                                     + "select a1, a0, new_a2, b0, new_b2, b1 from ( " 
369+                                     + "   select a0, a1, a2, concat(a2, '-s') as new_a2 from leftSrc " 
370+                                     + "       where a0 = cast(1.0 as double) or a0 = cast(2.0 as double) " 
371+                                     + ") join (" 
372+                                     + "   select b0, b1, concat(b2, '-s') as new_b2, b2 from rightSrc " 
373+                                     + "       where b0 = cast(2.0 as double) or b0 = cast(3.0 as double) " 
374+                                     + ") " 
375+                                     + "on a1 = b1" )
376+                     .build ();
377+ 
262378    public  static  final  TableTestProgram  DELTA_JOIN_WITH_CACHE_AND_CDC_SOURCE_WITHOUT_DELETE  =
263379            TableTestProgram .of (
264380                            "delta-join-with-cache-and-cdc-source-without-delete" ,
@@ -274,6 +390,29 @@ public class DeltaJoinTestPrograms {
274390                    .runSql (DELTA_JOIN_WITH_CDC_SOURCE_WITHOUT_DELETE .getRunSqlTestStep ().sql )
275391                    .build ();
276392
393+     public  static  final  TableTestProgram 
394+             DELTA_JOIN_WITH_CACHE_AND_CALC_ON_CDC_SOURCE_WITHOUT_DELETE  =
395+                     TableTestProgram .of (
396+                                     "delta-join-with-cache-and-calc-on-cdc-source-without-delete" ,
397+                                     "validates delta join with cache and calc on cdc source without delete" )
398+                             .setupConfig (
399+                                     OptimizerConfigOptions .TABLE_OPTIMIZER_DELTA_JOIN_STRATEGY ,
400+                                     OptimizerConfigOptions .DeltaJoinStrategy .FORCE )
401+                             .setupConfig (
402+                                     ExecutionConfigOptions .TABLE_EXEC_DELTA_JOIN_CACHE_ENABLED ,
403+                                     true )
404+                             .setupTableSources (
405+                                     DELTA_JOIN_WITH_CALC_ON_CDC_SOURCE_WITHOUT_DELETE 
406+                                             .getSetupSourceTestSteps ())
407+                             .setupTableSinks (
408+                                     DELTA_JOIN_WITH_CALC_ON_CDC_SOURCE_WITHOUT_DELETE 
409+                                             .getSetupSinkTestSteps ())
410+                             .runSql (
411+                                     DELTA_JOIN_WITH_CALC_ON_CDC_SOURCE_WITHOUT_DELETE 
412+                                             .getRunSqlTestStep ()
413+                                             .sql )
414+                             .build ();
415+ 
277416    private  static  String [] addPk2Schema (String [] originalSchema , String ... pkCols ) {
278417        return  Stream .concat (
279418                        Arrays .stream (originalSchema ),
0 commit comments