[FLINK-38556][table] Support filter and project between source and delta join #27159
        
          
        
Thanks for advancing the feature. Let me leave some comments.
@Au-Miner Thanks for the review. Updated!
        
          
...ink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/DeltaJoinUtil.java

    }

    ChangelogMode changelogMode = getChangelogMode((StreamPhysicalRel) tableScan);
    if (changelogMode.containsOnly(RowKind.INSERT)) {
I remember that CDC sources were allowed before, so why are only insert-only sources allowed here?
It also allows CDC sources. The logic is:
- If the source is an INSERT-ONLY source, pass and return true.
- Otherwise the source is a CDC source:
  - If the filter pushed down into the source is not applied on one set of upsert keys, return false.
  - Otherwise, pass and return true.

As for why the filter must be applied on one set of upsert keys when consuming CDC, we discussed this before in #27111 (comment), and I have also raised a Jira issue for it.
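
For readers following the thread, the decision described above can be sketched roughly as follows. This is a minimal, standalone illustration and not the code in `DeltaJoinUtil`: the class `DeltaJoinSourceCheck`, the `Kind` enum, and the method `isSourceSupported` are hypothetical names, and columns are modeled as plain integer indices. It also assumes that "applied on one set of upsert keys" means every column referenced by the filter belongs to a single upsert key set, and that a CDC source without any pushed-down filter passes.

```java
import java.util.List;
import java.util.Set;

// Hypothetical, simplified model of the check discussed in the review thread.
class DeltaJoinSourceCheck {

    // Stand-in for the row kinds a source may emit (not Flink's RowKind).
    enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    /**
     * @param changelogKinds row kinds emitted by the source
     * @param filterColumns  indices of the columns referenced by the pushed-down filter
     * @param upsertKeys     candidate upsert key sets of the source, as column indices
     */
    static boolean isSourceSupported(
            Set<Kind> changelogKinds,
            Set<Integer> filterColumns,
            List<Set<Integer>> upsertKeys) {
        // Case 1: an insert-only source always passes.
        if (changelogKinds.size() == 1 && changelogKinds.contains(Kind.INSERT)) {
            return true;
        }
        // Case 2: CDC source. Assumption: no pushed-down filter means nothing to verify.
        if (filterColumns.isEmpty()) {
            return true;
        }
        // Case 2.1 / 2.2: the filter must stay within one single upsert key set.
        for (Set<Integer> key : upsertKeys) {
            if (key.containsAll(filterColumns)) {
                return true;
            }
        }
        return false;
    }
}
```

Under these assumptions, a CDC source with upsert key `{0, 1}` passes for a filter on column `0` but is rejected for a filter on column `2`.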
        
          
Thanks for addressing the comment. It LGTM.
## What is the purpose of the change
Support filter and project between source and delta join. After this PR, a join with the pattern "source -> calc -> join" can be optimized into a delta join.
## Brief change log
## Verifying this change
UT, IT and harness tests are added.
## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): no
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
- The serializers: no
- The runtime per-record code paths (performance sensitive): no
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
- The S3 file system connector: no
## Documentation
- Does this pull request introduce a new feature? no
- If yes, how is the feature documented?