feat: Support u32 indices for HashJoinExec #16434

Open · wants to merge 5 commits into main
121 changes: 97 additions & 24 deletions datafusion/physical-plan/src/joins/hash_join.rs
@@ -34,6 +34,7 @@ use super::{
};
use super::{JoinOn, JoinOnRef};
use crate::execution_plan::{boundedness_from_children, EmissionType};
+ use crate::joins::join_hash_map::{JoinHashMapU32, JoinHashMapU64};
use crate::projection::{
try_embed_projection, try_pushdown_through_join, EmbeddedProjection, JoinData,
ProjectionExec,
@@ -50,7 +51,7 @@ use crate::{
build_batch_from_indices, build_join_schema, check_join_is_valid,
estimate_join_statistics, need_produce_result_in_final,
symmetric_join_output_partitioning, BuildProbeJoinMetrics, ColumnIndex,
- JoinFilter, JoinHashMap, JoinHashMapType, StatefulStreamResult,
+ JoinFilter, JoinHashMapType, StatefulStreamResult,
},
metrics::{ExecutionPlanMetricsSet, MetricsSet},
DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, Partitioning,
@@ -93,7 +94,7 @@ const HASH_JOIN_SEED: RandomState =
/// HashTable and input data for the left (build side) of a join
struct JoinLeftData {
/// The hash table with indices into `batch`
- hash_map: JoinHashMap,
+ hash_map: Box<dyn JoinHashMapType>,
/// The input rows for the build side
batch: RecordBatch,
/// The build side on expressions values
@@ -113,7 +114,7 @@ struct JoinLeftData {
impl JoinLeftData {
/// Create a new `JoinLeftData` from its parts
fn new(
- hash_map: JoinHashMap,
+ hash_map: Box<dyn JoinHashMapType>,
batch: RecordBatch,
values: Vec<ArrayRef>,
visited_indices_bitmap: SharedBitmapBuilder,
@@ -131,8 +132,8 @@ impl JoinLeftData {
}

/// return a reference to the hash map
- fn hash_map(&self) -> &JoinHashMap {
- &self.hash_map
+ fn hash_map(&self) -> &dyn JoinHashMapType {
+ &*self.hash_map
}
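Note on the accessor above: `&*self.hash_map` dereferences the `Box` and reborrows its contents as a bare `&dyn JoinHashMapType` (equivalent to `self.hash_map.as_ref()`). A minimal sketch of the pattern, with hypothetical names:

```rust
trait Lookup {
    fn len(&self) -> usize;
}

struct Held {
    // The concrete type behind the Box is chosen at construction time,
    // as with the two JoinHashMap index widths in this PR.
    inner: Box<dyn Lookup>,
}

impl Held {
    // Reborrow the boxed trait object; callers never see the Box.
    fn inner(&self) -> &dyn Lookup {
        &*self.inner // or: self.inner.as_ref()
    }
}
```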

/// returns a reference to the build side batch
@@ -981,14 +982,25 @@ async fn collect_left_input(

// Estimation of memory size, required for hashtable, prior to allocation.
// Final result can be verified using `RawTable.allocation_info()`
- let fixed_size = size_of::<JoinHashMap>();
- let estimated_hashtable_size =
- estimate_memory_size::<(u64, u64)>(num_rows, fixed_size)?;
-
- reservation.try_grow(estimated_hashtable_size)?;
- metrics.build_mem_used.add(estimated_hashtable_size);
+ let fixed_size_u32 = size_of::<JoinHashMapU32>();
+ let fixed_size_u64 = size_of::<JoinHashMapU64>();
+
+ // Use `u32` indices for the JoinHashMap when num_rows ≤ u32::MAX; otherwise
+ // fall back to the `u64` index variant
+ let mut hashmap: Box<dyn JoinHashMapType> = if num_rows > u32::MAX as usize {
+ let estimated_hashtable_size =
+ estimate_memory_size::<(u64, u64)>(num_rows, fixed_size_u64)?;
+ reservation.try_grow(estimated_hashtable_size)?;
+ metrics.build_mem_used.add(estimated_hashtable_size);
+ Box::new(JoinHashMapU64::with_capacity(num_rows))
+ } else {
+ let estimated_hashtable_size =
+ estimate_memory_size::<(u32, u64)>(num_rows, fixed_size_u32)?;
+ reservation.try_grow(estimated_hashtable_size)?;
+ metrics.build_mem_used.add(estimated_hashtable_size);
+ Box::new(JoinHashMapU32::with_capacity(num_rows))
+ };

- let mut hashmap = JoinHashMap::with_capacity(num_rows);
let mut hashes_buffer = Vec::new();
let mut offset = 0;

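Why branch on `u32::MAX`? A self-contained sketch of the per-row cost (plain Rust, no DataFusion imports; exact layout is target- and compiler-dependent, but typical on 64-bit targets). The `(hash, index)` table entry pads to the same size either way, so much of the saving comes from the narrower per-row chain indices:

```rust
use std::mem::size_of;

fn main() {
    // Hash-table entries pair a u64 hash with a chain-head index.
    // 8-byte alignment pads both variants to 16 bytes on typical targets:
    println!("(u64, u64) entry: {} bytes", size_of::<(u64, u64)>()); // 16
    println!("(u64, u32) entry: {} bytes", size_of::<(u64, u32)>()); // 16

    // The per-row chain vector, however, genuinely halves:
    let num_rows: usize = 100_000_000; // a hypothetical 100M-row build side
    println!("u64 chain: {} bytes", num_rows * size_of::<u64>()); // 800 MB
    println!("u32 chain: {} bytes", num_rows * size_of::<u32>()); // 400 MB
}
```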
@@ -1000,7 +1012,7 @@ async fn collect_left_input(
update_hash(
&on_left,
batch,
- &mut hashmap,
+ &mut *hashmap,
offset,
&random_state,
&mut hashes_buffer,
@@ -1052,19 +1064,16 @@ async fn collect_left_input(
/// which keeps either the first (if set to true) or the last (if set to false) row index
/// as the chain head for rows with equal hash values.
#[allow(clippy::too_many_arguments)]
- pub fn update_hash<T>(
+ pub fn update_hash(
on: &[PhysicalExprRef],
batch: &RecordBatch,
- hash_map: &mut T,
+ hash_map: &mut dyn JoinHashMapType,
offset: usize,
random_state: &RandomState,
hashes_buffer: &mut Vec<u64>,
deleted_offset: usize,
fifo_hashmap: bool,
- ) -> Result<()>
- where
- T: JoinHashMapType,
- {
+ ) -> Result<()> {
// evaluate the keys
let keys_values = on
.iter()
@@ -1084,9 +1093,9 @@ where
.map(|(i, val)| (i + offset, val));

if fifo_hashmap {
- hash_map.update_from_iter(hash_values_iter.rev(), deleted_offset);
+ hash_map.update_from_iter(Box::new(hash_values_iter.rev()), deleted_offset);
} else {
- hash_map.update_from_iter(hash_values_iter, deleted_offset);
+ hash_map.update_from_iter(Box::new(hash_values_iter), deleted_offset);
}

Ok(())
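The `Box::new(...)` around the iterator is what keeps the trait object-safe: a generic `fn update_from_iter<I: Iterator>` method could not be called through `&mut dyn JoinHashMapType`. (The `.rev()` in the FIFO branch keeps the first row as the chain head, per the doc comment above.) A rough, hypothetical sketch of the shape this implies (the real trait lives in `datafusion/physical-plan/src/joins/join_hash_map.rs` and uses different item types):

```rust
trait HashMapSketch {
    // Type-erased iterator of (row_index, hash) pairs; a generic type
    // parameter here would make the trait non-object-safe.
    fn update_from_iter(
        &mut self,
        iter: Box<dyn Iterator<Item = (usize, u64)> + '_>,
        deleted_offset: usize,
    );
}

struct U32Sketch {
    next: Vec<u32>,
}

impl HashMapSketch for U32Sketch {
    fn update_from_iter(
        &mut self,
        iter: Box<dyn Iterator<Item = (usize, u64)> + '_>,
        _deleted_offset: usize,
    ) {
        for (row, _hash) in iter {
            // Narrowing is safe here: the build phase only selects the
            // u32 variant when row counts fit in u32 range.
            self.next.push(u32::try_from(row).expect("fits in u32"));
        }
    }
}
```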
@@ -1298,7 +1307,7 @@ impl RecordBatchStream for HashJoinStream {
/// ```
#[allow(clippy::too_many_arguments)]
fn lookup_join_hashmap(
- build_hashmap: &JoinHashMap,
+ build_hashmap: &dyn JoinHashMapType,
build_side_values: &[ArrayRef],
probe_side_values: &[ArrayRef],
null_equality: NullEquality,
@@ -3569,7 +3578,7 @@ mod tests {
}

#[test]
- fn join_with_hash_collision() -> Result<()> {
+ fn join_with_hash_collisions_64() -> Result<()> {
let mut hashmap_left = HashTable::with_capacity(4);
let left = build_table_i32(
("a", &vec![10, 20]),
@@ -3606,7 +3615,7 @@
// Join key column for both join sides
let key_column: PhysicalExprRef = Arc::new(Column::new("a", 0)) as _;

- let join_hash_map = JoinHashMap::new(hashmap_left, next);
+ let join_hash_map = JoinHashMapU64::new(hashmap_left, next);

let left_keys_values = key_column.evaluate(&left)?.into_array(left.num_rows())?;
let right_keys_values =
@@ -3639,6 +3648,70 @@
Ok(())
}

+ #[test]
+ fn join_with_hash_collisions_u32() -> Result<()> {
+ let mut hashmap_left = HashTable::with_capacity(4);
+ let left = build_table_i32(
+ ("a", &vec![10, 20]),
+ ("x", &vec![100, 200]),
+ ("y", &vec![200, 300]),
+ );
+
+ let random_state = RandomState::with_seeds(0, 0, 0, 0);
+ let hashes_buff = &mut vec![0; left.num_rows()];
+ let hashes = create_hashes(
+ &[Arc::clone(&left.columns()[0])],
+ &random_state,
+ hashes_buff,
+ )?;
+
+ hashmap_left.insert_unique(hashes[0], (hashes[0], 1u32), |(h, _)| *h);
+ hashmap_left.insert_unique(hashes[0], (hashes[0], 2u32), |(h, _)| *h);
+ hashmap_left.insert_unique(hashes[1], (hashes[1], 1u32), |(h, _)| *h);
+ hashmap_left.insert_unique(hashes[1], (hashes[1], 2u32), |(h, _)| *h);
+
+ let next: Vec<u32> = vec![2, 0];
+
+ let right = build_table_i32(
+ ("a", &vec![10, 20]),
+ ("b", &vec![0, 0]),
+ ("c", &vec![30, 40]),
+ );
+
+ let key_column: PhysicalExprRef = Arc::new(Column::new("a", 0)) as _;
+
+ let join_hash_map = JoinHashMapU32::new(hashmap_left, next);
+
+ let left_keys_values = key_column.evaluate(&left)?.into_array(left.num_rows())?;
+ let right_keys_values =
+ key_column.evaluate(&right)?.into_array(right.num_rows())?;
+ let mut hashes_buffer = vec![0; right.num_rows()];
+ create_hashes(
+ &[Arc::clone(&right_keys_values)],
+ &random_state,
+ &mut hashes_buffer,
+ )?;
+
+ let (l, r, _) = lookup_join_hashmap(
+ &join_hash_map,
+ &[left_keys_values],
+ &[right_keys_values],
+ NullEquality::NullEqualsNothing,
+ &hashes_buffer,
+ 8192,
+ (0, None),
+ )?;
+
+ // We still expect to match rows 0 and 1 on both sides
+ let left_ids: UInt64Array = vec![0, 1].into();
+ let right_ids: UInt32Array = vec![0, 1].into();
+
+ assert_eq!(left_ids, l);
+ assert_eq!(right_ids, r);
+
+ Ok(())
+ }
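Note the asymmetry in the expected arrays above: build-side matches are `UInt64Array` even though this map chains rows as `u32`, while probe-side indices are `UInt32Array`; widening the internal `u32` indices to the output type is lossless. A toy illustration (assuming the `arrow` crate):

```rust
use arrow::array::{Array, UInt32Array, UInt64Array};

fn main() {
    // Internally the u32 variant chains rows as u32 ...
    let internal: Vec<u32> = vec![0, 1];
    // ... but lookups surface build-side indices widened to u64.
    let left: UInt64Array = internal.iter().map(|&i| u64::from(i)).collect();
    let right: UInt32Array = vec![0u32, 1].into();
    assert_eq!(left.len(), right.len());
}
```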

#[tokio::test]
async fn join_with_duplicated_column_names() -> Result<()> {
let task_ctx = Arc::new(TaskContext::default());