feat: ClassicJoin
for PWMJ
#17482
@2010YOUY01 Would you like to take a look and see whether this is how you wanted to split up the work? I just wanted to put this out today; I'll clean it up this week. Only one external test is failing currently.
if join_filter.is_none() && matches!(join_type, JoinType::Inner) {
    // cross join if there is no join conditions and no join filter set
    Arc::new(CrossJoinExec::new(physical_left, physical_right))
} else if num_range_filters == 1
I would like to refactor this in a separate pull request; it should be quite simple to do. I just wanted to get this version in first.
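The branch under review picks a physical join operator from the shape of the join condition. Below is a minimal sketch of that decision, using hypothetical stand-in types (`JoinStrategy`, `choose_strategy`) rather than DataFusion's real planner API. The snippet above is truncated after `num_range_filters == 1`, so any further conditions in the real code are not shown here, and the nested loop join fallback is an assumption.

```rust
/// Hypothetical stand-in for the join types handled in this PR.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum JoinType {
    Inner,
    Left,
    Right,
    Full,
}

/// Hypothetical label for the physical operator the planner would build.
#[derive(Debug, PartialEq)]
pub enum JoinStrategy {
    CrossJoin,
    PiecewiseMergeJoin,
    NestedLoopJoin,
}

/// Chooses a strategy for a join that has no equi-join keys (assumption).
pub fn choose_strategy(
    join_type: JoinType,
    has_join_filter: bool,
    num_range_filters: usize,
) -> JoinStrategy {
    if !has_join_filter && join_type == JoinType::Inner {
        // No join condition and no filter: plain cross join.
        JoinStrategy::CrossJoin
    } else if num_range_filters == 1 {
        // Exactly one range predicate: candidate for PWMJ.
        JoinStrategy::PiecewiseMergeJoin
    } else {
        // Assumed fallback for everything else.
        JoinStrategy::NestedLoopJoin
    }
}
```

Keeping the decision in one pure function like this is one way the later refactor could make the planner branch easier to test in isolation.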
statement ok
set datafusion.execution.batch_size = 8192;

# TODO: partitioned PWMJ execution
This PR currently doesn't allow partitioned execution, because that would make reviewing the tests a little messy: many of the partitioned single-range queries would switch to PWMJ. This is another follow-up and will be tracked in #17427.
…jonathanc-n/datafusion into classic-join-physical-planner
cc @2010YOUY01 @comphead this PR is now ready!
This is great! I have some suggestions for the planning part, and I'll review the execution part tomorrow.
Refactor the in-equality extracting logic
I suggest moving the inequality-extracting logic from
The reason is that it is better to keep similar code in a single place instead of letting it scatter across multiple places. To do this, I think we need to extend the logical plan join node with an extra IE predicate field (maybe we can define a new struct for IE predicate with
To make it compatible with systems that only use the
Perhaps we can open a PR only for this IE predicate extraction task, and during the initial planning we can simply move the IE predicates back to the filter with the above-mentioned utility.
Make it configurable to turn on/off PWMJ
I'll try to finish #17467 soon to make it easier, so let's put this on hold for now.
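To make the suggestion concrete, here is a rough sketch of what a dedicated IE predicate struct and a "move the IE predicates back into the filter" utility could look like. Everything here (`Expr`, `RangeOp`, `IePredicate`, `fold_into_filter`) is a hypothetical toy model, not DataFusion's logical plan API, and the actual field layout proposed in the comment is not visible in this thread.

```rust
/// Comparison operators that qualify as inequality (IE) predicates.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum RangeOp {
    Lt,
    LtEq,
    Gt,
    GtEq,
}

/// Toy expression tree; a real planner would use its own expression type.
#[derive(Debug, Clone, PartialEq)]
pub enum Expr {
    Column(String),
    Compare { left: Box<Expr>, op: RangeOp, right: Box<Expr> },
    And(Box<Expr>, Box<Expr>),
}

/// Hypothetical struct for one extracted IE predicate: `left op right`.
#[derive(Debug, Clone, PartialEq)]
pub struct IePredicate {
    pub left: Expr,
    pub op: RangeOp,
    pub right: Expr,
}

/// Hypothetical utility: AND the extracted IE predicates back onto the
/// residual join filter, for consumers that only understand a plain filter.
pub fn fold_into_filter(filter: Option<Expr>, ie: Vec<IePredicate>) -> Option<Expr> {
    ie.into_iter().fold(filter, |acc, p| {
        let cmp = Expr::Compare {
            left: Box::new(p.left),
            op: p.op,
            right: Box::new(p.right),
        };
        Some(match acc {
            Some(f) => Expr::And(Box::new(f), Box::new(cmp)),
            None => cmp,
        })
    })
}
```

With a utility of this shape, the extraction could live in one place while planners that do not support PWMJ keep seeing an ordinary filter.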
Thanks @jonathanc-n and @2010YOUY01. #17467 would definitely be nice to have, as PWMJ can start as an optional experimental join, which would be separately documented, showing benefits and limitations for the end user. The same actually happened for SMJ, which was an experimental feature for quite some time. Another great way to identify performance bottlenecks is to absorb some knowledge from #17488 and keep the join more stable. As an optional feature it is pretty safe to go; again referring to SMJ, there was a separate ticket with post-launch checks to make sure it is safe to use, like #9846. Let me know your thoughts.
Yes, I think the experimental flag should be added first, and we can do the equality extraction logic as a follow-up. WDYT @2010YOUY01? Do you want to get #17467 in before this one?
Yes, so let's do other work first. If I can't get #17467 done when this PR is ready, let's add
I have gone over the exec.rs, and will continue with the stream implementation part soon.
    ExecutionPlan, PlanProperties,
};
use crate::{DisplayAs, DisplayFormatType, ExecutionPlanProperties};
This is one of the best module comments I have seen.
@2010YOUY01 I have added the requested changes! Should be good for another go.
@2010YOUY01 I have made quite a few changes:
New Changes
Current benchmarks use a larger table on the left side and a smaller one on the right. Swapping inputs will be supported in a follow-up.
Benchmarks
Benchmarks look very good. To run it yourself you can replace the queries in
These queries use
Full Joins
Queries
Results
Left Joins
Queries
Results
Right Joins
Queries
Results
Inner Joins
Queries
Results
I have generated some queries to benchmark, and the result looks amazing 🚀 I think we can iterate from here. I'll review the implementation soon.
In-equality join benchmark
To run it, manually swap the queries in `benchmark/nlj.rs`
Note that several queries are slower. That's because one join side is very small, so the brute-force nested loop join becomes optimal; I suspect in some cases NLJ can even beat Hash Join. When I was trying different queries, I noticed one full join query where piecewise merge join is slower than NLJ, although it should be faster. We should take a closer look:
NLJ is around 1.2s on my machine, while PWMJ is around 2.5s.
How about first splitting the 'in-equality predicate extracting logic' into a different PR? The idea is described in #17482 (comment)
The implementation looks great. I've left some suggestions to polish the code, I believe they're all optional.
Here are some suggestions for additional test coverage. And I think we should also enable pwmj in the extended sqlite test.
I think this PR is ready to go after passing those tests.
# specific language governing permissions and limitations
# under the License.
For additional test coverage, we can include the following cases in this test file:
- nulls in the compare key column
- different projections like `select *`
- expressions in the join predicate like `on t1.v1 < (t2.v1+1)`; it should also be able to use PWMJ
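When adding cases like these, a brute-force reference can help confirm the expected rows. The following is a small sketch of such an oracle for an inner join on `t1.v1 < (t2.v1 + 1)`, with `None` modelling SQL NULL. The function name and the plain-slice inputs are illustrative assumptions, not part of the PR's test suite.

```rust
/// Reference nested-loop evaluation of `t1.v1 < t2.v1 + 1` (inner join).
/// Returns matching (t1_row, t2_row) index pairs in nested-loop order.
/// `None` models SQL NULL: a comparison involving NULL never matches.
pub fn reference_inner_join(t1: &[Option<i32>], t2: &[Option<i32>]) -> Vec<(usize, usize)> {
    let mut out = Vec::new();
    for (i, l) in t1.iter().enumerate() {
        for (j, r) in t2.iter().enumerate() {
            if let (Some(l), Some(r)) = (l, r) {
                // Widen to i64 so `r + 1` cannot overflow at i32::MAX.
                if i64::from(*l) < i64::from(*r) + 1 {
                    out.push((i, j));
                }
            }
        }
    }
    out
}
```

Rows with a NULL key on either side never appear in the inner join output, which is the behavior the null test case should pin down.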
Currently it doesn't support swapping inputs. It should be faster when the right side is smaller.
Co-authored-by: Yongting You <[email protected]>
@2010YOUY01 I have resolved the requested changes. I have also added null handling: nulls are sorted first, and the first index to start from is then calculated based on the number of nulls in the array. Here are some follow-up improvements:
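The null handling described above can be pictured roughly as follows. This is only a sketch under assumptions: the column is modelled as a slice of `Option<i32>` sorted ascending with nulls first, the predicate is `probe < buffered`, and the function name is made up; the PR works on Arrow arrays and may compute the offset differently.

```rust
/// Given a column sorted ascending with NULLs (`None`) first, returns the
/// index of the first row whose value is strictly greater than `probe`.
/// If no row qualifies, the result equals the column length.
pub fn first_match_after_nulls(sorted_nulls_first: &[Option<i32>], probe: i32) -> usize {
    // Nulls sort first, so counting the leading `None`s gives the first valid row.
    let null_count = sorted_nulls_first
        .iter()
        .take_while(|v| v.is_none())
        .count();
    let valid = &sorted_nulls_first[null_count..];
    // Binary search over the non-null suffix only, then shift by the null count.
    null_count + valid.partition_point(|v| matches!(v, Some(x) if *x <= probe))
}
```

Because nulls can never satisfy a comparison, skipping them up front means the search never has to special-case them.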
…jonathanc-n/datafusion into classic-join-physical-planner
I think point 1 is a known bug: such a case will be planned to PWMJ and panic. It's okay if it's not planned to PWMJ.
The sqlite tests are passing
I think this PR is almost ready. If anyone is interested in reviewing, you can start with the comment in https://github.com/apache/datafusion/pull/17482/files#diff-bc8796293bb6ee5f3f2b43bfb2b43959726f0b458dd3475dbbb4ab6ea0fc5a8c for the high-level idea.
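For readers who want a quick intuition before opening that module comment, here is a minimal sketch of the general piecewise merge join idea for an inner join on `stream < buffered`. It is an illustration on plain `i32` slices with a made-up function name, not the PR's implementation, which operates on record batches and handles more join types and operators.

```rust
/// Inner join on `stream < buffered`, returning (stream_idx, buffered_idx) pairs.
pub fn piecewise_merge_lt(stream: &[i32], buffered: &[i32]) -> Vec<(usize, usize)> {
    // Sort both sides by value, remembering the original row indices.
    let mut s: Vec<(i32, usize)> = stream.iter().enumerate().map(|(i, v)| (*v, i)).collect();
    let mut b: Vec<(i32, usize)> = buffered.iter().enumerate().map(|(i, v)| (*v, i)).collect();
    s.sort();
    b.sort();

    let mut out = Vec::new();
    let mut pivot = 0; // first buffered position that can still match
    for (sv, si) in &s {
        // Stream values only grow, so the pivot never moves backwards.
        while pivot < b.len() && b[pivot].0 <= *sv {
            pivot += 1;
        }
        // Every buffered row from the pivot onward satisfies `sv < bv`.
        for (_, bi) in &b[pivot..] {
            out.push((*si, *bi));
        }
    }
    out
}
```

The point of the sketch is that, after sorting, each stream row matches one contiguous "piece" of the buffered side, so the work beyond sorting is proportional to the output size rather than to the full cross product.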
@2010YOUY01 I've added a fix for the bug. I think I will make an attempt to move it to the optimizer in a follow-up, due to some complications with passing the information to the physical planner.
Thanks again, great job! I plan to wait a couple of days before merging, in case anyone wants to review again.
Sounds great. Let's update the epic issue with the follow-up tasks. I have a question: how do you plan to implement swapping inputs? Is there anything beyond putting the smaller table on the buffer side? Besides, I suggest supporting additional predicates first, so that PWMJ is applicable to a wider range of scenarios.
Thanks @jonathanc-n and @2010YOUY01 for a great job, I'm planning to have another look soon.
Swapping tables would actually put the smaller table on the stream side, so I would probably do a follow-up for that. I agree we should do the additional predicates first as a follow-up.
Which issue does this PR close?
PiecewiseMergeJoin work in Datafusion #17427
Rationale for this change
Adds regular joins (left, right, full, inner) for PWMJ as they behave differently in the code path.
What changes are included in this PR?
Adds classic join + physical planner
Are these changes tested?
Yes, SLT tests + unit tests
Follow up work to this pull request
Next would be to implement the existence joins.