Skip to content

Commit

Permalink
Pre-compute the interesting orderings
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb committed Jan 8, 2025
1 parent 721a723 commit a8182fc
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 15 deletions.
1 change: 0 additions & 1 deletion datafusion/physical-expr/src/equivalence/ordering.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,6 @@ impl OrderingEquivalenceClass {

/// Extend this ordering equivalence class with the `other` class.
#[deprecated(since = "45.0.0", note = "Use add_new_orderings instead")]

pub fn extend(&mut self, other: Self) {
self.orderings.extend(other.orderings);
// Make sure that there are no redundant orderings:
Expand Down
72 changes: 58 additions & 14 deletions datafusion/physical-expr/src/equivalence/properties.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,8 @@ pub struct EquivalenceProperties {
eq_group: EquivalenceGroup,
/// Equivalent sort expressions
oeq_class: OrderingEquivalenceClass,
/// Normalized sort expressions (where eq_group and constants are applied to oeq_class)
normalized_oeq_class: OrderingEquivalenceClass,
/// Expressions whose values are constant
///
/// TODO: We do not need to track constants separately, they can be tracked
Expand All @@ -143,6 +145,7 @@ impl EquivalenceProperties {
Self {
eq_group: EquivalenceGroup::empty(),
oeq_class: OrderingEquivalenceClass::empty(),
normalized_oeq_class: OrderingEquivalenceClass::empty(),
constants: vec![],
schema,
}
Expand All @@ -153,6 +156,7 @@ impl EquivalenceProperties {
Self {
eq_group: EquivalenceGroup::empty(),
oeq_class: OrderingEquivalenceClass::new(orderings.to_vec()),
normalized_oeq_class: OrderingEquivalenceClass::empty(),
constants: vec![],
schema,
}
Expand All @@ -168,6 +172,11 @@ impl EquivalenceProperties {
&self.oeq_class
}

/// Returns a reference to the normalized ordering equivalence class within.
///
/// This only borrows the stored `normalized_oeq_class` field; no
/// normalization work is performed by this call.
pub fn normalized_oeq_class(&self) -> &OrderingEquivalenceClass {
&self.normalized_oeq_class
}

/// Return the inner OrderingEquivalenceClass, consuming self
pub fn into_oeq_class(self) -> OrderingEquivalenceClass {
self.oeq_class
Expand All @@ -194,18 +203,23 @@ impl EquivalenceProperties {
(!output_ordering.is_empty()).then_some(output_ordering)
}

/// Returns the normalized version of the ordering equivalence class within.
/// Recalculates the normalized version of the ordering equivalence class within.
///
/// Normalization removes constants and duplicates as well as standardizing
/// expressions according to the equivalence group within.
pub fn normalized_oeq_class(&self) -> OrderingEquivalenceClass {
OrderingEquivalenceClass::new(
self.oeq_class
.iter()
.map(|ordering| self.normalize_sort_exprs(ordering))
.collect(),
fn renormalize_oeq_class(&mut self) {
    // Clone the raw orderings so that `oeq_class` itself is left untouched.
    let raw_orderings = self.oeq_class.clone().into_inner();
    // Normalize each ordering, then rebuild the stored normalized class from the result.
    let normalized = self.normalize_orderings(raw_orderings);
    self.normalized_oeq_class = OrderingEquivalenceClass::new(normalized);
}

/// Consuming counterpart of `renormalize_oeq_class`: takes `self` by value,
/// recomputes the normalized ordering equivalence class, and returns the
/// updated value.
fn with_renormalize_oeq_class(mut self) -> Self {
self.renormalize_oeq_class();
self
}

/// Extends this `EquivalenceProperties` with the `other` object.
pub fn extend(mut self, other: Self) -> Self {
self.eq_group.extend(other.eq_group);
Expand All @@ -218,6 +232,7 @@ impl EquivalenceProperties {
///
/// Call this method when existing orderings are invalidated.
pub fn clear_orderings(&mut self) -> OrderingEquivalenceClass {
    // Move the current orderings out, leaving a default value in their place.
    let previous = mem::take(&mut self.oeq_class);
    // The stored normalized orderings are no longer valid either, so clear them.
    self.normalized_oeq_class.clear();
    previous
}

Expand All @@ -226,7 +241,8 @@ impl EquivalenceProperties {
pub fn clear_per_partition_constants(&mut self) {
self.constants.retain(|item| {
matches!(item.across_partitions(), AcrossPartitions::Uniform(_))
})
});
self.renormalize_oeq_class();
}

/// Extends this `EquivalenceProperties` by adding the orderings inside the
Expand All @@ -240,8 +256,15 @@ impl EquivalenceProperties {
&mut self,
orderings: impl IntoIterator<Item = LexOrdering>,
) {
let (orderings, normalized_orderings): (Vec<_>, Vec<_>) = orderings
.into_iter()
.map(|sort_expr| (sort_expr.clone(), self.normalize_sort_exprs(sort_expr)))
.unzip();

// This is the single location where `oeq_class` is updated; keep `normalized_oeq_class` in sync with it.
self.oeq_class.add_new_orderings(orderings);
self.normalized_oeq_class
.add_new_orderings(normalized_orderings);
}

/// Adds a single ordering to the existing ordering equivalence class.
Expand All @@ -253,6 +276,7 @@ impl EquivalenceProperties {
/// equivalence group within.
pub fn add_equivalence_group(&mut self, other_eq_group: EquivalenceGroup) {
self.eq_group.extend(other_eq_group);
// The equivalence group just changed, so recompute the stored normalized orderings.
self.renormalize_oeq_class()
}

/// Adds a new equality condition into the existing equivalence group.
Expand Down Expand Up @@ -282,6 +306,7 @@ impl EquivalenceProperties {

// Add equal expressions to the state
self.eq_group.add_equal_conditions(left, right);
self.renormalize_oeq_class();

// Discover any new orderings
self.discover_new_orderings(left)?;
Expand Down Expand Up @@ -325,9 +350,10 @@ impl EquivalenceProperties {

// Add all new normalized constants
self.constants.extend(normalized_constants);
self.renormalize_oeq_class();

// Discover any new orderings based on the constants
for ordering in self.normalized_oeq_class().iter() {
for ordering in self.normalized_oeq_class().clone().iter() {
if let Err(e) = self.discover_new_orderings(&ordering[0].expr) {
log::debug!("error discovering new orderings: {e}");
}
Expand Down Expand Up @@ -476,15 +502,23 @@ impl EquivalenceProperties {
/// function would return `vec![a ASC, c ASC]`. Internally, it would first
/// normalize to `vec![a ASC, c ASC, a ASC]` and end up with the final result
/// after deduplication.
fn normalize_sort_exprs(&self, sort_exprs: &LexOrdering) -> LexOrdering {
fn normalize_sort_exprs(&self, sort_exprs: LexOrdering) -> LexOrdering {
// Convert sort expressions to sort requirements:
let sort_reqs = LexRequirement::from(sort_exprs.clone());
let sort_reqs = LexRequirement::from(sort_exprs);
// Normalize the requirements:
let normalized_sort_reqs = self.normalize_sort_requirements(&sort_reqs);
// Convert sort requirements back to sort expressions:
LexOrdering::from(normalized_sort_reqs)
}

/// Normalizes every ordering in `sort_exprs` by passing each one through
/// `normalize_sort_exprs`, returning the results in the same order.
fn normalize_orderings(&self, sort_exprs: Vec<LexOrdering>) -> Vec<LexOrdering> {
    let mut normalized = Vec::with_capacity(sort_exprs.len());
    for ordering in sort_exprs {
        normalized.push(self.normalize_sort_exprs(ordering));
    }
    normalized
}

/// Normalizes the given sort requirements (i.e. `sort_reqs`) using the
/// equivalence group and the ordering equivalence class within. It works by:
/// - Removing expressions that have a constant value from the given requirement.
Expand Down Expand Up @@ -986,10 +1020,12 @@ impl EquivalenceProperties {
let projected_orderings = self.projected_orderings(projection_mapping);
Self {
eq_group: projected_eq_group,
oeq_class: OrderingEquivalenceClass::new(projected_orderings),
oeq_class: OrderingEquivalenceClass::new(projected_orderings.clone()),
normalized_oeq_class: OrderingEquivalenceClass::new(projected_orderings),
constants: projected_constants,
schema: output_schema,
}
.with_renormalize_oeq_class()
}

/// Returns the longest (potentially partial) permutation satisfying the
Expand Down Expand Up @@ -2016,8 +2052,16 @@ fn calculate_union_binary(
// Next, calculate valid orderings for the union by searching for prefixes
// in both sides.
let mut orderings = UnionEquivalentOrderingBuilder::new();
orderings.add_satisfied_orderings(lhs.normalized_oeq_class(), lhs.constants(), &rhs);
orderings.add_satisfied_orderings(rhs.normalized_oeq_class(), rhs.constants(), &lhs);
orderings.add_satisfied_orderings(
lhs.normalized_oeq_class().clone(),
lhs.constants(),
&rhs,
);
orderings.add_satisfied_orderings(
rhs.normalized_oeq_class().clone(),
rhs.constants(),
&lhs,
);
let orderings = orderings.build();

let mut eq_properties =
Expand Down

0 comments on commit a8182fc

Please sign in to comment.