Skip to content

Commit

Permalink
Reuse on expressions values in HashJoinExec (#14131)
Browse files Browse the repository at this point in the history
* Reduce duplicated build side experssions evaluations in HashJoinExec

* Reuse probe side on expressions values
  • Loading branch information
lewiszlw authored Jan 16, 2025
1 parent 3906c04 commit 50c7977
Showing 1 changed file with 35 additions and 30 deletions.
65 changes: 35 additions & 30 deletions datafusion/physical-plan/src/joins/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ struct JoinLeftData {
hash_map: JoinHashMap,
/// The input rows for the build side
batch: RecordBatch,
/// The build side on expressions values
values: Vec<ArrayRef>,
/// Shared bitmap builder for visited left indices
visited_indices_bitmap: SharedBitmapBuilder,
/// Counter of running probe-threads, potentially
Expand All @@ -104,13 +106,15 @@ impl JoinLeftData {
fn new(
hash_map: JoinHashMap,
batch: RecordBatch,
values: Vec<ArrayRef>,
visited_indices_bitmap: SharedBitmapBuilder,
probe_threads_counter: AtomicUsize,
reservation: MemoryReservation,
) -> Self {
Self {
hash_map,
batch,
values,
visited_indices_bitmap,
probe_threads_counter,
_reservation: reservation,
Expand All @@ -127,6 +131,11 @@ impl JoinLeftData {
&self.batch
}

/// returns a reference to the build side expressions values
fn values(&self) -> &[ArrayRef] {
&self.values
}

/// returns a reference to the visited indices bitmap
fn visited_indices_bitmap(&self) -> &SharedBitmapBuilder {
&self.visited_indices_bitmap
Expand Down Expand Up @@ -853,7 +862,6 @@ impl ExecutionPlan for HashJoinExec {

Ok(Box::pin(HashJoinStream {
schema: self.schema(),
on_left,
on_right,
filter: self.filter.clone(),
join_type: self.join_type,
Expand Down Expand Up @@ -984,9 +992,18 @@ async fn collect_left_input(
BooleanBufferBuilder::new(0)
};

let left_values = on_left
.iter()
.map(|c| {
c.evaluate(&single_batch)?
.into_array(single_batch.num_rows())
})
.collect::<Result<Vec<_>>>()?;

let data = JoinLeftData::new(
hashmap,
single_batch,
left_values,
Mutex::new(visited_indices_bitmap),
AtomicUsize::new(probe_threads_count),
reservation,
Expand Down Expand Up @@ -1136,6 +1153,8 @@ impl HashJoinStreamState {
struct ProcessProbeBatchState {
/// Current probe-side batch
batch: RecordBatch,
/// Probe-side on expressions values
values: Vec<ArrayRef>,
/// Starting offset for JoinHashMap lookups
offset: JoinHashMapOffset,
/// Max joined probe-side index from current batch
Expand All @@ -1162,8 +1181,6 @@ impl ProcessProbeBatchState {
struct HashJoinStream {
/// Input schema
schema: Arc<Schema>,
/// equijoin columns from the left (build side)
on_left: Vec<PhysicalExprRef>,
/// equijoin columns from the right (probe side)
on_right: Vec<PhysicalExprRef>,
/// optional join filter
Expand Down Expand Up @@ -1249,27 +1266,13 @@ impl RecordBatchStream for HashJoinStream {
#[allow(clippy::too_many_arguments)]
fn lookup_join_hashmap(
build_hashmap: &JoinHashMap,
build_input_buffer: &RecordBatch,
probe_batch: &RecordBatch,
build_on: &[PhysicalExprRef],
probe_on: &[PhysicalExprRef],
build_side_values: &[ArrayRef],
probe_side_values: &[ArrayRef],
null_equals_null: bool,
hashes_buffer: &[u64],
limit: usize,
offset: JoinHashMapOffset,
) -> Result<(UInt64Array, UInt32Array, Option<JoinHashMapOffset>)> {
let keys_values = probe_on
.iter()
.map(|c| c.evaluate(probe_batch)?.into_array(probe_batch.num_rows()))
.collect::<Result<Vec<_>>>()?;
let build_join_values = build_on
.iter()
.map(|c| {
c.evaluate(build_input_buffer)?
.into_array(build_input_buffer.num_rows())
})
.collect::<Result<Vec<_>>>()?;

let (probe_indices, build_indices, next_offset) = build_hashmap
.get_matched_indices_with_limit_offset(hashes_buffer, None, limit, offset);

Expand All @@ -1279,8 +1282,8 @@ fn lookup_join_hashmap(
let (build_indices, probe_indices) = equal_rows_arr(
&build_indices,
&probe_indices,
&build_join_values,
&keys_values,
build_side_values,
probe_side_values,
null_equals_null,
)?;

Expand Down Expand Up @@ -1430,6 +1433,7 @@ impl HashJoinStream {
self.state =
HashJoinStreamState::ProcessProbeBatch(ProcessProbeBatchState {
batch,
values: keys_values,
offset: (0, None),
joined_probe_idx: None,
});
Expand All @@ -1454,10 +1458,8 @@ impl HashJoinStream {
// get the matched by join keys indices
let (left_indices, right_indices, next_offset) = lookup_join_hashmap(
build_side.left_data.hash_map(),
build_side.left_data.batch(),
&state.batch,
&self.on_left,
&self.on_right,
build_side.left_data.values(),
&state.values,
self.null_equals_null,
&self.hashes_buffer,
self.batch_size,
Expand Down Expand Up @@ -3297,17 +3299,20 @@ mod tests {

let join_hash_map = JoinHashMap::new(hashmap_left, next);

let left_keys_values = key_column.evaluate(&left)?.into_array(left.num_rows())?;
let right_keys_values =
key_column.evaluate(&right)?.into_array(right.num_rows())?;
let mut hashes_buffer = vec![0; right.num_rows()];
create_hashes(&[right_keys_values], &random_state, &mut hashes_buffer)?;
create_hashes(
&[Arc::clone(&right_keys_values)],
&random_state,
&mut hashes_buffer,
)?;

let (l, r, _) = lookup_join_hashmap(
&join_hash_map,
&left,
&right,
&[Arc::clone(&key_column)],
&[key_column],
&[left_keys_values],
&[right_keys_values],
false,
&hashes_buffer,
8192,
Expand Down

0 comments on commit 50c7977

Please sign in to comment.