From f3023835ebb913208816568bedef225e45a16d0b Mon Sep 17 00:00:00 2001 From: Jay Zhan Date: Tue, 19 Nov 2024 11:35:14 +0800 Subject: [PATCH] Remove unreachable filter logic in final grouping stage (#13463) * rm filtere in final grouping stage Signed-off-by: jayzhan211 * add comment Signed-off-by: jayzhan211 --------- Signed-off-by: jayzhan211 --- datafusion/functions-aggregate/src/count.rs | 24 ++++------- .../functions-aggregate/src/variance.rs | 42 ++++++------------- .../physical-plan/src/aggregates/row_hash.rs | 11 +++-- 3 files changed, 24 insertions(+), 53 deletions(-) diff --git a/datafusion/functions-aggregate/src/count.rs b/datafusion/functions-aggregate/src/count.rs index 52181372698f..8fdd702b5b7c 100644 --- a/datafusion/functions-aggregate/src/count.rs +++ b/datafusion/functions-aggregate/src/count.rs @@ -467,7 +467,8 @@ impl GroupsAccumulator for CountGroupsAccumulator { &mut self, values: &[ArrayRef], group_indices: &[usize], - opt_filter: Option<&BooleanArray>, + // Since aggregate filter should be applied in partial stage, in final stage there should be no filter + _opt_filter: Option<&BooleanArray>, total_num_groups: usize, ) -> Result<()> { assert_eq!(values.len(), 1, "one argument to merge_batch"); @@ -480,22 +481,11 @@ impl GroupsAccumulator for CountGroupsAccumulator { // Adds the counts with the partial counts self.counts.resize(total_num_groups, 0); - match opt_filter { - Some(filter) => filter - .iter() - .zip(group_indices.iter()) - .zip(partial_counts.iter()) - .for_each(|((filter_value, &group_index), partial_count)| { - if let Some(true) = filter_value { - self.counts[group_index] += partial_count; - } - }), - None => group_indices.iter().zip(partial_counts.iter()).for_each( - |(&group_index, partial_count)| { - self.counts[group_index] += partial_count; - }, - ), - } + group_indices.iter().zip(partial_counts.iter()).for_each( + |(&group_index, partial_count)| { + self.counts[group_index] += partial_count; + }, + ); Ok(()) } diff --git a/datafusion/functions-aggregate/src/variance.rs b/datafusion/functions-aggregate/src/variance.rs index 8daa85a5cc83..55d4181a96df 100644 --- a/datafusion/functions-aggregate/src/variance.rs +++ b/datafusion/functions-aggregate/src/variance.rs @@ -460,7 +460,7 @@ impl VarianceGroupsAccumulator { counts: &UInt64Array, means: &Float64Array, m2s: &Float64Array, - opt_filter: Option<&BooleanArray>, + _opt_filter: Option<&BooleanArray>, mut value_fn: F, ) where F: FnMut(usize, u64, f64, f64) + Send, @@ -469,33 +469,14 @@ impl VarianceGroupsAccumulator { assert_eq!(means.null_count(), 0); assert_eq!(m2s.null_count(), 0); - match opt_filter { - None => { - group_indices - .iter() - .zip(counts.values().iter()) - .zip(means.values().iter()) - .zip(m2s.values().iter()) - .for_each(|(((&group_index, &count), &mean), &m2)| { - value_fn(group_index, count, mean, m2); - }); - } - Some(filter) => { - group_indices - .iter() - .zip(counts.values().iter()) - .zip(means.values().iter()) - .zip(m2s.values().iter()) - .zip(filter.iter()) - .for_each( - |((((&group_index, &count), &mean), &m2), filter_value)| { - if let Some(true) = filter_value { - value_fn(group_index, count, mean, m2); - } - }, - ); - } - } + group_indices + .iter() + .zip(counts.values().iter()) + .zip(means.values().iter()) + .zip(m2s.values().iter()) + .for_each(|(((&group_index, &count), &mean), &m2)| { + value_fn(group_index, count, mean, m2); + }); } pub fn variance( @@ -554,7 +535,8 @@ impl GroupsAccumulator for VarianceGroupsAccumulator { &mut self, values: &[ArrayRef], group_indices: &[usize], - opt_filter: Option<&BooleanArray>, + // Since aggregate filter should be applied in partial stage, in final stage there should be no filter + _opt_filter: Option<&BooleanArray>, total_num_groups: usize, ) -> Result<()> { assert_eq!(values.len(), 3, "two arguments to merge_batch"); @@ -569,7 +551,7 @@ impl GroupsAccumulator for VarianceGroupsAccumulator { partial_counts, partial_means, partial_m2s, - opt_filter, + None, |group_index, partial_count, partial_mean, partial_m2| { if partial_count == 0 { return; diff --git a/datafusion/physical-plan/src/aggregates/row_hash.rs b/datafusion/physical-plan/src/aggregates/row_hash.rs index 0fa9f206f13d..965adbb8c780 100644 --- a/datafusion/physical-plan/src/aggregates/row_hash.rs +++ b/datafusion/physical-plan/src/aggregates/row_hash.rs @@ -859,14 +859,13 @@ impl GroupedHashAggregateStream { )?; } _ => { + if opt_filter.is_some() { + return internal_err!("aggregate filter should be applied in partial stage, there should be no filter in final stage"); + } + // if aggregation is over intermediate states, // use merge - acc.merge_batch( - values, - group_indices, - opt_filter, - total_num_groups, - )?; + acc.merge_batch(values, group_indices, None, total_num_groups)?; } } }