Skip to content

Commit

Permalink
Refactor into LexOrdering::collapse, LexRequirement::collapse avo…
Browse files Browse the repository at this point in the history
…id clone (#14038)

* Move collapse_lex_ordering to Lexordering::collapse

* reduce diff

* avoid clone, cleanup

* Introduce LexRequirement::collapse

* Improve performance of collapse, from @akurmustafa

alamb#26

fix formatting

* Revert "Improve performance of collapse, from @akurmustafa"

This reverts commit a44acfd.

* remove incorrect comment

---------

Co-authored-by: Mustafa Akur <[email protected]>
  • Loading branch information
alamb and akurmustafa authored Jan 10, 2025
1 parent 918ac1b commit ec173ab
Show file tree
Hide file tree
Showing 7 changed files with 62 additions and 48 deletions.
31 changes: 31 additions & 0 deletions datafusion/physical-expr-common/src/sort_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,22 @@ impl LexOrdering {
.map(PhysicalSortExpr::from)
.collect()
}

/// Collapse a `LexOrdering` into a new duplicate-free `LexOrdering` based on expression.
///
/// This function filters duplicate entries that have same physical
/// expression inside, ignoring [`SortOptions`]. For example:
///
/// `vec![a ASC, a DESC]` collapses to `vec![a ASC]`.
pub fn collapse(self) -> Self {
let mut output = LexOrdering::default();
for item in self {
if !output.iter().any(|req| req.expr.eq(&item.expr)) {
output.push(item);
}
}
output
}
}

impl From<Vec<PhysicalSortExpr>> for LexOrdering {
Expand Down Expand Up @@ -540,6 +556,21 @@ impl LexRequirement {
.collect(),
)
}

/// Constructs a duplicate-free `LexOrderingReq` by filtering out
/// duplicate entries that have same physical expression inside.
///
/// For example, `vec![a Some(ASC), a Some(DESC)]` collapses to `vec![a
/// Some(ASC)]`.
pub fn collapse(self) -> Self {
let mut output = Vec::<PhysicalSortRequirement>::new();
for item in self {
if !output.iter().any(|req| req.expr.eq(&item.expr)) {
output.push(item);
}
}
LexRequirement::new(output)
}
}

impl From<LexOrdering> for LexRequirement {
Expand Down
7 changes: 4 additions & 3 deletions datafusion/physical-expr/src/equivalence/class.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use super::{add_offset_to_expr, collapse_lex_req, ProjectionMapping};
use super::{add_offset_to_expr, ProjectionMapping};
use crate::{
expressions::Column, LexOrdering, LexRequirement, PhysicalExpr, PhysicalExprRef,
PhysicalSortExpr, PhysicalSortRequirement,
Expand Down Expand Up @@ -526,12 +526,13 @@ impl EquivalenceGroup {
&self,
sort_reqs: &LexRequirement,
) -> LexRequirement {
collapse_lex_req(LexRequirement::new(
LexRequirement::new(
sort_reqs
.iter()
.map(|sort_req| self.normalize_sort_requirement(sort_req.clone()))
.collect(),
))
)
.collapse()
}

/// Projects `expr` according to the given projection mapping.
Expand Down
15 changes: 6 additions & 9 deletions datafusion/physical-expr/src/equivalence/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
use std::sync::Arc;

use crate::expressions::Column;
use crate::{LexRequirement, PhysicalExpr, PhysicalSortRequirement};
use crate::{LexRequirement, PhysicalExpr};

use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};

Expand All @@ -41,14 +41,9 @@ pub use properties::{
/// It will also filter out entries that are ordered if the next entry is;
/// for instance, `vec![floor(a) Some(ASC), a Some(ASC)]` will be collapsed to
/// `vec![a Some(ASC)]`.
#[deprecated(since = "45.0.0", note = "Use LexRequirement::collapse")]
pub fn collapse_lex_req(input: LexRequirement) -> LexRequirement {
let mut output = Vec::<PhysicalSortRequirement>::new();
for item in input {
if !output.iter().any(|req| req.expr.eq(&item.expr)) {
output.push(item);
}
}
LexRequirement::new(output)
input.collapse()
}

/// Adds the `offset` value to `Column` indices inside `expr`. This function is
Expand Down Expand Up @@ -80,7 +75,9 @@ mod tests {
use arrow::datatypes::{DataType, Field, Schema};
use arrow_schema::{SchemaRef, SortOptions};
use datafusion_common::{plan_datafusion_err, Result};
use datafusion_physical_expr_common::sort_expr::LexOrdering;
use datafusion_physical_expr_common::sort_expr::{
LexOrdering, PhysicalSortRequirement,
};

pub fn output_schema(
mapping: &ProjectionMapping,
Expand Down
22 changes: 7 additions & 15 deletions datafusion/physical-expr/src/equivalence/ordering.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,8 +159,13 @@ impl OrderingEquivalenceClass {
/// Returns the concatenation of all the orderings. This enables merge
/// operations to preserve all equivalent orderings simultaneously.
pub fn output_ordering(&self) -> Option<LexOrdering> {
let output_ordering = self.orderings.iter().flatten().cloned().collect();
let output_ordering = collapse_lex_ordering(output_ordering);
let output_ordering = self
.orderings
.iter()
.flatten()
.cloned()
.collect::<LexOrdering>()
.collapse();
(!output_ordering.is_empty()).then_some(output_ordering)
}

Expand Down Expand Up @@ -221,19 +226,6 @@ impl IntoIterator for OrderingEquivalenceClass {
}
}

/// This function constructs a duplicate-free `LexOrdering` by filtering out
/// duplicate entries that have same physical expression inside. For example,
/// `vec![a ASC, a DESC]` collapses to `vec![a ASC]`.
pub fn collapse_lex_ordering(input: LexOrdering) -> LexOrdering {
let mut output = LexOrdering::default();
for item in input.iter() {
if !output.iter().any(|req| req.expr.eq(&item.expr)) {
output.push(item.clone());
}
}
output
}

/// Trims `orderings[idx]` if some suffix of it overlaps with a prefix of
/// `orderings[pre_idx]`. Returns `true` if there is any overlap, `false` otherwise.
fn resolve_overlap(orderings: &mut [LexOrdering], idx: usize, pre_idx: usize) -> bool {
Expand Down
21 changes: 8 additions & 13 deletions datafusion/physical-expr/src/equivalence/properties.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,9 @@ use std::slice::Iter;
use std::sync::Arc;
use std::{fmt, mem};

use super::ordering::collapse_lex_ordering;
use crate::equivalence::class::{const_exprs_contains, AcrossPartitions};
use crate::equivalence::{
collapse_lex_req, EquivalenceClass, EquivalenceGroup, OrderingEquivalenceClass,
ProjectionMapping,
EquivalenceClass, EquivalenceGroup, OrderingEquivalenceClass, ProjectionMapping,
};
use crate::expressions::{with_new_schema, CastExpr, Column, Literal};
use crate::{
Expand Down Expand Up @@ -505,15 +503,12 @@ impl EquivalenceProperties {
);
let constants_normalized = self.eq_group.normalize_exprs(constant_exprs);
// Prune redundant sections in the requirement:
collapse_lex_req(
normalized_sort_reqs
.iter()
.filter(|&order| {
!physical_exprs_contains(&constants_normalized, &order.expr)
})
.cloned()
.collect(),
)
normalized_sort_reqs
.iter()
.filter(|&order| !physical_exprs_contains(&constants_normalized, &order.expr))
.cloned()
.collect::<LexRequirement>()
.collapse()
}

/// Checks whether the given ordering is satisfied by any of the existing
Expand Down Expand Up @@ -915,7 +910,7 @@ impl EquivalenceProperties {
// Simplify each ordering by removing redundant sections:
orderings
.chain(projected_orderings)
.map(collapse_lex_ordering)
.map(|lex_ordering| lex_ordering.collapse())
.collect()
}

Expand Down
9 changes: 4 additions & 5 deletions datafusion/physical-plan/src/aggregates/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,9 @@ use datafusion_execution::TaskContext;
use datafusion_expr::{Accumulator, Aggregate};
use datafusion_physical_expr::aggregate::AggregateFunctionExpr;
use datafusion_physical_expr::{
equivalence::{collapse_lex_req, ProjectionMapping},
expressions::Column,
physical_exprs_contains, EquivalenceProperties, LexOrdering, LexRequirement,
PhysicalExpr, PhysicalSortRequirement,
equivalence::ProjectionMapping, expressions::Column, physical_exprs_contains,
EquivalenceProperties, LexOrdering, LexRequirement, PhysicalExpr,
PhysicalSortRequirement,
};

use itertools::Itertools;
Expand Down Expand Up @@ -473,7 +472,7 @@ impl AggregateExec {
&mode,
)?;
new_requirement.inner.extend(req);
new_requirement = collapse_lex_req(new_requirement);
new_requirement = new_requirement.collapse();

// If our aggregation has grouping sets then our base grouping exprs will
// be expanded based on the flags in `group_by.groups` where for each
Expand Down
5 changes: 2 additions & 3 deletions datafusion/physical-plan/src/windows/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ use datafusion_expr::{
PartitionEvaluator, ReversedUDWF, WindowFrame, WindowFunctionDefinition, WindowUDF,
};
use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr};
use datafusion_physical_expr::equivalence::collapse_lex_req;
use datafusion_physical_expr::{
reverse_order_bys,
window::{SlidingAggregateWindowExpr, StandardWindowFunctionExpr},
Expand Down Expand Up @@ -469,8 +468,8 @@ pub fn get_window_mode(
{
let req = LexRequirement::new(
[partition_by_reqs.inner.clone(), order_by_reqs.inner].concat(),
);
let req = collapse_lex_req(req);
)
.collapse();
if partition_by_eqs.ordering_satisfy_requirement(&req) {
// Window can be run with existing ordering
let mode = if indices.len() == partitionby_exprs.len() {
Expand Down

0 comments on commit ec173ab

Please sign in to comment.