Skip to content

Commit

Permalink
Merge remote-tracking branch 'apache/main' into alamb/encapsulate_oeq
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb committed Jan 8, 2025
2 parents 500998c + b0bd899 commit badfdd9
Show file tree
Hide file tree
Showing 14 changed files with 230 additions and 58 deletions.
9 changes: 4 additions & 5 deletions datafusion/core/src/dataframe/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,6 @@
#[cfg(feature = "parquet")]
mod parquet;

use std::any::Any;
use std::borrow::Cow;
use std::collections::HashMap;
use std::sync::Arc;

use crate::arrow::record_batch::RecordBatch;
use crate::arrow::util::pretty;
use crate::datasource::file_format::csv::CsvFormatFactory;
Expand All @@ -43,6 +38,10 @@ use crate::physical_plan::{
ExecutionPlan, SendableRecordBatchStream,
};
use crate::prelude::SessionContext;
use std::any::Any;
use std::borrow::Cow;
use std::collections::HashMap;
use std::sync::Arc;

use arrow::array::{Array, ArrayRef, Int64Array, StringArray};
use arrow::compute::{cast, concat};
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/physical_optimizer/sanity_checker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ pub fn check_plan_sanity(
plan_str,
format_physical_sort_requirement_list(&sort_req),
idx,
child_eq_props.oeq_class
child_eq_props.oeq_class()
);
}
}
Expand Down
8 changes: 4 additions & 4 deletions datafusion/core/tests/fuzz_cases/equivalence/ordering.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@ fn test_ordering_satisfy_with_equivalence_random() -> Result<()> {
table_data_with_properties.clone(),
)?;
let err_msg = format!(
"Error in test case requirement:{:?}, expected: {:?}, eq_properties.oeq_class: {:?}, eq_properties.eq_group: {:?}, eq_properties.constants: {:?}",
requirement, expected, eq_properties.oeq_class, eq_properties.eq_group, eq_properties.constants
"Error in test case requirement:{:?}, expected: {:?}, eq_properties {}",
requirement, expected, eq_properties
);
// Check whether ordering_satisfy API result and
// experimental result matches.
Expand Down Expand Up @@ -141,8 +141,8 @@ fn test_ordering_satisfy_with_equivalence_complex_random() -> Result<()> {
table_data_with_properties.clone(),
)?;
let err_msg = format!(
"Error in test case requirement:{:?}, expected: {:?}, eq_properties.oeq_class: {:?}, eq_properties.eq_group: {:?}, eq_properties.constants: {:?}",
requirement, expected, eq_properties.oeq_class, eq_properties.eq_group, eq_properties.constants
"Error in test case requirement:{:?}, expected: {:?}, eq_properties: {}",
requirement, expected, eq_properties,
);
// Check whether ordering_satisfy API result and
// experimental result matches.
Expand Down
8 changes: 4 additions & 4 deletions datafusion/core/tests/fuzz_cases/equivalence/projection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,8 @@ fn project_orderings_random() -> Result<()> {
// Make sure each ordering after projection is valid.
for ordering in projected_eq.oeq_class().iter() {
let err_msg = format!(
"Error in test case ordering:{:?}, eq_properties.oeq_class: {:?}, eq_properties.eq_group: {:?}, eq_properties.constants: {:?}, proj_exprs: {:?}",
ordering, eq_properties.oeq_class, eq_properties.eq_group, eq_properties.constants, proj_exprs
"Error in test case ordering:{:?}, eq_properties {}, proj_exprs: {:?}",
ordering, eq_properties, proj_exprs,
);
// Since ordered section satisfies schema, we expect
// that result will be same after sort (e.g sort was unnecessary).
Expand Down Expand Up @@ -179,8 +179,8 @@ fn ordering_satisfy_after_projection_random() -> Result<()> {
projected_batch.clone(),
)?;
let err_msg = format!(
"Error in test case requirement:{:?}, expected: {:?}, eq_properties.oeq_class: {:?}, eq_properties.eq_group: {:?}, eq_properties.constants: {:?}, projected_eq.oeq_class: {:?}, projected_eq.eq_group: {:?}, projected_eq.constants: {:?}, projection_mapping: {:?}",
requirement, expected, eq_properties.oeq_class, eq_properties.eq_group, eq_properties.constants, projected_eq.oeq_class, projected_eq.eq_group, projected_eq.constants, projection_mapping
"Error in test case requirement:{:?}, expected: {:?}, eq_properties: {}, projected_eq: {}, projection_mapping: {:?}",
requirement, expected, eq_properties, projected_eq, projection_mapping
);
// Check whether ordering_satisfy API result and
// experimental result matches.
Expand Down
4 changes: 2 additions & 2 deletions datafusion/core/tests/fuzz_cases/equivalence/properties.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,8 @@ fn test_find_longest_permutation_random() -> Result<()> {
);

let err_msg = format!(
"Error in test case ordering:{:?}, eq_properties.oeq_class: {:?}, eq_properties.eq_group: {:?}, eq_properties.constants: {:?}",
ordering, eq_properties.oeq_class, eq_properties.eq_group, eq_properties.constants
"Error in test case ordering:{:?}, eq_properties: {}",
ordering, eq_properties
);
assert_eq!(ordering.len(), indices.len(), "{}", err_msg);
// Since ordered section satisfies schema, we expect
Expand Down
12 changes: 6 additions & 6 deletions datafusion/core/tests/fuzz_cases/equivalence/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ fn add_equal_conditions_test() -> Result<()> {
// This new entry is redundant, size shouldn't increase
eq_properties.add_equal_conditions(&col_b_expr, &col_a_expr)?;
assert_eq!(eq_properties.eq_group().len(), 1);
let eq_groups = &eq_properties.eq_group().classes[0];
let eq_groups = eq_properties.eq_group().iter().next().unwrap();
assert_eq!(eq_groups.len(), 2);
assert!(eq_groups.contains(&col_a_expr));
assert!(eq_groups.contains(&col_b_expr));
Expand All @@ -188,7 +188,7 @@ fn add_equal_conditions_test() -> Result<()> {
// however there shouldn't be any new equivalence class
eq_properties.add_equal_conditions(&col_b_expr, &col_c_expr)?;
assert_eq!(eq_properties.eq_group().len(), 1);
let eq_groups = &eq_properties.eq_group().classes[0];
let eq_groups = eq_properties.eq_group().iter().next().unwrap();
assert_eq!(eq_groups.len(), 3);
assert!(eq_groups.contains(&col_a_expr));
assert!(eq_groups.contains(&col_b_expr));
Expand All @@ -202,7 +202,7 @@ fn add_equal_conditions_test() -> Result<()> {
// Hence equivalent class count should decrease from 2 to 1.
eq_properties.add_equal_conditions(&col_x_expr, &col_a_expr)?;
assert_eq!(eq_properties.eq_group().len(), 1);
let eq_groups = &eq_properties.eq_group().classes[0];
let eq_groups = eq_properties.eq_group().iter().next().unwrap();
assert_eq!(eq_groups.len(), 5);
assert!(eq_groups.contains(&col_a_expr));
assert!(eq_groups.contains(&col_b_expr));
Expand Down Expand Up @@ -373,7 +373,7 @@ pub fn generate_table_for_eq_properties(
};

// Fill constant columns
for constant in &eq_properties.constants {
for constant in eq_properties.constants() {
let col = constant.expr().as_any().downcast_ref::<Column>().unwrap();
let (idx, _field) = schema.column_with_name(col.name()).unwrap();
let arr =
Expand All @@ -382,7 +382,7 @@ pub fn generate_table_for_eq_properties(
}

// Fill columns based on ordering equivalences
for ordering in eq_properties.oeq_class.iter() {
for ordering in eq_properties.oeq_class().iter() {
let (sort_columns, indices): (Vec<_>, Vec<_>) = ordering
.iter()
.map(|PhysicalSortExpr { expr, options }| {
Expand All @@ -406,7 +406,7 @@ pub fn generate_table_for_eq_properties(
}

// Fill columns based on equivalence groups
for eq_group in eq_properties.eq_group.iter() {
for eq_group in eq_properties.eq_group().iter() {
let representative_array =
get_representative_arr(eq_group, &schema_vec, Arc::clone(schema))
.unwrap_or_else(|| generate_random_array(n_elem, n_distinct));
Expand Down
25 changes: 19 additions & 6 deletions datafusion/physical-expr/src/equivalence/class.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@ use crate::{
expressions::Column, LexOrdering, LexRequirement, PhysicalExpr, PhysicalExprRef,
PhysicalSortExpr, PhysicalSortRequirement,
};
use std::fmt::Display;
use std::sync::Arc;

use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion_common::{JoinType, ScalarValue};
use datafusion_physical_expr_common::physical_expr::format_physical_expr_list;
use std::fmt::Display;
use std::sync::Arc;
use std::vec::IntoIter;

use indexmap::{IndexMap, IndexSet};

Expand Down Expand Up @@ -323,11 +323,10 @@ impl Display for EquivalenceClass {
}
}

/// An `EquivalenceGroup` is a collection of `EquivalenceClass`es where each
/// class represents a distinct equivalence class in a relation.
/// A collection of distinct `EquivalenceClass`es
#[derive(Debug, Clone)]
pub struct EquivalenceGroup {
pub classes: Vec<EquivalenceClass>,
classes: Vec<EquivalenceClass>,
}

impl EquivalenceGroup {
Expand Down Expand Up @@ -717,6 +716,20 @@ impl EquivalenceGroup {
.zip(right_children)
.all(|(left_child, right_child)| self.exprs_equal(left_child, right_child))
}

/// Return the inner classes of this equivalence group.
pub fn into_inner(self) -> Vec<EquivalenceClass> {
self.classes
}
}

impl IntoIterator for EquivalenceGroup {
type Item = EquivalenceClass;
type IntoIter = IntoIter<EquivalenceClass>;

fn into_iter(self) -> Self::IntoIter {
self.classes.into_iter()
}
}

impl Display for EquivalenceGroup {
Expand Down
6 changes: 3 additions & 3 deletions datafusion/physical-expr/src/equivalence/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ mod tests {
// This new entry is redundant, size shouldn't increase
eq_properties.add_equal_conditions(&col_b_expr, &col_a_expr)?;
assert_eq!(eq_properties.eq_group().len(), 1);
let eq_groups = &eq_properties.eq_group().classes[0];
let eq_groups = eq_properties.eq_group().iter().next().unwrap();
assert_eq!(eq_groups.len(), 2);
assert!(eq_groups.contains(&col_a_expr));
assert!(eq_groups.contains(&col_b_expr));
Expand All @@ -263,7 +263,7 @@ mod tests {
// however there shouldn't be any new equivalence class
eq_properties.add_equal_conditions(&col_b_expr, &col_c_expr)?;
assert_eq!(eq_properties.eq_group().len(), 1);
let eq_groups = &eq_properties.eq_group().classes[0];
let eq_groups = eq_properties.eq_group().iter().next().unwrap();
assert_eq!(eq_groups.len(), 3);
assert!(eq_groups.contains(&col_a_expr));
assert!(eq_groups.contains(&col_b_expr));
Expand All @@ -277,7 +277,7 @@ mod tests {
// Hence equivalent class count should decrease from 2 to 1.
eq_properties.add_equal_conditions(&col_x_expr, &col_a_expr)?;
assert_eq!(eq_properties.eq_group().len(), 1);
let eq_groups = &eq_properties.eq_group().classes[0];
let eq_groups = eq_properties.eq_group().iter().next().unwrap();
assert_eq!(eq_groups.len(), 5);
assert!(eq_groups.contains(&col_a_expr));
assert!(eq_groups.contains(&col_b_expr));
Expand Down
16 changes: 8 additions & 8 deletions datafusion/physical-expr/src/equivalence/ordering.rs
Original file line number Diff line number Diff line change
Expand Up @@ -305,17 +305,17 @@ mod tests {
},
]);
// finer ordering satisfies, crude ordering should return true
let mut eq_properties_finer =
EquivalenceProperties::new(Arc::clone(&input_schema));
eq_properties_finer
.oeq_class
.add_new_ordering(finer.clone());
let eq_properties_finer = EquivalenceProperties::new_with_orderings(
Arc::clone(&input_schema),
&[finer.clone()],
);
assert!(eq_properties_finer.ordering_satisfy(crude.as_ref()));

// Crude ordering doesn't satisfy finer ordering. should return false
let mut eq_properties_crude =
EquivalenceProperties::new(Arc::clone(&input_schema));
eq_properties_crude.oeq_class.add_new_ordering(crude);
let eq_properties_crude = EquivalenceProperties::new_with_orderings(
Arc::clone(&input_schema),
&[crude.clone()],
);
assert!(!eq_properties_crude.ordering_satisfy(finer.as_ref()));
Ok(())
}
Expand Down
26 changes: 15 additions & 11 deletions datafusion/physical-expr/src/equivalence/properties.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,15 +124,15 @@ use itertools::Itertools;
/// ```
#[derive(Debug, Clone)]
pub struct EquivalenceProperties {
/// Collection of equivalence classes that store expressions with the same
/// value.
pub eq_group: EquivalenceGroup,
/// Equivalent sort expressions for this table.
pub oeq_class: OrderingEquivalenceClass,
/// Expressions whose values are constant throughout the table.
/// Distinct equivalence classes (exprs known to have the same expressions)
eq_group: EquivalenceGroup,
/// Equivalent sort expressions
oeq_class: OrderingEquivalenceClass,
/// Expressions whose values are constant
///
/// TODO: We do not need to track constants separately, they can be tracked
/// inside `eq_groups` as `Literal` expressions.
pub constants: Vec<ConstExpr>,
/// inside `eq_group` as `Literal` expressions.
constants: Vec<ConstExpr>,
/// Schema associated with this object.
schema: SchemaRef,
}
Expand Down Expand Up @@ -168,6 +168,11 @@ impl EquivalenceProperties {
&self.oeq_class
}

/// Return the inner OrderingEquivalenceClass, consuming self
pub fn into_oeq_class(self) -> OrderingEquivalenceClass {
self.oeq_class
}

/// Returns a reference to the equivalence group within.
pub fn eq_group(&self) -> &EquivalenceGroup {
&self.eq_group
Expand Down Expand Up @@ -338,7 +343,6 @@ impl EquivalenceProperties {
let normalized_expr = self.eq_group().normalize_expr(Arc::clone(expr));
let eq_class = self
.eq_group
.classes
.iter()
.find_map(|class| {
class
Expand Down Expand Up @@ -1234,7 +1238,7 @@ impl EquivalenceProperties {

// Rewrite equivalence classes according to the new schema:
let mut eq_classes = vec![];
for eq_class in self.eq_group.classes {
for eq_class in self.eq_group {
let new_eq_exprs = eq_class
.into_vec()
.into_iter()
Expand Down Expand Up @@ -2307,7 +2311,7 @@ mod tests {

// At the output a1=a2=a3=a4
assert_eq!(out_properties.eq_group().len(), 1);
let eq_class = &out_properties.eq_group().classes[0];
let eq_class = out_properties.eq_group().iter().next().unwrap();
assert_eq!(eq_class.len(), 4);
assert!(eq_class.contains(col_a1));
assert!(eq_class.contains(col_a2));
Expand Down
Loading

0 comments on commit badfdd9

Please sign in to comment.