Commit 67efa2b

use bloom filters to push down hash table lookups in HashJoinExec

1 parent 4c4b24a
5 files changed: +117 −44 lines

datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs

Lines changed: 6 additions & 6 deletions
@@ -278,7 +278,7 @@ async fn test_dynamic_filter_pushdown_through_hash_join_with_topk() {
 - SortExec: TopK(fetch=2), expr=[e@4 ASC], preserve_partitioning=[false], filter=[e@4 IS NULL OR e@4 < bb]
 -   HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, d@0)]
 -     DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
--     DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, e, f], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ d@0 >= aa AND d@0 <= ab ] AND DynamicFilter [ e@1 IS NULL OR e@1 < bb ]
+-     DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, e, f], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ d@0 >= aa AND d@0 <= ab AND d@0 IN BLOOM_FILTER ] AND DynamicFilter [ e@1 IS NULL OR e@1 < bb ]
 "
 );
 }

@@ -1078,7 +1078,7 @@ async fn test_hashjoin_dynamic_filter_pushdown() {
 @r"
 - HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)]
 -   DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
--   DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ a@0 >= aa AND a@0 <= ab AND b@1 >= ba AND b@1 <= bb ]
+-   DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ a@0 >= aa AND a@0 <= ab AND a@0 IN BLOOM_FILTER AND b@1 >= ba AND b@1 <= bb AND b@1 IN BLOOM_FILTER ]
 "
 );
 }

@@ -1309,7 +1309,7 @@ async fn test_hashjoin_dynamic_filter_pushdown_partitioned() {
 -   DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
 -   CoalesceBatchesExec: target_batch_size=8192
 -     RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1
--       DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ a@0 >= ab AND a@0 <= ab AND b@1 >= bb AND b@1 <= bb OR a@0 >= aa AND a@0 <= aa AND b@1 >= ba AND b@1 <= ba ]
+-       DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ a@0 >= ab AND a@0 <= ab AND a@0 IN BLOOM_FILTER AND b@1 >= bb AND b@1 <= bb AND b@1 IN BLOOM_FILTER OR a@0 >= aa AND a@0 <= aa AND a@0 IN BLOOM_FILTER AND b@1 >= ba AND b@1 <= ba AND b@1 IN BLOOM_FILTER ]
 "
 );

@@ -1503,7 +1503,7 @@ async fn test_hashjoin_dynamic_filter_pushdown_collect_left() {
 -   DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
 -   CoalesceBatchesExec: target_batch_size=8192
 -     RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1
--       DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ a@0 >= aa AND a@0 <= ab AND b@1 >= ba AND b@1 <= bb ]
+-       DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ a@0 >= aa AND a@0 <= ab AND a@0 IN BLOOM_FILTER AND b@1 >= ba AND b@1 <= bb AND b@1 IN BLOOM_FILTER ]
 "
 );

@@ -1671,8 +1671,8 @@ async fn test_nested_hashjoin_dynamic_filter_pushdown() {
 - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, b@0)]
 -   DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, x], file_type=test, pushdown_supported=true
 -   HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c@1, d@0)]
--     DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[b, c, y], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ b@0 >= aa AND b@0 <= ab ]
--     DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, z], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ d@0 >= ca AND d@0 <= cb ]
+-     DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[b, c, y], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ b@0 >= aa AND b@0 <= ab AND b@0 IN BLOOM_FILTER ]
+-     DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, z], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ d@0 >= ca AND d@0 <= cb AND d@0 IN BLOOM_FILTER ]
 "
 );
 }
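Note on the `IN BLOOM_FILTER` term that now appears in these snapshots: a bloom filter probe can return false positives but never false negatives, which is what makes it safe to AND into a pushed-down dynamic filter: it only discards probe rows whose join key is provably absent from the build side. A minimal sketch of the row-level semantics, with a hypothetical `BloomLike` type standing in for the real `Sbbf` (illustrative, not DataFusion's API):

use std::collections::HashSet;

// Sketch of `d >= min AND d <= max AND d IN BLOOM_FILTER` evaluated per row.
// Any membership test with no false negatives is safe here; an exact HashSet
// is simply the zero-false-positive extreme of that contract.
struct BloomLike(HashSet<i64>);

impl BloomLike {
    fn might_contain(&self, v: i64) -> bool {
        self.0.contains(&v)
    }
}

fn keep_row(d: i64, min: i64, max: i64, bloom: &BloomLike) -> bool {
    d >= min && d <= max && bloom.might_contain(d)
}

fn main() {
    let bloom = BloomLike([10, 25].into_iter().collect());
    assert!(keep_row(10, 10, 25, &bloom)); // key present on the build side
    assert!(!keep_row(17, 10, 25, &bloom)); // inside min/max, still pruned
}

A real bloom filter occasionally lets an absent key through; the join itself still drops such rows, so results are unchanged and only the pruning becomes less effective.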

datafusion/physical-expr/src/expressions/bloom_filter_expr.rs

Lines changed: 5 additions & 0 deletions
@@ -257,6 +257,11 @@ impl BloomFilterExpr {
         }
     }

+    /// Get a reference to the underlying bloom filter
+    pub fn bloom_filter(&self) -> &Sbbf {
+        &self.bloom_filter
+    }
+
     /// Check a scalar value against the bloom filter
     fn check_scalar(&self, value: &ScalarValue) -> bool {
         if value.is_null() {
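The new `bloom_filter()` accessor exposes the underlying `Sbbf` (split-block bloom filter, the same structure Parquet uses) so the hash join can clone the finished filter out of the expression. For readers unfamiliar with the structure, here is a toy bloom filter showing the general insert/check contract behind it; this is a sketch with k seeded hashes, not the split-block implementation:

use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Toy bloom filter: set k bit positions on insert, require all k on lookup.
// Inserted values therefore always pass (no false negatives); absent values
// pass only if all k of their positions happen to be set (a false positive).
struct ToyBloom {
    bits: Vec<bool>,
    k: u64, // number of hash functions
}

impl ToyBloom {
    fn new(num_bits: usize, k: u64) -> Self {
        Self { bits: vec![false; num_bits], k }
    }

    fn index<T: Hash>(&self, value: &T, seed: u64) -> usize {
        let mut h = DefaultHasher::new();
        seed.hash(&mut h); // derive k "hash functions" from one, via seeds
        value.hash(&mut h);
        (h.finish() % self.bits.len() as u64) as usize
    }

    fn insert<T: Hash>(&mut self, value: &T) {
        for seed in 0..self.k {
            let idx = self.index(value, seed);
            self.bits[idx] = true;
        }
    }

    fn check<T: Hash>(&self, value: &T) -> bool {
        (0..self.k).all(|seed| self.bits[self.index(value, seed)])
    }
}

fn main() {
    let mut f = ToyBloom::new(1024, 7);
    f.insert(&"join_key_1");
    assert!(f.check(&"join_key_1")); // inserted keys always pass
    // absent keys usually fail the check, passing only with probability ~FPP
}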

datafusion/physical-plan/src/joins/hash_join/exec.rs

Lines changed: 43 additions & 20 deletions
@@ -53,6 +53,7 @@ use crate::{
     DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, Partitioning,
     PlanProperties, SendableRecordBatchStream, Statistics,
 };
+use datafusion_physical_expr::bloom_filter::Sbbf;

 use arrow::array::{ArrayRef, BooleanBufferBuilder};
 use arrow::compute::concat_batches;

@@ -72,7 +73,9 @@ use datafusion_functions_aggregate_common::min_max::{MaxAccumulator, MinAccumulator};
 use datafusion_physical_expr::equivalence::{
     join_equivalence_properties, ProjectionMapping,
 };
-use datafusion_physical_expr::expressions::{lit, DynamicFilterPhysicalExpr};
+use datafusion_physical_expr::expressions::{
+    lit, BloomFilterBuilder, DynamicFilterPhysicalExpr,
+};
 use datafusion_physical_expr::{PhysicalExpr, PhysicalExprRef};

 use ahash::RandomState;

@@ -104,10 +107,13 @@ pub(super) struct JoinLeftData {
     _reservation: MemoryReservation,
     /// Bounds computed from the build side for dynamic filter pushdown
     pub(super) bounds: Option<Vec<ColumnBounds>>,
+    /// Bloom filters computed from the build side for dynamic filter pushdown
+    pub(super) bloom_filters: Option<Vec<Sbbf>>,
 }

 impl JoinLeftData {
     /// Create a new `JoinLeftData` from its parts
+    #[allow(clippy::too_many_arguments)]
     pub(super) fn new(
         hash_map: Box<dyn JoinHashMapType>,
         batch: RecordBatch,

@@ -116,6 +122,7 @@ impl JoinLeftData {
         probe_threads_counter: AtomicUsize,
         reservation: MemoryReservation,
         bounds: Option<Vec<ColumnBounds>>,
+        bloom_filters: Option<Vec<Sbbf>>,
     ) -> Self {
         Self {
             hash_map,

@@ -125,6 +132,7 @@
             probe_threads_counter,
             _reservation: reservation,
             bounds,
+            bloom_filters,
         }
     }

@@ -1207,14 +1215,14 @@ impl ExecutionPlan for HashJoinExec {
     }
 }

-/// Accumulator for collecting min/max bounds from build-side data during hash join.
+/// Accumulator for collecting min/max bounds and bloom filters from build-side data during hash join.
 ///
 /// This struct encapsulates the logic for progressively computing column bounds
-/// (minimum and maximum values) for a specific join key expression as batches
+/// (minimum and maximum values) and bloom filters for a specific join key expression as batches
 /// are processed during the build phase of a hash join.
 ///
-/// The bounds are used for dynamic filter pushdown optimization, where filters
-/// based on the actual data ranges can be pushed down to the probe side to
+/// The bounds and bloom filters are used for dynamic filter pushdown optimization, where filters
+/// based on the actual data ranges and membership can be pushed down to the probe side to
 /// eliminate unnecessary data early.
 struct CollectLeftAccumulator {
     /// The physical expression to evaluate for each batch

@@ -1223,6 +1231,8 @@ struct CollectLeftAccumulator {
     min: MinAccumulator,
     /// Accumulator for tracking the maximum value across all batches
     max: MaxAccumulator,
+    /// Bloom filter builder for membership testing
+    bloom_filter: BloomFilterBuilder,
 }

 impl CollectLeftAccumulator {

@@ -1249,17 +1259,23 @@
             .data_type(schema)
             // Min/Max can operate on dictionary data but expect to be initialized with the underlying value type
             .map(|dt| dictionary_value_type(&dt))?;
+
+        // Create bloom filter with default parameters
+        // NDV (number of distinct values) = 10000, FPP (false positive probability) = 0.01 (1%)
+        let bloom_filter = BloomFilterBuilder::new(10000, 0.01)?;
+
         Ok(Self {
             expr,
             min: MinAccumulator::try_new(&data_type)?,
             max: MaxAccumulator::try_new(&data_type)?,
+            bloom_filter,
         })
     }

     /// Updates the accumulators with values from a new batch.
     ///
-    /// Evaluates the expression on the batch and updates both min and max
-    /// accumulators with the resulting values.
+    /// Evaluates the expression on the batch and updates min, max, and bloom filter
+    /// with the resulting values.
     ///
     /// # Arguments
     /// * `batch` - The record batch to process
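The hardcoded NDV = 10000 and FPP = 0.01 pin down the filter's size through the standard bloom filter sizing formulas m = -n·ln(p) / (ln 2)^2 bits and k = (m/n)·ln 2 hash functions. A quick sketch of that arithmetic (the general formulas, not this commit's code; Sbbf's block layout differs slightly but targets the same trade-off):

// Standard bloom filter sizing implied by NDV = 10_000 and FPP = 0.01.
fn bloom_size_bits(ndv: f64, fpp: f64) -> f64 {
    -ndv * fpp.ln() / 2f64.ln().powi(2)
}

fn optimal_hash_count(bits: f64, ndv: f64) -> f64 {
    (bits / ndv) * 2f64.ln()
}

fn main() {
    let bits = bloom_size_bits(10_000.0, 0.01);
    println!("~{bits:.0} bits (~{:.1} KiB)", bits / 8.0 / 1024.0);
    // prints ~95851 bits (~11.7 KiB)
    println!("~{:.1} hash functions", optimal_hash_count(bits, 10_000.0));
    // prints ~6.6, rounded up to 7 in practice
}

Build sides with many more than 10000 distinct keys will see the effective false positive rate degrade, which is the usual caveat of fixing NDV up front.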
@@ -1270,20 +1286,24 @@
         let array = self.expr.evaluate(batch)?.into_array(batch.num_rows())?;
         self.min.update_batch(std::slice::from_ref(&array))?;
         self.max.update_batch(std::slice::from_ref(&array))?;
+        // Insert values into bloom filter
+        self.bloom_filter.insert_array(&array)?;
         Ok(())
     }

-    /// Finalizes the accumulation and returns the computed bounds.
+    /// Finalizes the accumulation and returns the computed bounds and bloom filter.
     ///
-    /// Consumes self to extract the final min and max values from the accumulators.
+    /// Consumes self to extract the final min and max values from the accumulators
+    /// and the built bloom filter.
     ///
     /// # Returns
-    /// The `ColumnBounds` containing the minimum and maximum values observed
-    fn evaluate(mut self) -> Result<ColumnBounds> {
-        Ok(ColumnBounds::new(
-            self.min.evaluate()?,
-            self.max.evaluate()?,
-        ))
+    /// A tuple of (`ColumnBounds`, `Sbbf`) containing the minimum/maximum values and bloom filter
+    fn evaluate(mut self) -> Result<(ColumnBounds, Sbbf)> {
+        let bounds = ColumnBounds::new(self.min.evaluate()?, self.max.evaluate()?);
+        let bloom_filter_expr = self.bloom_filter.finish(Arc::clone(&self.expr));
+        // Extract the Sbbf from the BloomFilterExpr
+        let bloom_filter = bloom_filter_expr.bloom_filter().clone();
+        Ok((bounds, bloom_filter))
     }
 }

@@ -1475,16 +1495,18 @@ async fn collect_left_input(
         })
         .collect::<Result<Vec<_>>>()?;

-    // Compute bounds for dynamic filter if enabled
-    let bounds = match bounds_accumulators {
+    // Compute bounds and bloom filters for dynamic filter if enabled
+    let (bounds, bloom_filters) = match bounds_accumulators {
         Some(accumulators) if num_rows > 0 => {
-            let bounds = accumulators
+            let results: Vec<_> = accumulators
                 .into_iter()
                 .map(CollectLeftAccumulator::evaluate)
                 .collect::<Result<Vec<_>>>()?;
-            Some(bounds)
+            // Separate bounds and bloom filters
+            let (bounds, bloom_filters): (Vec<_>, Vec<_>) = results.into_iter().unzip();
+            (Some(bounds), Some(bloom_filters))
         }
-        _ => None,
+        _ => (None, None),
     };

     let data = JoinLeftData::new(

@@ -1495,6 +1517,7 @@
         AtomicUsize::new(probe_threads_count),
         reservation,
         bounds,
+        bloom_filters,
     );

     Ok(data)
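Taken together, the build-side flow is: one accumulator per join key folds every batch into (min, max, bloom), and the final `unzip` splits the per-key pairs into the two parallel vectors handed to `JoinLeftData::new`. A simplified sketch of that accumulate-then-unzip shape, with plain integers and a `HashSet` standing in for the bloom builder (toy types, not the real `MinAccumulator`/`MaxAccumulator`/`BloomFilterBuilder`):

use std::collections::HashSet;

#[derive(Debug)]
struct Bounds { min: i64, max: i64 }

// Stand-in for CollectLeftAccumulator: one per join key expression.
struct KeyAccumulator {
    min: i64,
    max: i64,
    seen: HashSet<i64>, // stand-in for the bloom filter builder
}

impl KeyAccumulator {
    fn new() -> Self {
        Self { min: i64::MAX, max: i64::MIN, seen: HashSet::new() }
    }

    fn update_batch(&mut self, values: &[i64]) {
        for &v in values {
            self.min = self.min.min(v);
            self.max = self.max.max(v);
            self.seen.insert(v); // real code: self.bloom_filter.insert_array(&array)
        }
    }

    fn evaluate(self) -> (Bounds, HashSet<i64>) {
        (Bounds { min: self.min, max: self.max }, self.seen)
    }
}

fn main() {
    let mut accs = vec![KeyAccumulator::new(), KeyAccumulator::new()];
    accs[0].update_batch(&[3, 7, 5]); // join key 0 over one batch
    accs[1].update_batch(&[40, 10]); // join key 1 over one batch
    // Mirrors `results.into_iter().unzip()` in the diff above:
    let (bounds, filters): (Vec<_>, Vec<_>) =
        accs.into_iter().map(KeyAccumulator::evaluate).unzip();
    println!("{bounds:?}, {} filters", filters.len());
}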

datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs

Lines changed: 57 additions & 17 deletions
@@ -27,7 +27,10 @@ use crate::ExecutionPlanProperties;

 use datafusion_common::{Result, ScalarValue};
 use datafusion_expr::Operator;
-use datafusion_physical_expr::expressions::{lit, BinaryExpr, DynamicFilterPhysicalExpr};
+use datafusion_physical_expr::bloom_filter::Sbbf;
+use datafusion_physical_expr::expressions::{
+    lit, BinaryExpr, BloomFilterExpr, DynamicFilterPhysicalExpr,
+};
 use datafusion_physical_expr::{PhysicalExpr, PhysicalExprRef};

 use itertools::Itertools;

@@ -51,21 +54,29 @@ impl ColumnBounds {
 }

 /// Represents the bounds for all join key columns from a single partition.
-/// This contains the min/max values computed from one partition's build-side data.
+/// This contains the min/max values and bloom filters computed from one partition's build-side data.
 #[derive(Debug, Clone)]
 pub(crate) struct PartitionBounds {
     /// Partition identifier for debugging and determinism (not strictly necessary)
     partition: usize,
     /// Min/max bounds for each join key column in this partition.
     /// Index corresponds to the join key expression index.
     column_bounds: Vec<ColumnBounds>,
+    /// Bloom filters for each join key column in this partition.
+    /// Index corresponds to the join key expression index.
+    bloom_filters: Vec<Sbbf>,
 }

 impl PartitionBounds {
-    pub(crate) fn new(partition: usize, column_bounds: Vec<ColumnBounds>) -> Self {
+    pub(crate) fn new(
+        partition: usize,
+        column_bounds: Vec<ColumnBounds>,
+        bloom_filters: Vec<Sbbf>,
+    ) -> Self {
         Self {
             partition,
             column_bounds,
+            bloom_filters,
         }
     }

@@ -76,6 +87,10 @@
     pub(crate) fn get_column_bounds(&self, index: usize) -> Option<&ColumnBounds> {
         self.column_bounds.get(index)
     }
+
+    pub(crate) fn get_bloom_filter(&self, index: usize) -> Option<&Sbbf> {
+        self.bloom_filters.get(index)
+    }
 }

 /// Coordinates dynamic filter bounds collection across multiple partitions
@@ -175,15 +190,15 @@ impl SharedBoundsAccumulator {
         }
     }

-    /// Create a filter expression from individual partition bounds using OR logic.
+    /// Create a filter expression from individual partition bounds and bloom filters using OR logic.
     ///
-    /// This creates a filter where each partition's bounds form a conjunction (AND)
-    /// of column range predicates, and all partitions are combined with OR.
+    /// This creates a filter where each partition's bounds and bloom filters form a conjunction (AND)
+    /// of column range predicates and bloom filter checks, and all partitions are combined with OR.
     ///
     /// For example, with 2 partitions and 2 columns:
-    /// ((col0 >= p0_min0 AND col0 <= p0_max0 AND col1 >= p0_min1 AND col1 <= p0_max1)
+    /// ((col0 >= p0_min0 AND col0 <= p0_max0 AND col0 IN BLOOM_FILTER_0 AND col1 >= p0_min1 AND col1 <= p0_max1 AND col1 IN BLOOM_FILTER_1)
     /// OR
-    /// (col0 >= p1_min0 AND col0 <= p1_max0 AND col1 >= p1_min1 AND col1 <= p1_max1))
+    /// (col0 >= p1_min0 AND col0 <= p1_max0 AND col0 IN BLOOM_FILTER_0 AND col1 >= p1_min1 AND col1 <= p1_max1 AND col1 IN BLOOM_FILTER_1))
     pub(crate) fn create_filter_from_partition_bounds(
         &self,
         bounds: &[PartitionBounds],

@@ -196,7 +211,7 @@
         let mut partition_predicates = Vec::with_capacity(bounds.len());

         for partition_bounds in bounds.iter().sorted_by_key(|b| b.partition) {
-            // Create range predicates for each join key in this partition
+            // Create range predicates and bloom filter checks for each join key in this partition
             let mut column_predicates = Vec::with_capacity(partition_bounds.len());

             for (col_idx, right_expr) in self.on_right.iter().enumerate() {

@@ -215,7 +230,28 @@
                     let range_expr =
                         Arc::new(BinaryExpr::new(min_expr, Operator::And, max_expr))
                             as Arc<dyn PhysicalExpr>;
-                    column_predicates.push(range_expr);
+
+                    // Create bloom filter check: col IN BLOOM_FILTER
+                    if let Some(bloom_filter) = partition_bounds.get_bloom_filter(col_idx)
+                    {
+                        let bloom_expr = Arc::new(BloomFilterExpr::new(
+                            Arc::clone(right_expr),
+                            bloom_filter.clone(),
+                        ))
+                            as Arc<dyn PhysicalExpr>;
+
+                        // Combine range and bloom filter: (range_expr AND bloom_expr)
+                        let combined_expr = Arc::new(BinaryExpr::new(
+                            range_expr,
+                            Operator::And,
+                            bloom_expr,
+                        ))
+                            as Arc<dyn PhysicalExpr>;
+                        column_predicates.push(combined_expr);
+                    } else {
+                        // If no bloom filter, just use range expression
+                        column_predicates.push(range_expr);
+                    }
                 }
             }

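The filter built here is an OR of per-partition conjunctions: each partition ANDs its range and bloom terms per key, and partitions are disjoined so a probe row survives when any partition could contain its key. A minimal sketch of that OR-of-ANDs fold, with closures standing in for the `PhysicalExpr` tree (illustrative only):

// Each partition contributes an AND of per-column predicates; partitions
// are then OR'd, mirroring create_filter_from_partition_bounds.
type Pred = Box<dyn Fn(&[i64]) -> bool>;

fn and_all(preds: Vec<Pred>) -> Pred {
    Box::new(move |row: &[i64]| preds.iter().all(|p| p(row)))
}

fn or_all(preds: Vec<Pred>) -> Pred {
    Box::new(move |row: &[i64]| preds.iter().any(|p| p(row)))
}

fn main() {
    // Partition 0 saw keys in [1, 5]; partition 1 saw keys in [10, 20].
    let p0 = and_all(vec![
        Box::new(|r: &[i64]| r[0] >= 1),
        Box::new(|r: &[i64]| r[0] <= 5),
    ]);
    let p1 = and_all(vec![
        Box::new(|r: &[i64]| r[0] >= 10),
        Box::new(|r: &[i64]| r[0] <= 20),
    ]);
    let filter = or_all(vec![p0, p1]);
    assert!(filter(&[3])); // covered by partition 0
    assert!(filter(&[15])); // covered by partition 1
    assert!(!filter(&[7])); // between the two ranges: pruned
}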
@@ -247,8 +283,8 @@
     /// Report bounds from a completed partition and update dynamic filter if all partitions are done
     ///
     /// This method coordinates the dynamic filter updates across all partitions. It stores the
-    /// bounds from the current partition, increments the completion counter, and when all
-    /// partitions have reported, creates an OR'd filter from individual partition bounds.
+    /// bounds and bloom filters from the current partition, increments the completion counter, and when all
+    /// partitions have reported, creates an OR'd filter from individual partition bounds and bloom filters.
     ///
     /// This method is async and uses a [`tokio::sync::Barrier`] to wait for all partitions
     /// to report their bounds. Once that occurs, the method will resolve for all callers and the

@@ -264,16 +300,18 @@
     /// # Arguments
     /// * `left_side_partition_id` - The identifier for the **left-side** partition reporting its bounds
     /// * `partition_bounds` - The bounds computed by this partition (if any)
+    /// * `bloom_filters` - The bloom filters computed by this partition (if any)
     ///
     /// # Returns
     /// * `Result<()>` - Ok if successful, Err if filter update failed
     pub(crate) async fn report_partition_bounds(
         &self,
         left_side_partition_id: usize,
         partition_bounds: Option<Vec<ColumnBounds>>,
+        bloom_filters: Option<Vec<Sbbf>>,
     ) -> Result<()> {
-        // Store bounds in the accumulator - this runs once per partition
-        if let Some(bounds) = partition_bounds {
+        // Store bounds and bloom filters in the accumulator - this runs once per partition
+        if let (Some(bounds), Some(filters)) = (partition_bounds, bloom_filters) {
             let mut guard = self.inner.lock();

             let should_push = if let Some(last_bound) = guard.bounds.last() {

@@ -286,9 +324,11 @@
         };

         if should_push {
-            guard
-                .bounds
-                .push(PartitionBounds::new(left_side_partition_id, bounds));
+            guard.bounds.push(PartitionBounds::new(
+                left_side_partition_id,
+                bounds,
+                filters,
+            ));
         }
     }

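The report path relies on a barrier so the combined filter is built exactly once, only after every partition has contributed its bounds and filters. A minimal sketch of that coordination pattern with tokio (illustrative and detached from the DataFusion types; assumes a tokio runtime with the `sync` and `macros` features):

use std::sync::{Arc, Mutex};
use tokio::sync::Barrier;

// N partitions push results under a lock, wait on a barrier, and the single
// "leader" waiter performs the merge step, echoing report_partition_bounds.
#[tokio::main]
async fn main() {
    let n = 4;
    let results = Arc::new(Mutex::new(Vec::new()));
    let barrier = Arc::new(Barrier::new(n));

    let handles: Vec<_> = (0..n)
        .map(|partition| {
            let results = Arc::clone(&results);
            let barrier = Arc::clone(&barrier);
            tokio::spawn(async move {
                // Each partition reports once (toy payload).
                results.lock().unwrap().push((partition, partition * 10));
                // wait() resolves for everyone only after all n have arrived.
                if barrier.wait().await.is_leader() {
                    let all = results.lock().unwrap();
                    println!("leader merges {} partition results", all.len());
                }
            })
        })
        .collect();

    for h in handles {
        h.await.unwrap();
    }
}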