Skip to content

Commit fc2fbb3

Browse files
authored
Move HashJoin from RawTable to HashTable (#14904)
* Move to HashTable * Add back in comment * Add TODO for switching to HashTable::allocation_size
1 parent b3593cd commit fc2fbb3

File tree

3 files changed

+63
-51
lines changed

3 files changed

+63
-51
lines changed

datafusion/physical-plan/src/joins/hash_join.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1656,7 +1656,7 @@ mod tests {
16561656
use datafusion_expr::Operator;
16571657
use datafusion_physical_expr::expressions::{BinaryExpr, Literal};
16581658
use datafusion_physical_expr::PhysicalExpr;
1659-
use hashbrown::raw::RawTable;
1659+
use hashbrown::HashTable;
16601660
use rstest::*;
16611661
use rstest_reuse::*;
16621662

@@ -3295,7 +3295,7 @@ mod tests {
32953295

32963296
#[test]
32973297
fn join_with_hash_collision() -> Result<()> {
3298-
let mut hashmap_left = RawTable::with_capacity(2);
3298+
let mut hashmap_left = HashTable::with_capacity(2);
32993299
let left = build_table_i32(
33003300
("a", &vec![10, 20]),
33013301
("x", &vec![100, 200]),
@@ -3311,8 +3311,8 @@ mod tests {
33113311
)?;
33123312

33133313
// Create hash collisions (same hashes)
3314-
hashmap_left.insert(hashes[0], (hashes[0], 1), |(h, _)| *h);
3315-
hashmap_left.insert(hashes[1], (hashes[1], 1), |(h, _)| *h);
3314+
hashmap_left.insert_unique(hashes[0], (hashes[0], 1), |(h, _)| *h);
3315+
hashmap_left.insert_unique(hashes[1], (hashes[1], 1), |(h, _)| *h);
33163316

33173317
let next = vec![2, 0];
33183318

datafusion/physical-plan/src/joins/stream_join_utils.rs

Lines changed: 25 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ use arrow::array::{
3232
use arrow::compute::concat_batches;
3333
use arrow::datatypes::{ArrowNativeType, Schema, SchemaRef};
3434
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
35+
use datafusion_common::utils::memory::estimate_memory_size;
3536
use datafusion_common::{
3637
arrow_datafusion_err, DataFusionError, HashSet, JoinSide, Result, ScalarValue,
3738
};
@@ -42,7 +43,7 @@ use datafusion_physical_expr::utils::collect_columns;
4243
use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr};
4344

4445
use datafusion_physical_expr_common::sort_expr::LexOrdering;
45-
use hashbrown::raw::RawTable;
46+
use hashbrown::HashTable;
4647

4748
/// Implementation of `JoinHashMapType` for `PruningJoinHashMap`.
4849
impl JoinHashMapType for PruningJoinHashMap {
@@ -54,12 +55,12 @@ impl JoinHashMapType for PruningJoinHashMap {
5455
}
5556

5657
/// Get mutable references to the hash map and the next.
57-
fn get_mut(&mut self) -> (&mut RawTable<(u64, u64)>, &mut Self::NextType) {
58+
fn get_mut(&mut self) -> (&mut HashTable<(u64, u64)>, &mut Self::NextType) {
5859
(&mut self.map, &mut self.next)
5960
}
6061

6162
/// Get a reference to the hash map.
62-
fn get_map(&self) -> &RawTable<(u64, u64)> {
63+
fn get_map(&self) -> &HashTable<(u64, u64)> {
6364
&self.map
6465
}
6566

@@ -106,7 +107,7 @@ impl JoinHashMapType for PruningJoinHashMap {
106107
/// ```
107108
pub struct PruningJoinHashMap {
108109
/// Stores hash value to last row index
109-
pub map: RawTable<(u64, u64)>,
110+
pub map: HashTable<(u64, u64)>,
110111
/// Stores indices in chained list data structure
111112
pub next: VecDeque<u64>,
112113
}
@@ -122,7 +123,7 @@ impl PruningJoinHashMap {
122123
/// A new instance of `PruningJoinHashMap`.
123124
pub(crate) fn with_capacity(capacity: usize) -> Self {
124125
PruningJoinHashMap {
125-
map: RawTable::with_capacity(capacity),
126+
map: HashTable::with_capacity(capacity),
126127
next: VecDeque::with_capacity(capacity),
127128
}
128129
}
@@ -155,7 +156,11 @@ impl PruningJoinHashMap {
155156
/// # Returns
156157
/// The size of the hash map in bytes.
157158
pub(crate) fn size(&self) -> usize {
158-
self.map.allocation_info().1.size() + self.next.capacity() * size_of::<u64>()
159+
let fixed_size = size_of::<PruningJoinHashMap>();
160+
161+
// TODO: switch to using [HashTable::allocation_size] when available after upgrading hashbrown to 0.15
162+
estimate_memory_size::<(u64, u64)>(self.map.capacity(), fixed_size).unwrap()
163+
+ self.next.capacity() * size_of::<u64>()
159164
}
160165

161166
/// Removes hash values from the map and the list based on the given pruning
@@ -177,20 +182,20 @@ impl PruningJoinHashMap {
177182
self.next.drain(0..prune_length);
178183

179184
// Calculate the keys that should be removed from the map.
180-
let removable_keys = unsafe {
181-
self.map
182-
.iter()
183-
.map(|bucket| bucket.as_ref())
184-
.filter_map(|(hash, tail_index)| {
185-
(*tail_index < prune_length as u64 + deleting_offset).then_some(*hash)
186-
})
187-
.collect::<Vec<_>>()
188-
};
185+
let removable_keys = self
186+
.map
187+
.iter()
188+
.filter_map(|(hash, tail_index)| {
189+
(*tail_index < prune_length as u64 + deleting_offset).then_some(*hash)
190+
})
191+
.collect::<Vec<_>>();
189192

190193
// Remove the keys from the map.
191194
removable_keys.into_iter().for_each(|hash_value| {
192195
self.map
193-
.remove_entry(hash_value, |(hash, _)| hash_value == *hash);
196+
.find_entry(hash_value, |(hash, _)| hash_value == *hash)
197+
.unwrap()
198+
.remove();
194199
});
195200

196201
// Shrink the map if necessary.
@@ -1094,7 +1099,7 @@ pub mod tests {
10941099
let deleted_part = 3 * data_size / 4;
10951100
// Add elements to the JoinHashMap
10961101
for hash_value in 0..data_size {
1097-
join_hash_map.map.insert(
1102+
join_hash_map.map.insert_unique(
10981103
hash_value,
10991104
(hash_value, hash_value),
11001105
|(hash, _)| *hash,
@@ -1108,7 +1113,9 @@ pub mod tests {
11081113
for hash_value in 0..deleted_part {
11091114
join_hash_map
11101115
.map
1111-
.remove_entry(hash_value, |(hash, _)| hash_value == *hash);
1116+
.find_entry(hash_value, |(hash, _)| hash_value == *hash)
1117+
.unwrap()
1118+
.remove();
11121119
}
11131120

11141121
assert_eq!(join_hash_map.map.len(), (data_size - deleted_part) as usize);

datafusion/physical-plan/src/joins/utils.rs

Lines changed: 34 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -54,12 +54,13 @@ use datafusion_physical_expr::utils::{collect_columns, merge_vectors};
5454
use datafusion_physical_expr::{
5555
LexOrdering, PhysicalExpr, PhysicalExprRef, PhysicalSortExpr,
5656
};
57+
use hashbrown::hash_table::Entry::{Occupied, Vacant};
58+
use hashbrown::HashTable;
5759

5860
use crate::joins::SharedBitmapBuilder;
5961
use crate::projection::ProjectionExec;
6062
use futures::future::{BoxFuture, Shared};
6163
use futures::{ready, FutureExt};
62-
use hashbrown::raw::RawTable;
6364
use parking_lot::Mutex;
6465

6566
/// Maps a `u64` hash value based on the build side ["on" values] to a list of indices with this key's value.
@@ -126,20 +127,20 @@ use parking_lot::Mutex;
126127
/// ```
127128
pub struct JoinHashMap {
128129
// Stores hash value to last row index
129-
map: RawTable<(u64, u64)>,
130+
map: HashTable<(u64, u64)>,
130131
// Stores indices in chained list data structure
131132
next: Vec<u64>,
132133
}
133134

134135
impl JoinHashMap {
135136
#[cfg(test)]
136-
pub(crate) fn new(map: RawTable<(u64, u64)>, next: Vec<u64>) -> Self {
137+
pub(crate) fn new(map: HashTable<(u64, u64)>, next: Vec<u64>) -> Self {
137138
Self { map, next }
138139
}
139140

140141
pub(crate) fn with_capacity(capacity: usize) -> Self {
141142
JoinHashMap {
142-
map: RawTable::with_capacity(capacity),
143+
map: HashTable::with_capacity(capacity),
143144
next: vec![0; capacity],
144145
}
145146
}
@@ -199,9 +200,9 @@ pub trait JoinHashMapType {
199200
/// Extend with zero
200201
fn extend_zero(&mut self, len: usize);
201202
/// Returns mutable references to the hash map and the next.
202-
fn get_mut(&mut self) -> (&mut RawTable<(u64, u64)>, &mut Self::NextType);
203+
fn get_mut(&mut self) -> (&mut HashTable<(u64, u64)>, &mut Self::NextType);
203204
/// Returns a reference to the hash map.
204-
fn get_map(&self) -> &RawTable<(u64, u64)>;
205+
fn get_map(&self) -> &HashTable<(u64, u64)>;
205206
/// Returns a reference to the next.
206207
fn get_list(&self) -> &Self::NextType;
207208

@@ -212,24 +213,28 @@ pub trait JoinHashMapType {
212213
deleted_offset: usize,
213214
) {
214215
let (mut_map, mut_list) = self.get_mut();
215-
for (row, hash_value) in iter {
216-
let item = mut_map.get_mut(*hash_value, |(hash, _)| *hash_value == *hash);
217-
if let Some((_, index)) = item {
218-
// Already exists: add index to next array
219-
let prev_index = *index;
220-
// Store new value inside hashmap
221-
*index = (row + 1) as u64;
222-
// Update chained Vec at `row` with previous value
223-
mut_list[row - deleted_offset] = prev_index;
224-
} else {
225-
mut_map.insert(
226-
*hash_value,
227-
// store the value + 1 as 0 value reserved for end of list
228-
(*hash_value, (row + 1) as u64),
229-
|(hash, _)| *hash,
230-
);
231-
// chained list at `row` is already initialized with 0
232-
// meaning end of list
216+
for (row, &hash_value) in iter {
217+
let entry = mut_map.entry(
218+
hash_value,
219+
|&(hash, _)| hash_value == hash,
220+
|&(hash, _)| hash,
221+
);
222+
223+
match entry {
224+
Occupied(mut occupied_entry) => {
225+
// Already exists: add index to next array
226+
let (_, index) = occupied_entry.get_mut();
227+
let prev_index = *index;
228+
// Store new value inside hashmap
229+
*index = (row + 1) as u64;
230+
// Update chained Vec at `row` with previous value
231+
mut_list[row - deleted_offset] = prev_index;
232+
}
233+
Vacant(vacant_entry) => {
234+
vacant_entry.insert((hash_value, (row + 1) as u64));
235+
// chained list at `row` is already initialized with 0
236+
// meaning end of list
237+
}
233238
}
234239
}
235240
}
@@ -251,7 +256,7 @@ pub trait JoinHashMapType {
251256
for (row_idx, hash_value) in iter {
252257
// Get the hash and find it in the index
253258
if let Some((_, index)) =
254-
hash_map.get(*hash_value, |(hash, _)| *hash_value == *hash)
259+
hash_map.find(*hash_value, |(hash, _)| *hash_value == *hash)
255260
{
256261
let mut i = *index - 1;
257262
loop {
@@ -299,7 +304,7 @@ pub trait JoinHashMapType {
299304

300305
let mut remaining_output = limit;
301306

302-
let hash_map: &RawTable<(u64, u64)> = self.get_map();
307+
let hash_map: &HashTable<(u64, u64)> = self.get_map();
303308
let next_chain = self.get_list();
304309

305310
// Calculate initial `hash_values` index before iterating
@@ -330,7 +335,7 @@ pub trait JoinHashMapType {
330335
let mut row_idx = to_skip;
331336
for hash_value in &hash_values[to_skip..] {
332337
if let Some((_, index)) =
333-
hash_map.get(*hash_value, |(hash, _)| *hash_value == *hash)
338+
hash_map.find(*hash_value, |(hash, _)| *hash_value == *hash)
334339
{
335340
chain_traverse!(
336341
input_indices,
@@ -358,12 +363,12 @@ impl JoinHashMapType for JoinHashMap {
358363
fn extend_zero(&mut self, _: usize) {}
359364

360365
/// Get mutable references to the hash map and the next.
361-
fn get_mut(&mut self) -> (&mut RawTable<(u64, u64)>, &mut Self::NextType) {
366+
fn get_mut(&mut self) -> (&mut HashTable<(u64, u64)>, &mut Self::NextType) {
362367
(&mut self.map, &mut self.next)
363368
}
364369

365370
/// Get a reference to the hash map.
366-
fn get_map(&self) -> &RawTable<(u64, u64)> {
371+
fn get_map(&self) -> &HashTable<(u64, u64)> {
367372
&self.map
368373
}
369374

0 commit comments

Comments
 (0)