diff --git a/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs b/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs index b91c1732260c..800a0735ac3a 100644 --- a/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs +++ b/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs @@ -278,7 +278,7 @@ async fn test_dynamic_filter_pushdown_through_hash_join_with_topk() { - SortExec: TopK(fetch=2), expr=[e@4 ASC], preserve_partitioning=[false], filter=[e@4 IS NULL OR e@4 < bb] - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, d@0)] - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true - - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, e, f], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ d@0 >= aa AND d@0 <= ab ] AND DynamicFilter [ e@1 IS NULL OR e@1 < bb ] + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, e, f], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ CASE hash_repartition % 1 WHEN 0 THEN d@0 >= aa AND d@0 <= ab AND lookup_p0 ELSE false END ] AND DynamicFilter [ e@1 IS NULL OR e@1 < bb ] " ); } @@ -1078,7 +1078,7 @@ async fn test_hashjoin_dynamic_filter_pushdown() { @r" - HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)] - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true - - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ a@0 >= aa AND a@0 <= ab AND b@1 >= ba AND b@1 <= bb ] + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ a@0 >= aa AND a@0 <= ab AND b@1 >= ba AND b@1 <= bb AND hash_lookup ] " ); } @@ -1309,7 +1309,7 @@ async fn test_hashjoin_dynamic_filter_pushdown_partitioned() { - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true - CoalesceBatchesExec: target_batch_size=8192 - RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1 - - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ a@0 >= ab AND a@0 <= ab AND b@1 >= bb AND b@1 <= bb OR a@0 >= aa AND a@0 <= aa AND b@1 >= ba AND b@1 <= ba ] + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ CASE hash_repartition % 12 WHEN 0 THEN lookup_p0 WHEN 1 THEN lookup_p1 WHEN 2 THEN a@0 >= ab AND a@0 <= ab AND b@1 >= bb AND b@1 <= bb AND lookup_p2 WHEN 3 THEN lookup_p3 WHEN 4 THEN a@0 >= aa AND a@0 <= aa AND b@1 >= ba AND b@1 <= ba AND lookup_p4 WHEN 5 THEN lookup_p5 WHEN 6 THEN lookup_p6 WHEN 7 THEN lookup_p7 WHEN 8 THEN lookup_p8 WHEN 9 THEN lookup_p9 WHEN 10 THEN lookup_p10 WHEN 11 THEN lookup_p11 ELSE false END ] " ); @@ -1326,7 +1326,7 @@ async fn test_hashjoin_dynamic_filter_pushdown_partitioned() { - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true - CoalesceBatchesExec: target_batch_size=8192 - RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1 - - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ a@0 >= aa AND a@0 <= ab AND b@1 >= ba AND b@1 <= bb ] + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ CASE hash_repartition % 12 WHEN 0 THEN a@0 >= aa AND a@0 <= ab AND b@1 >= ba AND b@1 <= bb AND lookup_p0 WHEN 1 THEN lookup_p1 WHEN 2 THEN lookup_p2 WHEN 3 THEN lookup_p3 WHEN 4 THEN lookup_p4 WHEN 5 THEN lookup_p5 WHEN 6 THEN lookup_p6 WHEN 7 THEN lookup_p7 WHEN 8 THEN lookup_p8 WHEN 9 THEN lookup_p9 WHEN 10 THEN lookup_p10 WHEN 11 THEN lookup_p11 ELSE false END ] " ); @@ -1503,7 +1503,7 @@ async fn test_hashjoin_dynamic_filter_pushdown_collect_left() { - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true - CoalesceBatchesExec: target_batch_size=8192 - RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1 - - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ a@0 >= aa AND a@0 <= ab AND b@1 >= ba AND b@1 <= bb ] + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ a@0 >= aa AND a@0 <= ab AND b@1 >= ba AND b@1 <= bb AND hash_lookup ] " ); @@ -1671,8 +1671,8 @@ async fn test_nested_hashjoin_dynamic_filter_pushdown() { - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, b@0)] - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, x], file_type=test, pushdown_supported=true - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c@1, d@0)] - - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[b, c, y], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ b@0 >= aa AND b@0 <= ab ] - - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, z], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ d@0 >= ca AND d@0 <= cb ] + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[b, c, y], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ CASE hash_repartition % 1 WHEN 0 THEN b@0 >= aa AND b@0 <= ab AND lookup_p0 ELSE false END ] + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, z], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ CASE hash_repartition % 1 WHEN 0 THEN d@0 >= ca AND d@0 <= cb AND lookup_p0 ELSE false END ] " ); } diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs index b5fe5ee5cda1..701a37599d7a 100644 --- a/datafusion/physical-plan/src/joins/hash_join/exec.rs +++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs @@ -26,7 +26,7 @@ use crate::filter_pushdown::{ ChildPushdownResult, FilterDescription, FilterPushdownPhase, FilterPushdownPropagation, }; -use crate::joins::hash_join::shared_bounds::{ColumnBounds, SharedBoundsAccumulator}; +use crate::joins::hash_join::shared_bounds::{ColumnBounds, SharedBuildAccumulator}; use crate::joins::hash_join::stream::{ BuildSide, BuildSideInitialState, HashJoinStream, HashJoinStreamState, }; @@ -87,7 +87,8 @@ const HASH_JOIN_SEED: RandomState = /// HashTable and input data for the left (build side) of a join pub(super) struct JoinLeftData { /// The hash table with indices into `batch` - pub(super) hash_map: Box, + /// Arc is used to allow sharing with SharedBuildAccumulator for hash map pushdown + pub(super) hash_map: Arc, /// The input rows for the build side batch: RecordBatch, /// The build side on expressions values @@ -109,7 +110,7 @@ pub(super) struct JoinLeftData { impl JoinLeftData { /// Create a new `JoinLeftData` from its parts pub(super) fn new( - hash_map: Box, + hash_map: Arc, batch: RecordBatch, values: Vec, visited_indices_bitmap: SharedBitmapBuilder, @@ -133,6 +134,11 @@ impl JoinLeftData { &*self.hash_map } + /// return an Arc clone of the hash map for sharing + pub(super) fn hash_map_arc(&self) -> Arc { + Arc::clone(&self.hash_map) + } + /// returns a reference to the build side batch pub(super) fn batch(&self) -> &RecordBatch { &self.batch @@ -364,9 +370,9 @@ pub struct HashJoinExec { struct HashJoinExecDynamicFilter { /// Dynamic filter that we'll update with the results of the build side once that is done. filter: Arc, - /// Bounds accumulator to keep track of the min/max bounds on the join keys for each partition. + /// Build accumulator to collect build-side information (hash maps and/or bounds) from each partition. /// It is lazily initialized during execution to make sure we use the actual execution time partition counts. - bounds_accumulator: OnceLock>, + build_accumulator: OnceLock>, } impl fmt::Debug for HashJoinExec { @@ -977,8 +983,10 @@ impl ExecutionPlan for HashJoinExec { let batch_size = context.session_config().batch_size(); - // Initialize bounds_accumulator lazily with runtime partition counts (only if enabled) - let bounds_accumulator = enable_dynamic_filter_pushdown + // Initialize build_accumulator lazily with runtime partition counts (only if enabled) + // Use RepartitionExec's random state (seeds: 0,0,0,0) for partition routing + let repartition_random_state = RandomState::with_seeds(0, 0, 0, 0); + let build_accumulator = enable_dynamic_filter_pushdown .then(|| { self.dynamic_filter.as_ref().map(|df| { let filter = Arc::clone(&df.filter); @@ -987,13 +995,14 @@ impl ExecutionPlan for HashJoinExec { .iter() .map(|(_, right_expr)| Arc::clone(right_expr)) .collect::>(); - Some(Arc::clone(df.bounds_accumulator.get_or_init(|| { - Arc::new(SharedBoundsAccumulator::new_from_partition_mode( + Some(Arc::clone(df.build_accumulator.get_or_init(|| { + Arc::new(SharedBuildAccumulator::new_from_partition_mode( self.mode, self.left.as_ref(), self.right.as_ref(), filter, on_right, + repartition_random_state, )) }))) }) @@ -1036,7 +1045,7 @@ impl ExecutionPlan for HashJoinExec { batch_size, vec![], self.right.output_ordering().is_some(), - bounds_accumulator, + build_accumulator, self.mode, ))) } @@ -1197,7 +1206,7 @@ impl ExecutionPlan for HashJoinExec { cache: self.cache.clone(), dynamic_filter: Some(HashJoinExecDynamicFilter { filter: dynamic_filter, - bounds_accumulator: OnceLock::new(), + build_accumulator: OnceLock::new(), }), }); result = result.with_updated_node(new_node as Arc); @@ -1346,7 +1355,7 @@ impl BuildSideState { /// When `should_compute_bounds` is true, this function computes the min/max bounds /// for each join key column but does NOT update the dynamic filter. Instead, the /// bounds are stored in the returned `JoinLeftData` and later coordinated by -/// `SharedBoundsAccumulator` to ensure all partitions contribute their bounds +/// `SharedBuildAccumulator` to ensure all partitions contribute their bounds /// before updating the filter exactly once. /// /// # Returns @@ -1417,6 +1426,7 @@ async fn collect_left_input( // Use `u32` indices for the JoinHashMap when num_rows ≤ u32::MAX, otherwise use the // `u64` indice variant + // Arc is used instead of Box to allow sharing with SharedBuildAccumulator for hash map pushdown let mut hashmap: Box = if num_rows > u32::MAX as usize { let estimated_hashtable_size = estimate_memory_size::<(u64, u64)>(num_rows, fixed_size_u64)?; @@ -1487,8 +1497,11 @@ async fn collect_left_input( _ => None, }; + // Convert Box to Arc for sharing with SharedBuildAccumulator + let hashmap_arc: Arc = hashmap.into(); + let data = JoinLeftData::new( - hashmap, + hashmap_arc, single_batch, left_values.clone(), Mutex::new(visited_indices_bitmap), diff --git a/datafusion/physical-plan/src/joins/hash_join/mod.rs b/datafusion/physical-plan/src/joins/hash_join/mod.rs index 7f1e5cae13a3..6c073e7a9cff 100644 --- a/datafusion/physical-plan/src/joins/hash_join/mod.rs +++ b/datafusion/physical-plan/src/joins/hash_join/mod.rs @@ -20,5 +20,6 @@ pub use exec::HashJoinExec; mod exec; +mod partitioned_hash_eval; mod shared_bounds; mod stream; diff --git a/datafusion/physical-plan/src/joins/hash_join/partitioned_hash_eval.rs b/datafusion/physical-plan/src/joins/hash_join/partitioned_hash_eval.rs new file mode 100644 index 000000000000..1e92111ea9d6 --- /dev/null +++ b/datafusion/physical-plan/src/joins/hash_join/partitioned_hash_eval.rs @@ -0,0 +1,292 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Hash computation and hash table lookup expressions for dynamic filtering + +use std::{any::Any, fmt::Display, hash::Hash, sync::Arc}; + +use ahash::RandomState; +use arrow::{ + array::{BooleanArray, UInt64Array}, + buffer::MutableBuffer, + datatypes::{DataType, Schema}, + util::bit_util, +}; +use datafusion_common::{internal_datafusion_err, internal_err, Result}; +use datafusion_expr::ColumnarValue; +use datafusion_physical_expr_common::physical_expr::{ + DynHash, PhysicalExpr, PhysicalExprRef, +}; + +use crate::{hash_utils::create_hashes, joins::utils::JoinHashMapType}; + +/// Physical expression that computes hash values for a set of columns +/// +/// This expression computes the hash of join key columns using a specific RandomState. +/// It returns a UInt64Array containing the hash values. +/// +/// This is used for: +/// - Computing routing hashes (with RepartitionExec's 0,0,0,0 seeds) +/// - Computing lookup hashes (with HashJoin's 'J','O','I','N' seeds) +pub struct HashExpr { + /// Columns to hash + on_columns: Vec, + /// Random state for hashing + random_state: RandomState, + /// Description for display + description: String, +} + +impl HashExpr { + /// Create a new HashExpr + /// + /// # Arguments + /// * `on_columns` - Columns to hash + /// * `random_state` - RandomState for hashing + /// * `description` - Description for debugging (e.g., "hash_repartition", "hash_join") + pub fn new( + on_columns: Vec, + random_state: RandomState, + description: String, + ) -> Self { + Self { + on_columns, + random_state, + description, + } + } +} + +impl std::fmt::Debug for HashExpr { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let cols = self + .on_columns + .iter() + .map(|e| e.to_string()) + .collect::>() + .join(", "); + write!(f, "{}({})", self.description, cols) + } +} + +impl Hash for HashExpr { + fn hash(&self, state: &mut H) { + self.on_columns.dyn_hash(state); + self.description.hash(state); + } +} + +impl PartialEq for HashExpr { + fn eq(&self, other: &Self) -> bool { + self.on_columns == other.on_columns && self.description == other.description + } +} + +impl Eq for HashExpr {} + +impl Display for HashExpr { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.description) + } +} + +impl PhysicalExpr for HashExpr { + fn as_any(&self) -> &dyn Any { + self + } + + fn children(&self) -> Vec<&Arc> { + self.on_columns.iter().collect() + } + + fn with_new_children( + self: Arc, + children: Vec>, + ) -> Result> { + Ok(Arc::new(HashExpr::new( + children, + self.random_state.clone(), + self.description.clone(), + ))) + } + + fn data_type(&self, _input_schema: &Schema) -> Result { + Ok(DataType::UInt64) + } + + fn nullable(&self, _input_schema: &Schema) -> Result { + Ok(false) + } + + fn evaluate( + &self, + batch: &arrow::record_batch::RecordBatch, + ) -> Result { + let num_rows = batch.num_rows(); + + // Evaluate columns + let keys_values = self + .on_columns + .iter() + .map(|c| c.evaluate(batch)?.into_array(num_rows)) + .collect::>>()?; + + // Compute hashes + let mut hashes_buffer = vec![0; num_rows]; + create_hashes(&keys_values, &self.random_state, &mut hashes_buffer)?; + + Ok(ColumnarValue::Array(Arc::new(UInt64Array::from( + hashes_buffer, + )))) + } + + fn fmt_sql(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.description) + } +} + +/// Physical expression that checks if hash values exist in a hash table +/// +/// Takes a UInt64Array of hash values and checks membership in a hash table. +/// Returns a BooleanArray indicating which hashes exist. +pub struct HashTableLookupExpr { + /// Expression that computes hash values (should be a HashExpr) + hash_expr: PhysicalExprRef, + /// Hash table to check against + hash_map: Arc, + /// Description for display + description: String, +} + +impl HashTableLookupExpr { + /// Create a new HashTableLookupExpr + /// + /// # Arguments + /// * `hash_expr` - Expression that computes hash values + /// * `hash_map` - Hash table to check membership + /// * `description` - Description for debugging + pub fn new( + hash_expr: PhysicalExprRef, + hash_map: Arc, + description: String, + ) -> Self { + Self { + hash_expr, + hash_map, + description, + } + } +} + +impl std::fmt::Debug for HashTableLookupExpr { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}({:?})", self.description, self.hash_expr) + } +} + +impl Hash for HashTableLookupExpr { + fn hash(&self, state: &mut H) { + self.hash_expr.dyn_hash(state); + self.description.hash(state); + } +} + +impl PartialEq for HashTableLookupExpr { + fn eq(&self, other: &Self) -> bool { + Arc::ptr_eq(&self.hash_expr, &other.hash_expr) + && self.description == other.description + } +} + +impl Eq for HashTableLookupExpr {} + +impl Display for HashTableLookupExpr { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.description) + } +} + +impl PhysicalExpr for HashTableLookupExpr { + fn as_any(&self) -> &dyn Any { + self + } + + fn children(&self) -> Vec<&Arc> { + vec![&self.hash_expr] + } + + fn with_new_children( + self: Arc, + children: Vec>, + ) -> Result> { + if children.len() != 1 { + return internal_err!( + "HashTableLookupExpr expects exactly 1 child, got {}", + children.len() + ); + } + Ok(Arc::new(HashTableLookupExpr::new( + Arc::clone(&children[0]), + Arc::clone(&self.hash_map), + self.description.clone(), + ))) + } + + fn data_type(&self, _input_schema: &Schema) -> Result { + Ok(DataType::Boolean) + } + + fn nullable(&self, _input_schema: &Schema) -> Result { + Ok(false) + } + + fn evaluate( + &self, + batch: &arrow::record_batch::RecordBatch, + ) -> Result { + let num_rows = batch.num_rows(); + + // Evaluate hash expression to get hash values + let hash_array = self.hash_expr.evaluate(batch)?.into_array(num_rows)?; + let hash_array = hash_array.as_any().downcast_ref::().ok_or( + internal_datafusion_err!( + "HashTableLookupExpr expects UInt64Array from hash expression" + ), + )?; + + // Check each hash against the hash table + let mut buf = MutableBuffer::from_len_zeroed(bit_util::ceil(num_rows, 8)); + for (idx, hash_value) in hash_array.values().iter().enumerate() { + // Use get_matched_indices to check - if it returns any indices, the hash exists + let (matched_indices, _) = self + .hash_map + .get_matched_indices(Box::new(std::iter::once((idx, hash_value))), None); + + if !matched_indices.is_empty() { + bit_util::set_bit(buf.as_slice_mut(), idx); + } + } + + Ok(ColumnarValue::Array(Arc::new( + BooleanArray::new_from_packed(buf, 0, num_rows), + ))) + } + + fn fmt_sql(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.description) + } +} diff --git a/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs b/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs index 25f7a0de31ac..38671431669a 100644 --- a/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs +++ b/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs @@ -15,25 +15,33 @@ // specific language governing permissions and limitations // under the License. -//! Utilities for shared bounds. Used in dynamic filter pushdown in Hash Joins. +//! Utilities for shared build-side information. Used in dynamic filter pushdown in Hash Joins. // TODO: include the link to the Dynamic Filter blog post. use std::fmt; use std::sync::Arc; +use crate::joins::hash_join::partitioned_hash_eval::{HashExpr, HashTableLookupExpr}; +use crate::joins::utils::JoinHashMapType; use crate::joins::PartitionMode; use crate::ExecutionPlan; use crate::ExecutionPlanProperties; +use ahash::RandomState; use datafusion_common::{Result, ScalarValue}; use datafusion_expr::Operator; -use datafusion_physical_expr::expressions::{lit, BinaryExpr, DynamicFilterPhysicalExpr}; +use datafusion_physical_expr::expressions::{ + lit, BinaryExpr, CaseExpr, DynamicFilterPhysicalExpr, +}; use datafusion_physical_expr::{PhysicalExpr, PhysicalExprRef}; -use itertools::Itertools; use parking_lot::Mutex; use tokio::sync::Barrier; +/// Hash join seed - must match the one used when building hash tables +const HASH_JOIN_SEED: RandomState = + RandomState::with_seeds('J' as u64, 'O' as u64, 'I' as u64, 'N' as u64); + /// Represents the minimum and maximum values for a specific column. /// Used in dynamic filter pushdown to establish value boundaries. #[derive(Debug, Clone, PartialEq)] @@ -69,27 +77,30 @@ impl PartitionBounds { } } - pub(crate) fn len(&self) -> usize { - self.column_bounds.len() - } - pub(crate) fn get_column_bounds(&self, index: usize) -> Option<&ColumnBounds> { self.column_bounds.get(index) } } -/// Coordinates dynamic filter bounds collection across multiple partitions +/// Coordinates build-side information collection across multiple partitions /// -/// This structure ensures that dynamic filters are built with complete information from all -/// relevant partitions before being applied to probe-side scans. Incomplete filters would +/// This structure collects information from the build side (hash tables and/or bounds) and +/// ensures that dynamic filters are built with complete information from all relevant +/// partitions before being applied to probe-side scans. Incomplete filters would /// incorrectly eliminate valid join results. /// /// ## Synchronization Strategy /// -/// 1. Each partition computes bounds from its build-side data -/// 2. Bounds are stored in the shared vector -/// 3. A barrier tracks how many partitions have reported their bounds -/// 4. When the last partition reports, bounds are merged and the filter is updated exactly once +/// 1. Each partition computes information from its build-side data (hash maps and/or bounds) +/// 2. Information is stored in the shared state +/// 3. A barrier tracks how many partitions have reported +/// 4. When the last partition reports, information is merged and the filter is updated exactly once +/// +/// ## Hash Map vs Bounds +/// +/// - **Hash Maps (Partitioned mode)**: Collects Arc references to hash tables from each partition. +/// Creates a `PartitionedHashLookupPhysicalExpr` that routes rows to the correct partition's hash table. +/// - **Bounds (CollectLeft mode)**: Collects min/max bounds and creates range predicates. /// /// ## Partition Counting /// @@ -101,25 +112,33 @@ impl PartitionBounds { /// /// All fields use a single mutex to ensure correct coordination between concurrent /// partition executions. -pub(crate) struct SharedBoundsAccumulator { +pub(crate) struct SharedBuildAccumulator { /// Shared state protected by a single mutex to avoid ordering concerns - inner: Mutex, + inner: Mutex, barrier: Barrier, /// Dynamic filter for pushdown to probe side dynamic_filter: Arc, - /// Right side join expressions needed for creating filter bounds + /// Right side join expressions needed for creating filter expressions on_right: Vec, + /// Random state for partitioning (RepartitionExec's hash function with 0,0,0,0 seeds) + /// Used for PartitionedHashLookupPhysicalExpr + repartition_random_state: RandomState, } -/// State protected by SharedBoundsAccumulator's mutex -struct SharedBoundsState { - /// Bounds from completed partitions. +/// State protected by SharedBuildAccumulator's mutex +struct SharedBuildState { + /// Bounds from completed partitions (used in CollectLeft mode) /// Each element represents the column bounds computed by one partition. bounds: Vec, + /// Hash maps from completed partitions (used in Partitioned mode) + /// Index corresponds to partition number + hash_maps: Vec>>, + /// Single hash map for CollectLeft mode (shared across all partitions) + single_hash_map: Option>, } -impl SharedBoundsAccumulator { - /// Creates a new SharedBoundsAccumulator configured for the given partition mode +impl SharedBuildAccumulator { + /// Creates a new SharedBuildAccumulator configured for the given partition mode /// /// This method calculates how many times `collect_build_side` will be called based on the /// partition mode's execution pattern. This count is critical for determining when we have @@ -137,12 +156,12 @@ impl SharedBoundsAccumulator { /// `collect_build_side` once. Expected calls = number of build partitions. /// /// - **Auto**: Placeholder mode resolved during optimization. Uses 1 as safe default since - /// the actual mode will be determined and a new bounds_accumulator created before execution. + /// the actual mode will be determined and a new accumulator created before execution. /// /// ## Why This Matters /// /// We cannot build a partial filter from some partitions - it would incorrectly eliminate - /// valid join results. We must wait until we have complete bounds information from ALL + /// valid join results. We must wait until we have complete information from ALL /// relevant partitions before updating the dynamic filter. pub(crate) fn new_from_partition_mode( partition_mode: PartitionMode, @@ -150,6 +169,7 @@ impl SharedBoundsAccumulator { right_child: &dyn ExecutionPlan, dynamic_filter: Arc, on_right: Vec, + repartition_random_state: RandomState, ) -> Self { // Troubleshooting: If partition counts are incorrect, verify this logic matches // the actual execution pattern in collect_build_side() @@ -165,139 +185,313 @@ impl SharedBoundsAccumulator { // Default value, will be resolved during optimization (does not exist once `execute()` is called; will be replaced by one of the other two) PartitionMode::Auto => unreachable!("PartitionMode::Auto should not be present at execution time. This is a bug in DataFusion, please report it!"), }; + + let num_partitions = match partition_mode { + PartitionMode::Partitioned => { + left_child.output_partitioning().partition_count() + } + _ => 0, // Not used for CollectLeft + }; + Self { - inner: Mutex::new(SharedBoundsState { + inner: Mutex::new(SharedBuildState { bounds: Vec::with_capacity(expected_calls), + hash_maps: vec![None; num_partitions], + single_hash_map: None, }), barrier: Barrier::new(expected_calls), dynamic_filter, on_right, + repartition_random_state, } } - /// Create a filter expression from individual partition bounds using OR logic. + /// Report hash map and bounds from CollectLeft mode (single hash table shared by all partitions) + /// + /// This method is used for `PartitionMode::CollectLeft` to collect the single shared hash map + /// and bounds. When all partitions have reported (waited at barrier), it creates a simple filter + /// expression that combines min/max range checks with hash table lookups. + /// + /// Unlike Partitioned mode, this creates a simpler filter without CASE expression or partition routing: + /// `(col >= min AND col <= max AND ...) AND hash_lookup(hash_table, hash_join(join_keys))` /// - /// This creates a filter where each partition's bounds form a conjunction (AND) - /// of column range predicates, and all partitions are combined with OR. + /// # Arguments + /// * `hash_map` - Arc reference to the single shared hash table + /// * `partition_bounds` - Min/max bounds for the build side /// - /// For example, with 2 partitions and 2 columns: - /// ((col0 >= p0_min0 AND col0 <= p0_max0 AND col1 >= p0_min1 AND col1 <= p0_max1) - /// OR - /// (col0 >= p1_min0 AND col0 <= p1_max0 AND col1 >= p1_min1 AND col1 <= p1_max1)) - pub(crate) fn create_filter_from_partition_bounds( + /// # Returns + /// * `Result<()>` - Ok if successful, Err if filter update failed + pub(crate) async fn report_single_hash_map_and_bounds( &self, - bounds: &[PartitionBounds], - ) -> Result> { - if bounds.is_empty() { - return Ok(lit(true)); - } + hash_map: Arc, + partition_bounds: Option>, + ) -> Result<()> { + // Store hash map and bounds in the accumulator + { + let mut guard = self.inner.lock(); - // Create a predicate for each partition - let mut partition_predicates = Vec::with_capacity(bounds.len()); - - for partition_bounds in bounds.iter().sorted_by_key(|b| b.partition) { - // Create range predicates for each join key in this partition - let mut column_predicates = Vec::with_capacity(partition_bounds.len()); - - for (col_idx, right_expr) in self.on_right.iter().enumerate() { - if let Some(column_bounds) = partition_bounds.get_column_bounds(col_idx) { - // Create predicate: col >= min AND col <= max - let min_expr = Arc::new(BinaryExpr::new( - Arc::clone(right_expr), - Operator::GtEq, - lit(column_bounds.min.clone()), - )) as Arc; - let max_expr = Arc::new(BinaryExpr::new( - Arc::clone(right_expr), - Operator::LtEq, - lit(column_bounds.max.clone()), - )) as Arc; - let range_expr = - Arc::new(BinaryExpr::new(min_expr, Operator::And, max_expr)) - as Arc; - column_predicates.push(range_expr); - } + // Store the single hash map (only once, even though multiple partitions call this) + if guard.single_hash_map.is_none() { + guard.single_hash_map = Some(hash_map); } - // Combine all column predicates for this partition with AND - if !column_predicates.is_empty() { - let partition_predicate = column_predicates - .into_iter() - .reduce(|acc, pred| { - Arc::new(BinaryExpr::new(acc, Operator::And, pred)) - as Arc - }) - .unwrap(); - partition_predicates.push(partition_predicate); + if let Some(bounds) = partition_bounds { + // Use partition 0 for the single hash table + let should_push = if let Some(last_bound) = guard.bounds.last() { + // Deduplicate - all partitions report the same data in CollectLeft + last_bound.partition != 0 + } else { + true + }; + + if should_push { + guard.bounds.push(PartitionBounds::new(0, bounds)); + } } } - // Combine all partition predicates with OR - let combined_predicate = partition_predicates - .into_iter() - .reduce(|acc, pred| { - Arc::new(BinaryExpr::new(acc, Operator::Or, pred)) - as Arc - }) - .unwrap_or_else(|| lit(true)); + // Wait for all partitions to report + if self.barrier.wait().await.is_leader() { + // All partitions have reported, so we can create and update the filter + let inner = self.inner.lock(); + + if let Some(ref hash_map) = inner.single_hash_map { + // Create hash lookup expression + let lookup_hash_expr = Arc::new(HashExpr::new( + self.on_right.clone(), + HASH_JOIN_SEED, + "hash_join".to_string(), + )) as Arc; + + let hash_lookup_expr = Arc::new(HashTableLookupExpr::new( + lookup_hash_expr, + Arc::clone(hash_map), + "hash_lookup".to_string(), + )) as Arc; + + // Create bounds check expression (if bounds available) + let mut filter_expr = hash_lookup_expr; + + if let Some(partition_bounds) = inner.bounds.first() { + let mut column_predicates = Vec::new(); + + for (col_idx, right_expr) in self.on_right.iter().enumerate() { + if let Some(column_bounds) = + partition_bounds.get_column_bounds(col_idx) + { + // Create predicate: col >= min AND col <= max + let min_expr = Arc::new(BinaryExpr::new( + Arc::clone(right_expr), + Operator::GtEq, + lit(column_bounds.min.clone()), + )) + as Arc; + let max_expr = Arc::new(BinaryExpr::new( + Arc::clone(right_expr), + Operator::LtEq, + lit(column_bounds.max.clone()), + )) + as Arc; + let range_expr = Arc::new(BinaryExpr::new( + min_expr, + Operator::And, + max_expr, + )) + as Arc; + column_predicates.push(range_expr); + } + } - Ok(combined_predicate) + // Combine all column range predicates with AND + if !column_predicates.is_empty() { + let bounds_expr = column_predicates + .into_iter() + .reduce(|acc, pred| { + Arc::new(BinaryExpr::new(acc, Operator::And, pred)) + as Arc + }) + .unwrap(); + + // Combine bounds_expr AND hash_lookup_expr + filter_expr = Arc::new(BinaryExpr::new( + bounds_expr, + Operator::And, + filter_expr, + )) as Arc; + } + } + + self.dynamic_filter.update(filter_expr)?; + } + } + + Ok(()) } - /// Report bounds from a completed partition and update dynamic filter if all partitions are done - /// - /// This method coordinates the dynamic filter updates across all partitions. It stores the - /// bounds from the current partition, increments the completion counter, and when all - /// partitions have reported, creates an OR'd filter from individual partition bounds. + /// Report both hash map AND bounds from a completed partition /// - /// This method is async and uses a [`tokio::sync::Barrier`] to wait for all partitions - /// to report their bounds. Once that occurs, the method will resolve for all callers and the - /// dynamic filter will be updated exactly once. - /// - /// # Note - /// - /// As barriers are reusable, it is likely an error to call this method more times than the - /// total number of partitions - as it can lead to pending futures that never resolve. We rely - /// on correct usage from the caller rather than imposing additional checks here. If this is a concern, - /// consider making the resulting future shared so the ready result can be reused. + /// This method is used for `PartitionMode::Partitioned` to collect both hash maps and bounds + /// from each partition. When all partitions have reported, it creates a CASE expression that + /// combines min/max range checks with hash table lookups for maximum filtering efficiency. /// /// # Arguments - /// * `left_side_partition_id` - The identifier for the **left-side** partition reporting its bounds - /// * `partition_bounds` - The bounds computed by this partition (if any) + /// * `partition_id` - The partition number reporting + /// * `hash_map` - Arc reference to the partition's hash table + /// * `partition_bounds` - Min/max bounds for this partition /// /// # Returns /// * `Result<()>` - Ok if successful, Err if filter update failed - pub(crate) async fn report_partition_bounds( + pub(crate) async fn report_partition_hash_map_and_bounds( &self, - left_side_partition_id: usize, + partition_id: usize, + hash_map: Arc, partition_bounds: Option>, ) -> Result<()> { - // Store bounds in the accumulator - this runs once per partition - if let Some(bounds) = partition_bounds { + // Store both hash map and bounds in the accumulator + { let mut guard = self.inner.lock(); + guard.hash_maps[partition_id] = Some(hash_map); - let should_push = if let Some(last_bound) = guard.bounds.last() { - // In `PartitionMode::CollectLeft`, all streams on the left side share the same partition id (0). - // Since this function can be called multiple times for that same partition, we must deduplicate - // by checking against the last recorded bound. - last_bound.partition != left_side_partition_id - } else { - true - }; - - if should_push { + if let Some(bounds) = partition_bounds { guard .bounds - .push(PartitionBounds::new(left_side_partition_id, bounds)); + .push(PartitionBounds::new(partition_id, bounds)); } } + // Wait for all partitions to report if self.barrier.wait().await.is_leader() { - // All partitions have reported, so we can update the filter + // All partitions have reported, so we can create and update the filter let inner = self.inner.lock(); - if !inner.bounds.is_empty() { - let filter_expr = - self.create_filter_from_partition_bounds(&inner.bounds)?; + + // Collect all hash maps (they should all be Some at this point) + let hash_maps: Vec> = + inner.hash_maps.iter().filter_map(|hm| hm.clone()).collect(); + + if !hash_maps.is_empty() { + // Build a CASE expression that combines range checks AND hash lookups + // CASE (hash_repartition(join_keys) % num_partitions) + // WHEN 0 THEN (col >= min_0 AND col <= max_0 AND ...) AND hash_lookup(table_0, hash_join(join_keys)) + // WHEN 1 THEN (col >= min_1 AND col <= max_1 AND ...) AND hash_lookup(table_1, hash_join(join_keys)) + // ... + // ELSE false + // END + + let num_partitions = hash_maps.len(); + + // Create base expression: hash_repartition(join_keys) % num_partitions + let routing_hash_expr = Arc::new(HashExpr::new( + self.on_right.clone(), + self.repartition_random_state.clone(), + "hash_repartition".to_string(), + )) as Arc; + + let modulo_expr = Arc::new(BinaryExpr::new( + routing_hash_expr, + Operator::Modulo, + lit(ScalarValue::UInt64(Some(num_partitions as u64))), + )) as Arc; + + // Create WHEN branches for each partition + let when_then_branches: Vec<( + Arc, + Arc, + )> = hash_maps + .into_iter() + .enumerate() + .map(|(partition_id, hash_map)| { + // WHEN partition_id + let when_expr = + lit(ScalarValue::UInt64(Some(partition_id as u64))); + + // THEN: Combine bounds check AND hash lookup + + // 1. Create hash lookup expression + let lookup_hash_expr = Arc::new(HashExpr::new( + self.on_right.clone(), + HASH_JOIN_SEED, + format!("hash_join_p{partition_id}"), + )) + as Arc; + + let hash_lookup_expr = Arc::new(HashTableLookupExpr::new( + lookup_hash_expr, + hash_map, + format!("lookup_p{partition_id}"), + )) + as Arc; + + // 2. Create bounds check expression for this partition (if bounds available) + let mut then_expr = hash_lookup_expr; + + if let Some(partition_bounds) = + inner.bounds.iter().find(|pb| pb.partition == partition_id) + { + let mut column_predicates = Vec::new(); + + for (col_idx, right_expr) in self.on_right.iter().enumerate() + { + if let Some(column_bounds) = + partition_bounds.get_column_bounds(col_idx) + { + // Create predicate: col >= min AND col <= max + let min_expr = Arc::new(BinaryExpr::new( + Arc::clone(right_expr), + Operator::GtEq, + lit(column_bounds.min.clone()), + )) + as Arc; + let max_expr = Arc::new(BinaryExpr::new( + Arc::clone(right_expr), + Operator::LtEq, + lit(column_bounds.max.clone()), + )) + as Arc; + let range_expr = Arc::new(BinaryExpr::new( + min_expr, + Operator::And, + max_expr, + )) + as Arc; + column_predicates.push(range_expr); + } + } + + // Combine all column range predicates with AND + if !column_predicates.is_empty() { + let bounds_expr = column_predicates + .into_iter() + .reduce(|acc, pred| { + Arc::new(BinaryExpr::new( + acc, + Operator::And, + pred, + )) + as Arc + }) + .unwrap(); + + // Combine bounds_expr AND hash_lookup_expr + then_expr = Arc::new(BinaryExpr::new( + bounds_expr, + Operator::And, + then_expr, + )) + as Arc; + } + } + + (when_expr, then_expr) + }) + .collect(); + + // Create CASE expression + let filter_expr = Arc::new(CaseExpr::try_new( + Some(modulo_expr), + when_then_branches, + Some(lit(false)), // ELSE false + )?) as Arc; + self.dynamic_filter.update(filter_expr)?; } } @@ -306,8 +500,8 @@ impl SharedBoundsAccumulator { } } -impl fmt::Debug for SharedBoundsAccumulator { +impl fmt::Debug for SharedBuildAccumulator { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "SharedBoundsAccumulator") + write!(f, "SharedBuildAccumulator") } } diff --git a/datafusion/physical-plan/src/joins/hash_join/stream.rs b/datafusion/physical-plan/src/joins/hash_join/stream.rs index 88c50c2eb2ce..e8fb6b7a55bd 100644 --- a/datafusion/physical-plan/src/joins/hash_join/stream.rs +++ b/datafusion/physical-plan/src/joins/hash_join/stream.rs @@ -24,7 +24,7 @@ use std::sync::Arc; use std::task::Poll; use crate::joins::hash_join::exec::JoinLeftData; -use crate::joins::hash_join::shared_bounds::SharedBoundsAccumulator; +use crate::joins::hash_join::shared_bounds::SharedBuildAccumulator; use crate::joins::utils::{ equal_rows_arr, get_final_indices_from_shared_bitmap, OnceFut, }; @@ -206,11 +206,11 @@ pub(super) struct HashJoinStream { hashes_buffer: Vec, /// Specifies whether the right side has an ordering to potentially preserve right_side_ordered: bool, - /// Shared bounds accumulator for coordinating dynamic filter updates (optional) - bounds_accumulator: Option>, - /// Optional future to signal when bounds have been reported by all partitions + /// Shared build accumulator for coordinating dynamic filter updates (collects hash maps and/or bounds, optional) + build_accumulator: Option>, + /// Optional future to signal when build information has been reported by all partitions /// and the dynamic filter has been updated - bounds_waiter: Option>, + build_waiter: Option>, /// Partitioning mode to use mode: PartitionMode, @@ -315,7 +315,7 @@ impl HashJoinStream { batch_size: usize, hashes_buffer: Vec, right_side_ordered: bool, - bounds_accumulator: Option>, + build_accumulator: Option>, mode: PartitionMode, ) -> Self { Self { @@ -334,8 +334,8 @@ impl HashJoinStream { batch_size, hashes_buffer, right_side_ordered, - bounds_accumulator, - bounds_waiter: None, + build_accumulator, + build_waiter: None, mode, } } @@ -370,12 +370,12 @@ impl HashJoinStream { } } - /// Optional step to wait until bounds have been reported by all partitions. - /// This state is only entered if a bounds accumulator is present. + /// Optional step to wait until build-side information (hash maps or bounds) has been reported by all partitions. + /// This state is only entered if a build accumulator is present. /// /// ## Why wait? /// - /// The dynamic filter is only built once all partitions have reported their bounds. + /// The dynamic filter is only built once all partitions have reported their information (hash maps or bounds). /// If we do not wait here, the probe-side scan may start before the filter is ready. /// This can lead to the probe-side scan missing the opportunity to apply the filter /// and skip reading unnecessary data. @@ -383,7 +383,7 @@ impl HashJoinStream { &mut self, cx: &mut std::task::Context<'_>, ) -> Poll>>> { - if let Some(ref mut fut) = self.bounds_waiter { + if let Some(ref mut fut) = self.build_waiter { ready!(fut.get_shared(cx))?; } self.state = HashJoinStreamState::FetchProbeBatch; @@ -406,12 +406,13 @@ impl HashJoinStream { .get_shared(cx))?; build_timer.done(); - // Handle dynamic filter bounds accumulation + // Handle dynamic filter build-side information accumulation // // Dynamic filter coordination between partitions: - // Report bounds to the accumulator which will handle synchronization and filter updates - if let Some(ref bounds_accumulator) = self.bounds_accumulator { - let bounds_accumulator = Arc::clone(bounds_accumulator); + // Report hash maps (Partitioned mode) or bounds (CollectLeft mode) to the accumulator + // which will handle synchronization and filter updates + if let Some(ref build_accumulator) = self.build_accumulator { + let build_accumulator = Arc::clone(build_accumulator); let left_side_partition_id = match self.mode { PartitionMode::Partitioned => self.partition, @@ -419,12 +420,35 @@ impl HashJoinStream { PartitionMode::Auto => unreachable!("PartitionMode::Auto should not be present at execution time. This is a bug in DataFusion, please report it!"), }; - let left_data_bounds = left_data.bounds.clone(); - self.bounds_waiter = Some(OnceFut::new(async move { - bounds_accumulator - .report_partition_bounds(left_side_partition_id, left_data_bounds) - .await - })); + // For Partitioned mode, report both hash maps AND bounds for comprehensive filtering + // For CollectLeft mode, fall back to bounds-based filtering only + match self.mode { + PartitionMode::Partitioned => { + let hash_map = left_data.hash_map_arc(); + let left_data_bounds = left_data.bounds.clone(); + self.build_waiter = Some(OnceFut::new(async move { + build_accumulator + .report_partition_hash_map_and_bounds( + left_side_partition_id, + hash_map, + left_data_bounds, + ) + .await + })); + } + PartitionMode::CollectLeft => { + let hash_map = left_data.hash_map_arc(); + let left_data_bounds = left_data.bounds.clone(); + self.build_waiter = Some(OnceFut::new(async move { + build_accumulator + .report_single_hash_map_and_bounds(hash_map, left_data_bounds) + .await + })); + } + PartitionMode::Auto => unreachable!( + "PartitionMode::Auto should not be present at execution time" + ), + } self.state = HashJoinStreamState::WaitPartitionBoundsReport; } else { self.state = HashJoinStreamState::FetchProbeBatch;