diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock index e8b6e99d2dc4..fb22752286fc 100644 --- a/datafusion-cli/Cargo.lock +++ b/datafusion-cli/Cargo.lock @@ -1584,6 +1584,7 @@ dependencies = [ "datafusion-common", "datafusion-expr-common", "hashbrown 0.14.5", + "indexmap", "itertools 0.14.0", ] diff --git a/datafusion/physical-expr-common/Cargo.toml b/datafusion/physical-expr-common/Cargo.toml index 14d6ca64d15e..bd5d1e223c4a 100644 --- a/datafusion/physical-expr-common/Cargo.toml +++ b/datafusion/physical-expr-common/Cargo.toml @@ -41,4 +41,5 @@ arrow = { workspace = true } datafusion-common = { workspace = true, default-features = true } datafusion-expr-common = { workspace = true } hashbrown = { workspace = true } +indexmap = { workspace = true } itertools = { workspace = true } diff --git a/datafusion/physical-expr-common/src/sort_expr.rs b/datafusion/physical-expr-common/src/sort_expr.rs index 0d7501610662..f5314dd533ee 100644 --- a/datafusion/physical-expr-common/src/sort_expr.rs +++ b/datafusion/physical-expr-common/src/sort_expr.rs @@ -30,7 +30,8 @@ use arrow::datatypes::Schema; use arrow::record_batch::RecordBatch; use datafusion_common::Result; use datafusion_expr_common::columnar_value::ColumnarValue; -use itertools::Itertools; +use indexmap::IndexSet; +use itertools::{izip, Itertools}; /// Represents Sort operation for a column in a RecordBatch /// @@ -409,6 +410,22 @@ impl LexOrdering { .map(PhysicalSortExpr::from) .collect() } + + /// Collapse a `LexOrdering` into a new duplicate-free `LexOrdering` based on expression. + /// + /// This function filters duplicate entries that have same physical + /// expression inside, ignoring [`SortOptions`]. For example: + /// + /// `vec![a ASC, a DESC]` collapses to `vec![a ASC]`. + pub fn collapse(self) -> Self { + let mut output = LexOrdering::default(); + for item in self { + if !output.iter().any(|req| req.expr.eq(&item.expr)) { + output.push(item); + } + } + output + } } impl From> for LexOrdering { @@ -540,6 +557,33 @@ impl LexRequirement { .collect(), ) } + + /// Constructs a duplicate-free `LexOrderingReq` by filtering out + /// duplicate entries that have same physical expression inside. + /// + /// For example, `vec![a Some(ASC), a Some(DESC)]` collapses to `vec![a + /// Some(ASC)]`. + /// + /// It will also filter out entries that are ordered if the next entry is; + /// for instance, `vec![floor(a) Some(ASC), a Some(ASC)]` will be collapsed to + /// `vec![a Some(ASC)]`. + pub fn collapse(self) -> Self { + let mut output = Vec::::new(); + let mut exprs = IndexSet::new(); + let mut reqs = vec![]; + for item in self { + let PhysicalSortRequirement { expr, options: req } = item; + // new insertion + if exprs.insert(expr) { + reqs.push(req); + } + } + debug_assert_eq!(reqs.len(), exprs.len()); + for (expr, req) in izip!(exprs, reqs) { + output.push(PhysicalSortRequirement::new(expr, req)); + } + LexRequirement::new(output) + } } impl From for LexRequirement { diff --git a/datafusion/physical-expr/src/equivalence/class.rs b/datafusion/physical-expr/src/equivalence/class.rs index 9e535a94eb6e..495cf211efe7 100644 --- a/datafusion/physical-expr/src/equivalence/class.rs +++ b/datafusion/physical-expr/src/equivalence/class.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -use super::{add_offset_to_expr, collapse_lex_req, ProjectionMapping}; +use super::{add_offset_to_expr, ProjectionMapping}; use crate::{ expressions::Column, LexOrdering, LexRequirement, PhysicalExpr, PhysicalExprRef, PhysicalSortExpr, PhysicalSortRequirement, @@ -527,12 +527,13 @@ impl EquivalenceGroup { &self, sort_reqs: &LexRequirement, ) -> LexRequirement { - collapse_lex_req(LexRequirement::new( + LexRequirement::new( sort_reqs .iter() .map(|sort_req| self.normalize_sort_requirement(sort_req.clone())) .collect(), - )) + ) + .collapse() } /// Projects `expr` according to the given projection mapping. diff --git a/datafusion/physical-expr/src/equivalence/mod.rs b/datafusion/physical-expr/src/equivalence/mod.rs index d4c14f7bc8ff..60e508dd937a 100644 --- a/datafusion/physical-expr/src/equivalence/mod.rs +++ b/datafusion/physical-expr/src/equivalence/mod.rs @@ -18,7 +18,7 @@ use std::sync::Arc; use crate::expressions::Column; -use crate::{LexRequirement, PhysicalExpr, PhysicalSortRequirement}; +use crate::{LexRequirement, PhysicalExpr}; use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; @@ -41,14 +41,9 @@ pub use properties::{ /// It will also filter out entries that are ordered if the next entry is; /// for instance, `vec![floor(a) Some(ASC), a Some(ASC)]` will be collapsed to /// `vec![a Some(ASC)]`. +#[deprecated(since = "45.0.0", note = "Use LexRequirement::collapse")] pub fn collapse_lex_req(input: LexRequirement) -> LexRequirement { - let mut output = Vec::::new(); - for item in input { - if !output.iter().any(|req| req.expr.eq(&item.expr)) { - output.push(item); - } - } - LexRequirement::new(output) + input.collapse() } /// Adds the `offset` value to `Column` indices inside `expr`. This function is @@ -80,7 +75,9 @@ mod tests { use arrow::datatypes::{DataType, Field, Schema}; use arrow_schema::{SchemaRef, SortOptions}; use datafusion_common::{plan_datafusion_err, Result}; - use datafusion_physical_expr_common::sort_expr::LexOrdering; + use datafusion_physical_expr_common::sort_expr::{ + LexOrdering, PhysicalSortRequirement, + }; pub fn output_schema( mapping: &ProjectionMapping, diff --git a/datafusion/physical-expr/src/equivalence/ordering.rs b/datafusion/physical-expr/src/equivalence/ordering.rs index 0ae5f4af8f08..1fe7eda7cccc 100644 --- a/datafusion/physical-expr/src/equivalence/ordering.rs +++ b/datafusion/physical-expr/src/equivalence/ordering.rs @@ -146,8 +146,13 @@ impl OrderingEquivalenceClass { /// Returns the concatenation of all the orderings. This enables merge /// operations to preserve all equivalent orderings simultaneously. pub fn output_ordering(&self) -> Option { - let output_ordering = self.orderings.iter().flatten().cloned().collect(); - let output_ordering = collapse_lex_ordering(output_ordering); + let output_ordering = self + .orderings + .iter() + .flatten() + .cloned() + .collect::() + .collapse(); (!output_ordering.is_empty()).then_some(output_ordering) } @@ -207,19 +212,6 @@ impl IntoIterator for OrderingEquivalenceClass { } } -/// This function constructs a duplicate-free `LexOrdering` by filtering out -/// duplicate entries that have same physical expression inside. For example, -/// `vec![a ASC, a DESC]` collapses to `vec![a ASC]`. -pub fn collapse_lex_ordering(input: LexOrdering) -> LexOrdering { - let mut output = LexOrdering::default(); - for item in input.iter() { - if !output.iter().any(|req| req.expr.eq(&item.expr)) { - output.push(item.clone()); - } - } - output -} - /// Trims `orderings[idx]` if some suffix of it overlaps with a prefix of /// `orderings[pre_idx]`. Returns `true` if there is any overlap, `false` otherwise. fn resolve_overlap(orderings: &mut [LexOrdering], idx: usize, pre_idx: usize) -> bool { diff --git a/datafusion/physical-expr/src/equivalence/properties.rs b/datafusion/physical-expr/src/equivalence/properties.rs index d2eeccda2cae..a75f9901ddde 100755 --- a/datafusion/physical-expr/src/equivalence/properties.rs +++ b/datafusion/physical-expr/src/equivalence/properties.rs @@ -22,11 +22,9 @@ use std::slice::Iter; use std::sync::Arc; use std::{fmt, mem}; -use super::ordering::collapse_lex_ordering; use crate::equivalence::class::{const_exprs_contains, AcrossPartitions}; use crate::equivalence::{ - collapse_lex_req, EquivalenceClass, EquivalenceGroup, OrderingEquivalenceClass, - ProjectionMapping, + EquivalenceClass, EquivalenceGroup, OrderingEquivalenceClass, ProjectionMapping, }; use crate::expressions::{with_new_schema, CastExpr, Column, Literal}; use crate::{ @@ -501,15 +499,12 @@ impl EquivalenceProperties { ); let constants_normalized = self.eq_group.normalize_exprs(constant_exprs); // Prune redundant sections in the requirement: - collapse_lex_req( - normalized_sort_reqs - .iter() - .filter(|&order| { - !physical_exprs_contains(&constants_normalized, &order.expr) - }) - .cloned() - .collect(), - ) + normalized_sort_reqs + .iter() + .filter(|&order| !physical_exprs_contains(&constants_normalized, &order.expr)) + .cloned() + .collect::() + .collapse() } /// Checks whether the given ordering is satisfied by any of the existing @@ -911,7 +906,7 @@ impl EquivalenceProperties { // Simplify each ordering by removing redundant sections: orderings .chain(projected_orderings) - .map(collapse_lex_ordering) + .map(|lex_ordering| lex_ordering.collapse()) .collect() } diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs index c04211d679ca..ef98be691c99 100644 --- a/datafusion/physical-plan/src/aggregates/mod.rs +++ b/datafusion/physical-plan/src/aggregates/mod.rs @@ -44,10 +44,9 @@ use datafusion_execution::TaskContext; use datafusion_expr::{Accumulator, Aggregate}; use datafusion_physical_expr::aggregate::AggregateFunctionExpr; use datafusion_physical_expr::{ - equivalence::{collapse_lex_req, ProjectionMapping}, - expressions::Column, - physical_exprs_contains, EquivalenceProperties, LexOrdering, LexRequirement, - PhysicalExpr, PhysicalSortRequirement, + equivalence::ProjectionMapping, expressions::Column, physical_exprs_contains, + EquivalenceProperties, LexOrdering, LexRequirement, PhysicalExpr, + PhysicalSortRequirement, }; use itertools::Itertools; @@ -473,7 +472,7 @@ impl AggregateExec { &mode, )?; new_requirement.inner.extend(req); - new_requirement = collapse_lex_req(new_requirement); + new_requirement = new_requirement.collapse(); // If our aggregation has grouping sets then our base grouping exprs will // be expanded based on the flags in `group_by.groups` where for each diff --git a/datafusion/physical-plan/src/windows/mod.rs b/datafusion/physical-plan/src/windows/mod.rs index 36c4b9f18da9..510cbc248b63 100644 --- a/datafusion/physical-plan/src/windows/mod.rs +++ b/datafusion/physical-plan/src/windows/mod.rs @@ -32,7 +32,6 @@ use datafusion_expr::{ PartitionEvaluator, ReversedUDWF, WindowFrame, WindowFunctionDefinition, WindowUDF, }; use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr}; -use datafusion_physical_expr::equivalence::collapse_lex_req; use datafusion_physical_expr::{ reverse_order_bys, window::{SlidingAggregateWindowExpr, StandardWindowFunctionExpr}, @@ -469,8 +468,8 @@ pub fn get_window_mode( { let req = LexRequirement::new( [partition_by_reqs.inner.clone(), order_by_reqs.inner].concat(), - ); - let req = collapse_lex_req(req); + ) + .collapse(); if partition_by_eqs.ordering_satisfy_requirement(&req) { // Window can be run with existing ordering let mode = if indices.len() == partitionby_exprs.len() {