From ec173abed02a47d434a62cc78fe8b7fcf7d1e11d Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Fri, 10 Jan 2025 09:01:42 -0500 Subject: [PATCH] Refactor into `LexOrdering::collapse`, `LexRequirement::collapse` avoid clone (#14038) * Move collapse_lex_ordering to Lexordering::collapse * reduce diff * avoid clone, cleanup * Introduce LexRequirement::collapse * Improve performance of collapse, from @akurmustafa https://github.com/alamb/datafusion/pull/26 fix formatting * Revert "Improve performance of collapse, from @akurmustafa" This reverts commit a44acfdb3af5bf0082c277de6ee7e09e92251a49. * remove incorrect comment --------- Co-authored-by: Mustafa Akur --- .../physical-expr-common/src/sort_expr.rs | 31 +++++++++++++++++++ .../physical-expr/src/equivalence/class.rs | 7 +++-- .../physical-expr/src/equivalence/mod.rs | 15 ++++----- .../physical-expr/src/equivalence/ordering.rs | 22 +++++-------- .../src/equivalence/properties.rs | 21 +++++-------- .../physical-plan/src/aggregates/mod.rs | 9 +++--- datafusion/physical-plan/src/windows/mod.rs | 5 ++- 7 files changed, 62 insertions(+), 48 deletions(-) diff --git a/datafusion/physical-expr-common/src/sort_expr.rs b/datafusion/physical-expr-common/src/sort_expr.rs index 8395d3e5263d..63397e69c09d 100644 --- a/datafusion/physical-expr-common/src/sort_expr.rs +++ b/datafusion/physical-expr-common/src/sort_expr.rs @@ -409,6 +409,22 @@ impl LexOrdering { .map(PhysicalSortExpr::from) .collect() } + + /// Collapse a `LexOrdering` into a new duplicate-free `LexOrdering` based on expression. + /// + /// This function filters duplicate entries that have same physical + /// expression inside, ignoring [`SortOptions`]. For example: + /// + /// `vec![a ASC, a DESC]` collapses to `vec![a ASC]`. + pub fn collapse(self) -> Self { + let mut output = LexOrdering::default(); + for item in self { + if !output.iter().any(|req| req.expr.eq(&item.expr)) { + output.push(item); + } + } + output + } } impl From> for LexOrdering { @@ -540,6 +556,21 @@ impl LexRequirement { .collect(), ) } + + /// Constructs a duplicate-free `LexOrderingReq` by filtering out + /// duplicate entries that have same physical expression inside. + /// + /// For example, `vec![a Some(ASC), a Some(DESC)]` collapses to `vec![a + /// Some(ASC)]`. + pub fn collapse(self) -> Self { + let mut output = Vec::::new(); + for item in self { + if !output.iter().any(|req| req.expr.eq(&item.expr)) { + output.push(item); + } + } + LexRequirement::new(output) + } } impl From for LexRequirement { diff --git a/datafusion/physical-expr/src/equivalence/class.rs b/datafusion/physical-expr/src/equivalence/class.rs index cb11409479a8..5c749a1a5a6e 100644 --- a/datafusion/physical-expr/src/equivalence/class.rs +++ b/datafusion/physical-expr/src/equivalence/class.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -use super::{add_offset_to_expr, collapse_lex_req, ProjectionMapping}; +use super::{add_offset_to_expr, ProjectionMapping}; use crate::{ expressions::Column, LexOrdering, LexRequirement, PhysicalExpr, PhysicalExprRef, PhysicalSortExpr, PhysicalSortRequirement, @@ -526,12 +526,13 @@ impl EquivalenceGroup { &self, sort_reqs: &LexRequirement, ) -> LexRequirement { - collapse_lex_req(LexRequirement::new( + LexRequirement::new( sort_reqs .iter() .map(|sort_req| self.normalize_sort_requirement(sort_req.clone())) .collect(), - )) + ) + .collapse() } /// Projects `expr` according to the given projection mapping. diff --git a/datafusion/physical-expr/src/equivalence/mod.rs b/datafusion/physical-expr/src/equivalence/mod.rs index b50633d777f7..a5b85064e625 100644 --- a/datafusion/physical-expr/src/equivalence/mod.rs +++ b/datafusion/physical-expr/src/equivalence/mod.rs @@ -18,7 +18,7 @@ use std::sync::Arc; use crate::expressions::Column; -use crate::{LexRequirement, PhysicalExpr, PhysicalSortRequirement}; +use crate::{LexRequirement, PhysicalExpr}; use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; @@ -41,14 +41,9 @@ pub use properties::{ /// It will also filter out entries that are ordered if the next entry is; /// for instance, `vec![floor(a) Some(ASC), a Some(ASC)]` will be collapsed to /// `vec![a Some(ASC)]`. +#[deprecated(since = "45.0.0", note = "Use LexRequirement::collapse")] pub fn collapse_lex_req(input: LexRequirement) -> LexRequirement { - let mut output = Vec::::new(); - for item in input { - if !output.iter().any(|req| req.expr.eq(&item.expr)) { - output.push(item); - } - } - LexRequirement::new(output) + input.collapse() } /// Adds the `offset` value to `Column` indices inside `expr`. This function is @@ -80,7 +75,9 @@ mod tests { use arrow::datatypes::{DataType, Field, Schema}; use arrow_schema::{SchemaRef, SortOptions}; use datafusion_common::{plan_datafusion_err, Result}; - use datafusion_physical_expr_common::sort_expr::LexOrdering; + use datafusion_physical_expr_common::sort_expr::{ + LexOrdering, PhysicalSortRequirement, + }; pub fn output_schema( mapping: &ProjectionMapping, diff --git a/datafusion/physical-expr/src/equivalence/ordering.rs b/datafusion/physical-expr/src/equivalence/ordering.rs index 5dfa1b08f366..ae502d4d5f67 100644 --- a/datafusion/physical-expr/src/equivalence/ordering.rs +++ b/datafusion/physical-expr/src/equivalence/ordering.rs @@ -159,8 +159,13 @@ impl OrderingEquivalenceClass { /// Returns the concatenation of all the orderings. This enables merge /// operations to preserve all equivalent orderings simultaneously. pub fn output_ordering(&self) -> Option { - let output_ordering = self.orderings.iter().flatten().cloned().collect(); - let output_ordering = collapse_lex_ordering(output_ordering); + let output_ordering = self + .orderings + .iter() + .flatten() + .cloned() + .collect::() + .collapse(); (!output_ordering.is_empty()).then_some(output_ordering) } @@ -221,19 +226,6 @@ impl IntoIterator for OrderingEquivalenceClass { } } -/// This function constructs a duplicate-free `LexOrdering` by filtering out -/// duplicate entries that have same physical expression inside. For example, -/// `vec![a ASC, a DESC]` collapses to `vec![a ASC]`. -pub fn collapse_lex_ordering(input: LexOrdering) -> LexOrdering { - let mut output = LexOrdering::default(); - for item in input.iter() { - if !output.iter().any(|req| req.expr.eq(&item.expr)) { - output.push(item.clone()); - } - } - output -} - /// Trims `orderings[idx]` if some suffix of it overlaps with a prefix of /// `orderings[pre_idx]`. Returns `true` if there is any overlap, `false` otherwise. fn resolve_overlap(orderings: &mut [LexOrdering], idx: usize, pre_idx: usize) -> bool { diff --git a/datafusion/physical-expr/src/equivalence/properties.rs b/datafusion/physical-expr/src/equivalence/properties.rs index 4f440416c457..2c7335649b28 100755 --- a/datafusion/physical-expr/src/equivalence/properties.rs +++ b/datafusion/physical-expr/src/equivalence/properties.rs @@ -22,11 +22,9 @@ use std::slice::Iter; use std::sync::Arc; use std::{fmt, mem}; -use super::ordering::collapse_lex_ordering; use crate::equivalence::class::{const_exprs_contains, AcrossPartitions}; use crate::equivalence::{ - collapse_lex_req, EquivalenceClass, EquivalenceGroup, OrderingEquivalenceClass, - ProjectionMapping, + EquivalenceClass, EquivalenceGroup, OrderingEquivalenceClass, ProjectionMapping, }; use crate::expressions::{with_new_schema, CastExpr, Column, Literal}; use crate::{ @@ -505,15 +503,12 @@ impl EquivalenceProperties { ); let constants_normalized = self.eq_group.normalize_exprs(constant_exprs); // Prune redundant sections in the requirement: - collapse_lex_req( - normalized_sort_reqs - .iter() - .filter(|&order| { - !physical_exprs_contains(&constants_normalized, &order.expr) - }) - .cloned() - .collect(), - ) + normalized_sort_reqs + .iter() + .filter(|&order| !physical_exprs_contains(&constants_normalized, &order.expr)) + .cloned() + .collect::() + .collapse() } /// Checks whether the given ordering is satisfied by any of the existing @@ -915,7 +910,7 @@ impl EquivalenceProperties { // Simplify each ordering by removing redundant sections: orderings .chain(projected_orderings) - .map(collapse_lex_ordering) + .map(|lex_ordering| lex_ordering.collapse()) .collect() } diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs index 4812fa41347d..52fd6f90e595 100644 --- a/datafusion/physical-plan/src/aggregates/mod.rs +++ b/datafusion/physical-plan/src/aggregates/mod.rs @@ -44,10 +44,9 @@ use datafusion_execution::TaskContext; use datafusion_expr::{Accumulator, Aggregate}; use datafusion_physical_expr::aggregate::AggregateFunctionExpr; use datafusion_physical_expr::{ - equivalence::{collapse_lex_req, ProjectionMapping}, - expressions::Column, - physical_exprs_contains, EquivalenceProperties, LexOrdering, LexRequirement, - PhysicalExpr, PhysicalSortRequirement, + equivalence::ProjectionMapping, expressions::Column, physical_exprs_contains, + EquivalenceProperties, LexOrdering, LexRequirement, PhysicalExpr, + PhysicalSortRequirement, }; use itertools::Itertools; @@ -473,7 +472,7 @@ impl AggregateExec { &mode, )?; new_requirement.inner.extend(req); - new_requirement = collapse_lex_req(new_requirement); + new_requirement = new_requirement.collapse(); // If our aggregation has grouping sets then our base grouping exprs will // be expanded based on the flags in `group_by.groups` where for each diff --git a/datafusion/physical-plan/src/windows/mod.rs b/datafusion/physical-plan/src/windows/mod.rs index 36c4b9f18da9..510cbc248b63 100644 --- a/datafusion/physical-plan/src/windows/mod.rs +++ b/datafusion/physical-plan/src/windows/mod.rs @@ -32,7 +32,6 @@ use datafusion_expr::{ PartitionEvaluator, ReversedUDWF, WindowFrame, WindowFunctionDefinition, WindowUDF, }; use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr}; -use datafusion_physical_expr::equivalence::collapse_lex_req; use datafusion_physical_expr::{ reverse_order_bys, window::{SlidingAggregateWindowExpr, StandardWindowFunctionExpr}, @@ -469,8 +468,8 @@ pub fn get_window_mode( { let req = LexRequirement::new( [partition_by_reqs.inner.clone(), order_by_reqs.inner].concat(), - ); - let req = collapse_lex_req(req); + ) + .collapse(); if partition_by_eqs.ordering_satisfy_requirement(&req) { // Window can be run with existing ordering let mode = if indices.len() == partitionby_exprs.len() {