Skip to content

Commit 8695ee5

Browse files
ctskWeijun-H
authored andcommitted
refactor(hash_join): Move JoinHashMap to separate mod (apache#15419)
* refactor(hash_join): Move JoinHashMap to separate mod * Add description to join_hash_map module Co-authored-by: Alex Huang <[email protected]> --------- Co-authored-by: Alex Huang <[email protected]>
1 parent d2b91f1 commit 8695ee5

File tree

4 files changed

+353
-325
lines changed

4 files changed

+353
-325
lines changed

datafusion/physical-plan/src/joins/hash_join.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,12 +44,13 @@ use crate::{
4444
common::can_project,
4545
handle_state,
4646
hash_utils::create_hashes,
47+
joins::join_hash_map::JoinHashMapOffset,
4748
joins::utils::{
4849
adjust_indices_by_join_type, apply_join_filter_to_indices,
4950
build_batch_from_indices, build_join_schema, check_join_is_valid,
5051
estimate_join_statistics, need_produce_result_in_final,
5152
symmetric_join_output_partitioning, BuildProbeJoinMetrics, ColumnIndex,
52-
JoinFilter, JoinHashMap, JoinHashMapOffset, JoinHashMapType, JoinOn, JoinOnRef,
53+
JoinFilter, JoinHashMap, JoinHashMapType, JoinOn, JoinOnRef,
5354
StatefulStreamResult,
5455
},
5556
metrics::{ExecutionPlanMetricsSet, MetricsSet},
Lines changed: 347 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,347 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! This file contains the implementation of the `JoinHashMap` struct, which
19+
//! is used to store the mapping between hash values based on the build side
20+
//! ["on" values] to a list of indices with this key's value.
21+
22+
use std::fmt::{self, Debug};
23+
use std::ops::IndexMut;
24+
25+
use hashbrown::hash_table::Entry::{Occupied, Vacant};
26+
use hashbrown::HashTable;
27+
28+
/// Maps a `u64` hash value based on the build side ["on" values] to a list of indices with this key's value.
29+
///
30+
/// By allocating a `HashMap` with capacity for *at least* the number of rows for entries at the build side,
31+
/// we make sure that we don't have to re-hash the hashmap, which needs access to the key (the hash in this case) value.
32+
///
33+
/// E.g. 1 -> [3, 6, 8] indicates that the column values map to rows 3, 6 and 8 for hash value 1
34+
/// As the key is a hash value, we need to check possible hash collisions in the probe stage
35+
/// During this stage it might be the case that a row maps to the same hashmap entry,
36+
/// but the values don't match. Those are checked in the `equal_rows_arr` method.
37+
///
38+
/// The indices (values) are stored in a separate chained list stored in the `Vec<u64>`.
39+
///
40+
/// The first value (+1) is stored in the hashmap, whereas the next value is stored in array at the position value.
41+
///
42+
/// The chain can be followed until the value "0" has been reached, meaning the end of the list.
43+
/// Also see chapter 5.3 of [Balancing vectorized query execution with bandwidth-optimized storage](https://dare.uva.nl/search?identifier=5ccbb60a-38b8-4eeb-858a-e7735dd37487)
44+
///
45+
/// # Example
46+
///
47+
/// ``` text
48+
/// See the example below:
49+
///
50+
/// Insert (10,1) <-- insert hash value 10 with row index 1
51+
/// map:
52+
/// ----------
53+
/// | 10 | 2 |
54+
/// ----------
55+
/// next:
56+
/// ---------------------
57+
/// | 0 | 0 | 0 | 0 | 0 |
58+
/// ---------------------
59+
/// Insert (20,2)
60+
/// map:
61+
/// ----------
62+
/// | 10 | 2 |
63+
/// | 20 | 3 |
64+
/// ----------
65+
/// next:
66+
/// ---------------------
67+
/// | 0 | 0 | 0 | 0 | 0 |
68+
/// ---------------------
69+
/// Insert (10,3) <-- collision! row index 3 has a hash value of 10 as well
70+
/// map:
71+
/// ----------
72+
/// | 10 | 4 |
73+
/// | 20 | 3 |
74+
/// ----------
75+
/// next:
76+
/// ---------------------
77+
/// | 0 | 0 | 0 | 2 | 0 | <--- hash value 10 maps to 4,2 (which means indices values 3,1)
78+
/// ---------------------
79+
/// Insert (10,4) <-- another collision! row index 4 ALSO has a hash value of 10
80+
/// map:
81+
/// ---------
82+
/// | 10 | 5 |
83+
/// | 20 | 3 |
84+
/// ---------
85+
/// next:
86+
/// ---------------------
87+
/// | 0 | 0 | 0 | 2 | 4 | <--- hash value 10 maps to 5,4,2 (which means indices values 4,3,1)
88+
/// ---------------------
89+
/// ```
90+
pub struct JoinHashMap {
91+
// Stores hash value to last row index
92+
map: HashTable<(u64, u64)>,
93+
// Stores indices in chained list data structure
94+
next: Vec<u64>,
95+
}
96+
97+
impl JoinHashMap {
98+
#[cfg(test)]
99+
pub(crate) fn new(map: HashTable<(u64, u64)>, next: Vec<u64>) -> Self {
100+
Self { map, next }
101+
}
102+
103+
pub(crate) fn with_capacity(capacity: usize) -> Self {
104+
JoinHashMap {
105+
map: HashTable::with_capacity(capacity),
106+
next: vec![0; capacity],
107+
}
108+
}
109+
}
110+
111+
// Type of offsets for obtaining indices from JoinHashMap.
112+
/// Resume position used by `get_matched_indices_with_limit_offset`, laid out as
/// `(input index, chain position)`.
///
/// The first element is an index into the `hash_values` slice. The second element is
/// `None` when that input row has not been processed yet, `Some(0)` when its chain
/// has been fully consumed (the row is skipped on resume), and otherwise the 1-based
/// chain value at which traversal continues.
pub(crate) type JoinHashMapOffset = (usize, Option<u64>);
113+
114+
// Walks a single hash chain, recording one (input row, matched row) pair per
// chain element while decrementing the remaining output budget.
//
// Must be expanded inside a function that returns
// `(input_indices, match_indices, Option<JoinHashMapOffset>)`: as soon as the
// budget reaches zero the macro `return`s from that function with the offset
// at which the traversal has to be resumed (`None` when the last input row has
// been fully processed).
//
// `$chain_idx` is the 1-based position at which to start; `$deleted_offset`
// is an optional number of leading rows that were pruned from `$next_chain`.
macro_rules! chain_traverse {
    (
        $input_indices:ident, $match_indices:ident, $hash_values:ident, $next_chain:ident,
        $input_idx:ident, $chain_idx:ident, $deleted_offset:ident, $remaining_output:ident
    ) => {
        // Chain values are stored shifted by one so that 0 can mean "end of list".
        let mut cursor = $chain_idx - 1;
        loop {
            let match_row_idx = match $deleted_offset {
                None => cursor,
                Some(offset) => {
                    let pruned = offset as u64;
                    // Anything below the offset has already been pruned, which
                    // terminates the chain early.
                    if cursor < pruned {
                        break;
                    }
                    cursor - pruned
                }
            };
            $match_indices.push(match_row_idx);
            $input_indices.push($input_idx as u32);
            $remaining_output -= 1;
            // Look up the successor before a potential early return so that it
            // can be handed back as the resume point.
            let successor = $next_chain[match_row_idx as usize];

            if $remaining_output == 0 {
                // The scan is complete only when this is the last input row and
                // its chain has no further elements.
                let scan_finished =
                    $input_idx == $hash_values.len() - 1 && successor == 0;
                let next_offset = if scan_finished {
                    None
                } else {
                    Some(($input_idx, Some(successor)))
                };
                return ($input_indices, $match_indices, next_offset);
            }
            if successor == 0 {
                // End of the chain.
                break;
            }
            cursor = successor - 1;
        }
    };
}
157+
158+
// Trait defining methods that must be implemented by a hash map type to be used for joins.
159+
pub trait JoinHashMapType {
160+
/// The type of list used to store the next list
161+
type NextType: IndexMut<usize, Output = u64>;
162+
/// Extend with zero
163+
fn extend_zero(&mut self, len: usize);
164+
/// Returns mutable references to the hash map and the next.
165+
fn get_mut(&mut self) -> (&mut HashTable<(u64, u64)>, &mut Self::NextType);
166+
/// Returns a reference to the hash map.
167+
fn get_map(&self) -> &HashTable<(u64, u64)>;
168+
/// Returns a reference to the next.
169+
fn get_list(&self) -> &Self::NextType;
170+
171+
/// Updates hashmap from iterator of row indices & row hashes pairs.
172+
fn update_from_iter<'a>(
173+
&mut self,
174+
iter: impl Iterator<Item = (usize, &'a u64)>,
175+
deleted_offset: usize,
176+
) {
177+
let (mut_map, mut_list) = self.get_mut();
178+
for (row, &hash_value) in iter {
179+
let entry = mut_map.entry(
180+
hash_value,
181+
|&(hash, _)| hash_value == hash,
182+
|&(hash, _)| hash,
183+
);
184+
185+
match entry {
186+
Occupied(mut occupied_entry) => {
187+
// Already exists: add index to next array
188+
let (_, index) = occupied_entry.get_mut();
189+
let prev_index = *index;
190+
// Store new value inside hashmap
191+
*index = (row + 1) as u64;
192+
// Update chained Vec at `row` with previous value
193+
mut_list[row - deleted_offset] = prev_index;
194+
}
195+
Vacant(vacant_entry) => {
196+
vacant_entry.insert((hash_value, (row + 1) as u64));
197+
// chained list at `row` is already initialized with 0
198+
// meaning end of list
199+
}
200+
}
201+
}
202+
}
203+
204+
/// Returns all pairs of row indices matched by hash.
205+
///
206+
/// This method only compares hashes, so additional further check for actual values
207+
/// equality may be required.
208+
fn get_matched_indices<'a>(
209+
&self,
210+
iter: impl Iterator<Item = (usize, &'a u64)>,
211+
deleted_offset: Option<usize>,
212+
) -> (Vec<u32>, Vec<u64>) {
213+
let mut input_indices = vec![];
214+
let mut match_indices = vec![];
215+
216+
let hash_map = self.get_map();
217+
let next_chain = self.get_list();
218+
for (row_idx, hash_value) in iter {
219+
// Get the hash and find it in the index
220+
if let Some((_, index)) =
221+
hash_map.find(*hash_value, |(hash, _)| *hash_value == *hash)
222+
{
223+
let mut i = *index - 1;
224+
loop {
225+
let match_row_idx = if let Some(offset) = deleted_offset {
226+
// This arguments means that we prune the next index way before here.
227+
if i < offset as u64 {
228+
// End of the list due to pruning
229+
break;
230+
}
231+
i - offset as u64
232+
} else {
233+
i
234+
};
235+
match_indices.push(match_row_idx);
236+
input_indices.push(row_idx as u32);
237+
// Follow the chain to get the next index value
238+
let next = next_chain[match_row_idx as usize];
239+
if next == 0 {
240+
// end of list
241+
break;
242+
}
243+
i = next - 1;
244+
}
245+
}
246+
}
247+
248+
(input_indices, match_indices)
249+
}
250+
251+
/// Matches hashes with taking limit and offset into account.
252+
/// Returns pairs of matched indices along with the starting point for next
253+
/// matching iteration (`None` if limit has not been reached).
254+
///
255+
/// This method only compares hashes, so additional further check for actual values
256+
/// equality may be required.
257+
fn get_matched_indices_with_limit_offset(
258+
&self,
259+
hash_values: &[u64],
260+
deleted_offset: Option<usize>,
261+
limit: usize,
262+
offset: JoinHashMapOffset,
263+
) -> (Vec<u32>, Vec<u64>, Option<JoinHashMapOffset>) {
264+
let mut input_indices = vec![];
265+
let mut match_indices = vec![];
266+
267+
let mut remaining_output = limit;
268+
269+
let hash_map: &HashTable<(u64, u64)> = self.get_map();
270+
let next_chain = self.get_list();
271+
272+
// Calculate initial `hash_values` index before iterating
273+
let to_skip = match offset {
274+
// None `initial_next_idx` indicates that `initial_idx` processing has'n been started
275+
(initial_idx, None) => initial_idx,
276+
// Zero `initial_next_idx` indicates that `initial_idx` has been processed during
277+
// previous iteration, and it should be skipped
278+
(initial_idx, Some(0)) => initial_idx + 1,
279+
// Otherwise, process remaining `initial_idx` matches by traversing `next_chain`,
280+
// to start with the next index
281+
(initial_idx, Some(initial_next_idx)) => {
282+
chain_traverse!(
283+
input_indices,
284+
match_indices,
285+
hash_values,
286+
next_chain,
287+
initial_idx,
288+
initial_next_idx,
289+
deleted_offset,
290+
remaining_output
291+
);
292+
293+
initial_idx + 1
294+
}
295+
};
296+
297+
let mut row_idx = to_skip;
298+
for hash_value in &hash_values[to_skip..] {
299+
if let Some((_, index)) =
300+
hash_map.find(*hash_value, |(hash, _)| *hash_value == *hash)
301+
{
302+
chain_traverse!(
303+
input_indices,
304+
match_indices,
305+
hash_values,
306+
next_chain,
307+
row_idx,
308+
index,
309+
deleted_offset,
310+
remaining_output
311+
);
312+
}
313+
row_idx += 1;
314+
}
315+
316+
(input_indices, match_indices, None)
317+
}
318+
}
319+
320+
/// Implementation of `JoinHashMapType` for `JoinHashMap`.
321+
impl JoinHashMapType for JoinHashMap {
322+
type NextType = Vec<u64>;
323+
324+
// Void implementation
325+
fn extend_zero(&mut self, _: usize) {}
326+
327+
/// Get mutable references to the hash map and the next.
328+
fn get_mut(&mut self) -> (&mut HashTable<(u64, u64)>, &mut Self::NextType) {
329+
(&mut self.map, &mut self.next)
330+
}
331+
332+
/// Get a reference to the hash map.
333+
fn get_map(&self) -> &HashTable<(u64, u64)> {
334+
&self.map
335+
}
336+
337+
/// Get a reference to the next.
338+
fn get_list(&self) -> &Self::NextType {
339+
&self.next
340+
}
341+
}
342+
343+
impl Debug for JoinHashMap {
344+
fn fmt(&self, _f: &mut fmt::Formatter) -> fmt::Result {
345+
Ok(())
346+
}
347+
}

datafusion/physical-plan/src/joins/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@ mod symmetric_hash_join;
3434
pub mod utils;
3535

3636
mod join_filter;
37+
mod join_hash_map;
38+
3739
#[cfg(test)]
3840
pub mod test_utils;
3941

0 commit comments

Comments
 (0)