Skip to content

Commit b2ff63f

Browse files
viirya and alamb authored
Use prep_null_mask_filter to handle nulls in selection mask (#9163)
* Use prep_null_mask_filter to handle nulls in selection mask
* Update datafusion/physical-plan/src/joins/sort_merge_join.rs

  Co-authored-by: Andrew Lamb <[email protected]>
* Avoid unwrap

---------

Co-authored-by: Andrew Lamb <[email protected]>
1 parent a48e271 commit b2ff63f

File tree

1 file changed

+22
-1
lines changed

1 file changed

+22
-1
lines changed

datafusion/physical-plan/src/joins/sort_merge_join.rs

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1209,7 +1209,15 @@ impl SMJStream {
12091209
) {
12101210
// The reverse of the selection mask. For the rows not pass join filter above,
12111211
// we need to join them (left or right) with null rows for outer joins.
1212-
let not_mask = compute::not(mask)?;
1212+
let not_mask = if mask.null_count() > 0 {
1213+
// If the mask contains nulls, we need to use `prep_null_mask_filter` to
1214+
// handle the nulls in the mask as false to produce rows where the mask
1215+
// was null itself.
1216+
compute::not(&compute::prep_null_mask_filter(mask))?
1217+
} else {
1218+
compute::not(mask)?
1219+
};
1220+
12131221
let null_joined_batch =
12141222
compute::filter_record_batch(&output_batch, &not_mask)?;
12151223

@@ -1254,6 +1262,19 @@ impl SMJStream {
12541262

12551263
// For full join, we also need to output the null joined rows from the buffered side
12561264
if matches!(self.join_type, JoinType::Full) {
1265+
// Handle not mask for buffered side further.
1266+
// For buffered side, we want to output the rows that are not null joined with
1267+
// the streamed side. i.e. the rows that are not null in the `buffered_indices`.
1268+
let not_mask = if let Some(nulls) = buffered_indices.nulls() {
1269+
let mask = not_mask.values() & nulls.inner();
1270+
BooleanArray::new(mask, None)
1271+
} else {
1272+
not_mask
1273+
};
1274+
1275+
let null_joined_batch =
1276+
compute::filter_record_batch(&output_batch, &not_mask)?;
1277+
12571278
let mut streamed_columns = self
12581279
.streamed_schema
12591280
.fields()

0 commit comments

Comments
 (0)