From 31d371695478839b3112b788ec1966142a6fc0ff Mon Sep 17 00:00:00 2001 From: suremarc <8771538+suremarc@users.noreply.github.com> Date: Thu, 7 Nov 2024 18:11:05 +0000 Subject: [PATCH] make MinMaxStatistics only care about sorting column statistics --- datafusion/physical-plan/src/statistics.rs | 66 ++++++++++++---------- 1 file changed, 36 insertions(+), 30 deletions(-) diff --git a/datafusion/physical-plan/src/statistics.rs b/datafusion/physical-plan/src/statistics.rs index 6ebcbe7c7c38..13a2d35516a4 100644 --- a/datafusion/physical-plan/src/statistics.rs +++ b/datafusion/physical-plan/src/statistics.rs @@ -73,11 +73,37 @@ impl MinMaxStatistics { ) -> Result { use datafusion_common::ScalarValue; - let statistics = statistics.into_iter().collect::>(); + let sort_columns = sort_columns_from_physical_sort_exprs(sort_order).ok_or( + DataFusionError::Plan("sort expression must be on column".to_string()), + )?; + + let projection = sort_columns.iter().map(|c| c.index()).collect::>(); + + // Project the schema & sort order down to just the relevant columns + + let projected_schema = Arc::new(schema.project(&projection)?); + + let projected_sort_order = LexOrdering { + inner: sort_columns + .iter() + .zip(sort_order.iter()) + .enumerate() + .map(|(i, (col, sort))| PhysicalSortExpr { + expr: Arc::new(Column::new(col.name(), i)), + options: sort.options, + }) + .collect::>(), + }; - // Helper function to get min/max statistics for a given column of projected_schema + let projected_statistics = statistics + .into_iter() + .cloned() + .map(|s| s.project(Some(&projection))) + .collect::>(); + + // Helper function to get min/max statistics let get_min_max = |i: usize| -> Result<(Vec, Vec)> { - Ok(statistics + Ok(projected_statistics .iter() .map(|s| { s.column_statistics[i] @@ -94,31 +120,11 @@ impl MinMaxStatistics { .unzip()) }; - let sort_columns = sort_columns_from_physical_sort_exprs(sort_order).ok_or( - DataFusionError::Plan("sort expression must be on column".to_string()), - )?; - - // Project the schema & sort order down to just the relevant columns - let min_max_schema = Arc::new( - schema - .project(&(sort_columns.iter().map(|c| c.index()).collect::>()))?, - ); - let min_max_sort_order = LexOrdering { - inner: sort_columns - .iter() - .zip(sort_order.iter()) - .enumerate() - .map(|(i, (col, sort))| PhysicalSortExpr { - expr: Arc::new(Column::new(col.name(), i)), - options: sort.options, - }) - .collect::>(), - }; - let (min_values, max_values): (Vec<_>, Vec<_>) = sort_columns .iter() - .map(|c| { - let (min, max) = get_min_max(c.index()).map_err(|e| { + .enumerate() + .map(|(i, c)| { + let (min, max) = get_min_max(i).map_err(|e| { e.context(format!("get min/max for column: '{}'", c.name())) })?; Ok(( @@ -132,14 +138,14 @@ impl MinMaxStatistics { .unzip(); Self::new( - &min_max_sort_order, - &min_max_schema, - RecordBatch::try_new(Arc::clone(&min_max_schema), min_values).map_err( + &projected_sort_order, + &projected_schema, + RecordBatch::try_new(Arc::clone(&projected_schema), min_values).map_err( |e| { DataFusionError::ArrowError(e, Some("\ncreate min batch".to_string())) }, )?, - RecordBatch::try_new(Arc::clone(&min_max_schema), max_values).map_err( + RecordBatch::try_new(Arc::clone(&projected_schema), max_values).map_err( |e| { DataFusionError::ArrowError(e, Some("\ncreate max batch".to_string())) },