diff --git a/datafusion/physical-expr/src/aggregate/count_distinct/mod.rs b/datafusion/physical-expr/src/aggregate/count_distinct/mod.rs index ee63945eb249..ae3370df723a 100644 --- a/datafusion/physical-expr/src/aggregate/count_distinct/mod.rs +++ b/datafusion/physical-expr/src/aggregate/count_distinct/mod.rs @@ -99,55 +99,70 @@ impl AggregateExpr for DistinctCount { use DataType::*; use TimeUnit::*; - Ok(match &self.state_data_type { + let data_type = &self.state_data_type; + Ok(match data_type { // try and use a specialized accumulator if possible, otherwise fall back to generic accumulator - Int8 => Box::new(PrimitiveDistinctCountAccumulator::::new()), - Int16 => Box::new(PrimitiveDistinctCountAccumulator::::new()), - Int32 => Box::new(PrimitiveDistinctCountAccumulator::::new()), - Int64 => Box::new(PrimitiveDistinctCountAccumulator::::new()), - UInt8 => Box::new(PrimitiveDistinctCountAccumulator::::new()), - UInt16 => Box::new(PrimitiveDistinctCountAccumulator::::new()), - UInt32 => Box::new(PrimitiveDistinctCountAccumulator::::new()), - UInt64 => Box::new(PrimitiveDistinctCountAccumulator::::new()), - dt @ Decimal128(_, _) => Box::new( - PrimitiveDistinctCountAccumulator::::new() - .with_data_type(dt.clone()), - ), - dt @ Decimal256(_, _) => Box::new( - PrimitiveDistinctCountAccumulator::::new() - .with_data_type(dt.clone()), - ), - - Date32 => Box::new(PrimitiveDistinctCountAccumulator::::new()), - Date64 => Box::new(PrimitiveDistinctCountAccumulator::::new()), + Int8 => Box::new(PrimitiveDistinctCountAccumulator::::new( + data_type, + )), + Int16 => Box::new(PrimitiveDistinctCountAccumulator::::new( + data_type, + )), + Int32 => Box::new(PrimitiveDistinctCountAccumulator::::new( + data_type, + )), + Int64 => Box::new(PrimitiveDistinctCountAccumulator::::new( + data_type, + )), + UInt8 => Box::new(PrimitiveDistinctCountAccumulator::::new( + data_type, + )), + UInt16 => Box::new(PrimitiveDistinctCountAccumulator::::new( + data_type, + )), + UInt32 => Box::new(PrimitiveDistinctCountAccumulator::::new( + data_type, + )), + UInt64 => Box::new(PrimitiveDistinctCountAccumulator::::new( + data_type, + )), + Decimal128(_, _) => Box::new(PrimitiveDistinctCountAccumulator::< + Decimal128Type, + >::new(data_type)), + Decimal256(_, _) => Box::new(PrimitiveDistinctCountAccumulator::< + Decimal256Type, + >::new(data_type)), + + Date32 => Box::new(PrimitiveDistinctCountAccumulator::::new( + data_type, + )), + Date64 => Box::new(PrimitiveDistinctCountAccumulator::::new( + data_type, + )), Time32(Millisecond) => Box::new(PrimitiveDistinctCountAccumulator::< Time32MillisecondType, - >::new()), - Time32(Second) => { - Box::new(PrimitiveDistinctCountAccumulator::::new()) - } + >::new(data_type)), + Time32(Second) => Box::new(PrimitiveDistinctCountAccumulator::< + Time32SecondType, + >::new(data_type)), Time64(Microsecond) => Box::new(PrimitiveDistinctCountAccumulator::< Time64MicrosecondType, - >::new()), - Time64(Nanosecond) => { - Box::new(PrimitiveDistinctCountAccumulator::::new()) - } - dt @ Timestamp(Microsecond, _) => Box::new( - PrimitiveDistinctCountAccumulator::::new() - .with_data_type(dt.clone()), - ), - dt @ Timestamp(Millisecond, _) => Box::new( - PrimitiveDistinctCountAccumulator::::new() - .with_data_type(dt.clone()), - ), - dt @ Timestamp(Nanosecond, _) => Box::new( - PrimitiveDistinctCountAccumulator::::new() - .with_data_type(dt.clone()), - ), - dt @ Timestamp(Second, _) => Box::new( - PrimitiveDistinctCountAccumulator::::new() - .with_data_type(dt.clone()), - ), + >::new(data_type)), + Time64(Nanosecond) => Box::new(PrimitiveDistinctCountAccumulator::< + Time64NanosecondType, + >::new(data_type)), + Timestamp(Microsecond, _) => Box::new(PrimitiveDistinctCountAccumulator::< + TimestampMicrosecondType, + >::new(data_type)), + Timestamp(Millisecond, _) => Box::new(PrimitiveDistinctCountAccumulator::< + TimestampMillisecondType, + >::new(data_type)), + Timestamp(Nanosecond, _) => Box::new(PrimitiveDistinctCountAccumulator::< + TimestampNanosecondType, + >::new(data_type)), + Timestamp(Second, _) => Box::new(PrimitiveDistinctCountAccumulator::< + TimestampSecondType, + >::new(data_type)), Float16 => Box::new(FloatDistinctCountAccumulator::::new()), Float32 => Box::new(FloatDistinctCountAccumulator::::new()), diff --git a/datafusion/physical-expr/src/aggregate/count_distinct/native.rs b/datafusion/physical-expr/src/aggregate/count_distinct/native.rs index 8f3ce8acfe07..97ff1ef257b4 100644 --- a/datafusion/physical-expr/src/aggregate/count_distinct/native.rs +++ b/datafusion/physical-expr/src/aggregate/count_distinct/native.rs @@ -54,17 +54,12 @@ where T: ArrowPrimitiveType + Send, T::Native: Eq + Hash, { - pub(super) fn new() -> Self { + pub(super) fn new(data_type: &DataType) -> Self { Self { values: HashSet::default(), - data_type: T::DATA_TYPE, + data_type: data_type.clone(), } } - - pub(super) fn with_data_type(mut self, data_type: DataType) -> Self { - self.data_type = data_type; - self - } } impl Accumulator for PrimitiveDistinctCountAccumulator