Skip to content

Commit

Permalink
Remove unreachable filter logic in final grouping stage (apache#13463)
Browse files Browse the repository at this point in the history
* rm filter in final grouping stage

Signed-off-by: jayzhan211 <[email protected]>

* add comment

Signed-off-by: jayzhan211 <[email protected]>

---------

Signed-off-by: jayzhan211 <[email protected]>
  • Loading branch information
jayzhan211 authored Nov 19, 2024
1 parent 1a09adf commit f302383
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 53 deletions.
24 changes: 7 additions & 17 deletions datafusion/functions-aggregate/src/count.rs
Original file line number Diff line number Diff line change
Expand Up @@ -467,7 +467,8 @@ impl GroupsAccumulator for CountGroupsAccumulator {
&mut self,
values: &[ArrayRef],
group_indices: &[usize],
opt_filter: Option<&BooleanArray>,
// Since aggregate filter should be applied in partial stage, in final stage there should be no filter
_opt_filter: Option<&BooleanArray>,
total_num_groups: usize,
) -> Result<()> {
assert_eq!(values.len(), 1, "one argument to merge_batch");
Expand All @@ -480,22 +481,11 @@ impl GroupsAccumulator for CountGroupsAccumulator {

// Adds the counts with the partial counts
self.counts.resize(total_num_groups, 0);
match opt_filter {
Some(filter) => filter
.iter()
.zip(group_indices.iter())
.zip(partial_counts.iter())
.for_each(|((filter_value, &group_index), partial_count)| {
if let Some(true) = filter_value {
self.counts[group_index] += partial_count;
}
}),
None => group_indices.iter().zip(partial_counts.iter()).for_each(
|(&group_index, partial_count)| {
self.counts[group_index] += partial_count;
},
),
}
group_indices.iter().zip(partial_counts.iter()).for_each(
|(&group_index, partial_count)| {
self.counts[group_index] += partial_count;
},
);

Ok(())
}
Expand Down
42 changes: 12 additions & 30 deletions datafusion/functions-aggregate/src/variance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -460,7 +460,7 @@ impl VarianceGroupsAccumulator {
counts: &UInt64Array,
means: &Float64Array,
m2s: &Float64Array,
opt_filter: Option<&BooleanArray>,
_opt_filter: Option<&BooleanArray>,
mut value_fn: F,
) where
F: FnMut(usize, u64, f64, f64) + Send,
Expand All @@ -469,33 +469,14 @@ impl VarianceGroupsAccumulator {
assert_eq!(means.null_count(), 0);
assert_eq!(m2s.null_count(), 0);

match opt_filter {
None => {
group_indices
.iter()
.zip(counts.values().iter())
.zip(means.values().iter())
.zip(m2s.values().iter())
.for_each(|(((&group_index, &count), &mean), &m2)| {
value_fn(group_index, count, mean, m2);
});
}
Some(filter) => {
group_indices
.iter()
.zip(counts.values().iter())
.zip(means.values().iter())
.zip(m2s.values().iter())
.zip(filter.iter())
.for_each(
|((((&group_index, &count), &mean), &m2), filter_value)| {
if let Some(true) = filter_value {
value_fn(group_index, count, mean, m2);
}
},
);
}
}
group_indices
.iter()
.zip(counts.values().iter())
.zip(means.values().iter())
.zip(m2s.values().iter())
.for_each(|(((&group_index, &count), &mean), &m2)| {
value_fn(group_index, count, mean, m2);
});
}

pub fn variance(
Expand Down Expand Up @@ -554,7 +535,8 @@ impl GroupsAccumulator for VarianceGroupsAccumulator {
&mut self,
values: &[ArrayRef],
group_indices: &[usize],
opt_filter: Option<&BooleanArray>,
// Since aggregate filter should be applied in partial stage, in final stage there should be no filter
_opt_filter: Option<&BooleanArray>,
total_num_groups: usize,
) -> Result<()> {
assert_eq!(values.len(), 3, "two arguments to merge_batch");
Expand All @@ -569,7 +551,7 @@ impl GroupsAccumulator for VarianceGroupsAccumulator {
partial_counts,
partial_means,
partial_m2s,
opt_filter,
None,
|group_index, partial_count, partial_mean, partial_m2| {
if partial_count == 0 {
return;
Expand Down
11 changes: 5 additions & 6 deletions datafusion/physical-plan/src/aggregates/row_hash.rs
Original file line number Diff line number Diff line change
Expand Up @@ -859,14 +859,13 @@ impl GroupedHashAggregateStream {
)?;
}
_ => {
if opt_filter.is_some() {
return internal_err!("aggregate filter should be applied in partial stage, there should be no filter in final stage");
}

// if aggregation is over intermediate states,
// use merge
acc.merge_batch(
values,
group_indices,
opt_filter,
total_num_groups,
)?;
acc.merge_batch(values, group_indices, None, total_num_groups)?;
}
}
}
Expand Down

0 comments on commit f302383

Please sign in to comment.