Skip to content

Commit

Permalink
For review
Browse files Browse the repository at this point in the history
  • Loading branch information
viirya committed Aug 20, 2024
1 parent 8854571 commit f98693e
Showing 1 changed file with 4 additions and 2 deletions.
6 changes: 4 additions & 2 deletions datafusion/physical-plan/src/joins/sort_merge_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -595,6 +595,8 @@ struct BufferedBatch {
/// Size estimation used for reserving / releasing memory
pub size_estimation: usize,
/// The indices of buffered batch that failed the join filter.
/// This is a map between buffered row index and a boolean value indicating whether all joined row
/// of the buffered row failed the join filter.
/// When dequeuing the buffered batch, we need to produce null joined rows for these indices.
pub join_filter_failed_map: HashMap<u64, bool>,
/// Current buffered batch number of rows. Equal to batch.num_rows()
Expand Down Expand Up @@ -1232,14 +1234,14 @@ impl SMJStream {
// For buffered row which is joined with streamed side rows but all joined rows
// don't satisfy the join filter
if output_not_matched_filter {
let buffered_indices = buffered_batch
let not_matched_buffered_indices = buffered_batch
.join_filter_failed_map
.iter()
.filter_map(|(idx, failed)| if *failed { Some(*idx) } else { None })
.collect::<Vec<_>>();

let buffered_indices = UInt64Array::from_iter_values(
buffered_indices.iter().copied(),
not_matched_buffered_indices.iter().copied(),
);

if let Some(record_batch) = produce_buffered_null_batch(
Expand Down

0 comments on commit f98693e

Please sign in to comment.