Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 17 additions & 46 deletions native-engine/datafusion-ext-plans/src/joins/smj/full_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,66 +131,37 @@ impl<const L_OUTER: bool, const R_OUTER: bool> Joiner for FullJoiner<L_OUTER, R_
cur_forward!(cur2);

// iterate both stream, find smaller one, use it for probing
let mut has_multi_equal = false;
let mut l_equal = true;
let mut r_equal = true;
while l_equal && r_equal {

while l_equal {
l_equal = !cur1.finished() && cur1.cur_key() == cur1.key(l_key_idx);
if l_equal {
l_equal = !cur1.finished() && cur1.cur_key() == cur1.key(l_key_idx);
if l_equal {
has_multi_equal = true;
equal_lindices.push(cur1.cur_idx());
cur_forward!(cur1);
}
equal_lindices.push(cur1.cur_idx());
cur_forward!(cur1);
}
}

while r_equal {
r_equal = !cur2.finished() && cur2.cur_key() == cur2.key(r_key_idx);
if r_equal {
r_equal = !cur2.finished() && cur2.cur_key() == cur2.key(r_key_idx);
if r_equal {
has_multi_equal = true;
equal_rindices.push(cur2.cur_idx());
cur_forward!(cur2);
}
equal_rindices.push(cur2.cur_idx());
cur_forward!(cur2);
}
}

// fast path for one-to-one join
if !has_multi_equal {
if equal_lindices.len() <= 1 && equal_rindices.len() <= 1 {
self.lindices.push(l_key_idx);
self.rindices.push(r_key_idx);
continue;
}

for (&lidx, &ridx) in equal_lindices.iter().cartesian_product(&equal_rindices) {
self.lindices.push(lidx);
self.rindices.push(ridx);
}

if r_equal {
// stream right side
while !cur2.finished() && cur2.cur_key() == cur1.key(l_key_idx) {
for &lidx in &equal_lindices {
self.lindices.push(lidx);
self.rindices.push(cur2.cur_idx());
}
cur_forward!(cur2);
if self.should_flush() || cur2.num_buffered_batches() > 1 {
self.as_mut().flush(cur1, cur2).await?;
cur2.clean_out_dated_batches();
}
}
}

if l_equal {
// stream left side
while !cur1.finished() && cur1.cur_key() == cur2.key(r_key_idx) {
for &ridx in &equal_rindices {
self.lindices.push(cur1.cur_idx());
self.rindices.push(ridx);
}
cur_forward!(cur1);
if self.should_flush() || cur1.num_buffered_batches() > 1 {
for &lidx in &equal_lindices {
Copy link
Contributor

@xumingming xumingming Dec 6, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about:

// Choose a better name.
fn has_enough_room(&self, new_size: usize) -> bool {
  self.lindices.len() + new_size < self.join_params.batch_size
}

let new_size = equal_lindices.len() * equal_rindices.len()
if (self.has_enough_room(self, new_size)) {
     // old cartesian_product way
} else {
    // do more aggressive flush
}

And don't forget to clear the outdated batches.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have added this logic and unit test. But we should not clear the outdated batches here.

for &ridx in &equal_rindices {
self.lindices.push(lidx);
self.rindices.push(ridx);
if self.should_flush() {
self.as_mut().flush(cur1, cur2).await?;
Copy link

Copilot AI Dec 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The flush condition is incomplete and missing cleanup calls that are critical for memory management during joins with many duplicate keys:

  1. Missing check for cur1.num_buffered_batches() > 1 || cur2.num_buffered_batches() > 1 (present at lines 99-102)
  2. Missing calls to clean_out_dated_batches() after flushing (used consistently elsewhere at lines 104-105, 174-175, 186, 196)

Without these, memory will accumulate unbounded during cartesian product generation when there are many duplicate keys, defeating the purpose of this PR.

Should be:

if self.should_flush() || cur1.num_buffered_batches() > 1 || cur2.num_buffered_batches() > 1 {
    self.as_mut().flush(cur1, cur2).await?;
    cur1.clean_out_dated_batches();
    cur2.clean_out_dated_batches();
}
Suggested change
if self.should_flush() {
self.as_mut().flush(cur1, cur2).await?;
if self.should_flush()
|| cur1.num_buffered_batches() > 1
|| cur2.num_buffered_batches() > 1
{
self.as_mut().flush(cur1, cur2).await?;
cur1.clean_out_dated_batches();
cur2.clean_out_dated_batches();

Copilot uses AI. Check for mistakes.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should not call cur1 .clean_out_dated_batches(), because doing so will break the index correlation between cur1 and equal_lindices.

cur1.clean_out_dated_batches();
}
}
}
Expand Down
Loading