Skip to content

Commit

Permalink
Update for API changes
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb committed Nov 1, 2024
1 parent c51f96b commit 50ded20
Show file tree
Hide file tree
Showing 17 changed files with 80 additions and 88 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1418,7 +1418,6 @@ pub(crate) mod tests {
use datafusion_physical_expr::expressions::{BinaryExpr, Literal};
use datafusion_physical_expr::{
expressions::binary, expressions::lit, LexOrdering, PhysicalSortExpr,
PhysicalSortRequirement,
};
use datafusion_physical_expr_common::sort_expr::LexRequirement;
use datafusion_physical_plan::PlanProperties;
Expand Down Expand Up @@ -1491,9 +1490,7 @@ pub(crate) mod tests {
if self.expr.is_empty() {
vec![None]
} else {
vec![Some(PhysicalSortRequirement::from_sort_exprs(
self.expr.iter(),
))]
vec![Some(LexRequirement::from(self.expr.clone()))]
}
}

Expand Down
10 changes: 6 additions & 4 deletions datafusion/core/src/physical_optimizer/enforce_sorting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,10 @@ use crate::physical_plan::{Distribution, ExecutionPlan, InputOrderMode};

use datafusion_common::plan_err;
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion_physical_expr::{Partitioning, PhysicalSortRequirement};
use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexOrderingRef};
use datafusion_physical_expr::Partitioning;
use datafusion_physical_expr_common::sort_expr::{
LexOrdering, LexOrderingRef, LexRequirement,
};
use datafusion_physical_optimizer::PhysicalOptimizerRule;
use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
use datafusion_physical_plan::repartition::RepartitionExec;
Expand Down Expand Up @@ -221,7 +223,7 @@ fn replace_with_partial_sort(
// here we're trying to find the common prefix for sorted columns that is required for the
// sort and already satisfied by the given ordering
let child_eq_properties = child.equivalence_properties();
let sort_req = PhysicalSortRequirement::from_sort_exprs(sort_plan.expr());
let sort_req = LexRequirement::from(sort_plan.expr());

let mut common_prefix_length = 0;
while child_eq_properties
Expand Down Expand Up @@ -275,7 +277,7 @@ fn parallelize_sorts(
{
// Take the initial sort expressions and requirements
let (sort_exprs, fetch) = get_sort_exprs(&requirements.plan)?;
let sort_reqs = PhysicalSortRequirement::from_sort_exprs(sort_exprs);
let sort_reqs = LexRequirement::from(sort_exprs);
let sort_exprs = LexOrdering::new(sort_exprs.to_vec());

// If there is a connection between a `CoalescePartitionsExec` and a
Expand Down
23 changes: 9 additions & 14 deletions datafusion/core/src/physical_optimizer/sort_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ fn pushdown_sorts_helper(
if is_sort(plan) {
let required_ordering = plan
.output_ordering()
.map(PhysicalSortRequirement::from_sort_exprs)
.map(LexRequirement::from)
.unwrap_or_default();
if !satisfy_parent {
// Make sure this `SortExec` satisfies parent requirements:
Expand Down Expand Up @@ -181,9 +181,8 @@ fn pushdown_requirement_to_children(
RequirementsCompatibility::NonCompatible => Ok(None),
}
} else if let Some(sort_exec) = plan.as_any().downcast_ref::<SortExec>() {
let sort_req = PhysicalSortRequirement::from_sort_exprs(
sort_exec.properties().output_ordering().unwrap_or(&[]),
);
let sort_req =
LexRequirement::from(sort_exec.properties().output_ordering().unwrap_or(&[]));
if sort_exec
.properties()
.eq_properties
Expand All @@ -203,9 +202,8 @@ fn pushdown_requirement_to_children(
.iter()
.all(|maintain| *maintain)
{
let output_req = PhysicalSortRequirement::from_sort_exprs(
plan.properties().output_ordering().unwrap_or(&[]),
);
let output_req =
LexRequirement::from(plan.properties().output_ordering().unwrap_or(&[]));
// Push down through operator with fetch when:
// - requirement is aligned with output ordering
// - it preserves ordering during execution
Expand All @@ -229,8 +227,7 @@ fn pushdown_requirement_to_children(
} else if let Some(smj) = plan.as_any().downcast_ref::<SortMergeJoinExec>() {
// If the current plan is SortMergeJoinExec
let left_columns_len = smj.left().schema().fields().len();
let parent_required_expr =
PhysicalSortRequirement::to_sort_exprs(parent_required.iter().cloned());
let parent_required_expr = LexOrdering::from(parent_required);
match expr_source_side(&parent_required_expr, smj.join_type(), left_columns_len) {
Some(JoinSide::Left) => try_pushdown_requirements_to_join(
smj,
Expand All @@ -243,8 +240,7 @@ fn pushdown_requirement_to_children(
smj.schema().fields.len() - smj.right().schema().fields.len();
let new_right_required =
shift_right_required(parent_required, right_offset)?;
let new_right_required_expr =
PhysicalSortRequirement::to_sort_exprs(new_right_required);
let new_right_required_expr = LexOrdering::from(new_right_required);
try_pushdown_requirements_to_join(
smj,
parent_required,
Expand All @@ -270,8 +266,7 @@ fn pushdown_requirement_to_children(
// Pushing down is not beneficial
Ok(None)
} else if is_sort_preserving_merge(plan) {
let new_ordering =
PhysicalSortRequirement::to_sort_exprs(parent_required.to_vec());
let new_ordering = LexOrdering::from(parent_required);
let mut spm_eqs = plan.equivalence_properties().clone();
// Sort preserving merge will have new ordering, one requirement above is pushed down to its below.
spm_eqs = spm_eqs.with_reorder(new_ordering);
Expand Down Expand Up @@ -403,7 +398,7 @@ fn try_pushdown_requirements_to_join(
let should_pushdown = smj_eqs.ordering_satisfy_requirement(parent_required);
Ok(should_pushdown.then(|| {
let mut required_input_ordering = smj.required_input_ordering();
let new_req = Some(PhysicalSortRequirement::from_sort_exprs(sort_expr));
let new_req = Some(LexRequirement::from(sort_expr));
match push_side {
JoinSide::Left => {
required_input_ordering[0] = new_req;
Expand Down
8 changes: 2 additions & 6 deletions datafusion/core/src/physical_optimizer/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,7 @@ use datafusion_physical_plan::{

use async_trait::async_trait;
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
use datafusion_physical_expr_common::sort_expr::{
LexOrdering, LexRequirement, PhysicalSortRequirement,
};
use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexRequirement};

async fn register_current_csv(
ctx: &SessionContext,
Expand Down Expand Up @@ -419,9 +417,7 @@ impl ExecutionPlan for RequirementsTestExec {
}

fn required_input_ordering(&self) -> Vec<Option<LexRequirement>> {
let requirement = PhysicalSortRequirement::from_sort_exprs(
self.required_input_ordering.as_ref().iter(),
);
let requirement = LexRequirement::from(self.required_input_ordering.clone());
vec![Some(requirement)]
}

Expand Down
9 changes: 4 additions & 5 deletions datafusion/core/src/physical_optimizer/update_aggr_exprs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use datafusion_physical_expr::aggregate::AggregateFunctionExpr;
use datafusion_physical_expr::{
reverse_order_bys, EquivalenceProperties, PhysicalSortRequirement,
};
use datafusion_physical_expr_common::sort_expr::LexRequirement;
use datafusion_physical_optimizer::PhysicalOptimizerRule;
use datafusion_physical_plan::aggregates::concat_slices;
use datafusion_physical_plan::windows::get_ordered_partition_by_indices;
Expand Down Expand Up @@ -138,12 +139,10 @@ fn try_convert_aggregate_if_better(
aggr_exprs
.into_iter()
.map(|aggr_expr| {
let aggr_sort_exprs = &aggr_expr.order_bys().unwrap_or_default();
let aggr_sort_exprs = aggr_expr.order_bys().unwrap_or_default();
let reverse_aggr_sort_exprs = reverse_order_bys(aggr_sort_exprs);
let aggr_sort_reqs =
PhysicalSortRequirement::from_sort_exprs(aggr_sort_exprs.iter());
let reverse_aggr_req =
PhysicalSortRequirement::from_sort_exprs(&reverse_aggr_sort_exprs.inner);
let aggr_sort_reqs = LexRequirement::from(aggr_sort_exprs);
let reverse_aggr_req = LexRequirement::from(reverse_aggr_sort_exprs);

// If the aggregate expression benefits from input ordering, and
// there is an actual ordering enabling this, try to update the
Expand Down
5 changes: 3 additions & 2 deletions datafusion/core/src/physical_optimizer/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ use crate::physical_plan::union::UnionExec;
use crate::physical_plan::windows::{BoundedWindowAggExec, WindowAggExec};
use crate::physical_plan::{ExecutionPlan, ExecutionPlanProperties};

use datafusion_physical_expr::{LexRequirement, PhysicalSortRequirement};
use datafusion_physical_expr::LexRequirement;
use datafusion_physical_expr_common::sort_expr::LexOrdering;
use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
use datafusion_physical_plan::tree_node::PlanContext;

Expand All @@ -38,7 +39,7 @@ pub fn add_sort_above<T: Clone + Default>(
sort_requirements: LexRequirement,
fetch: Option<usize>,
) -> PlanContext<T> {
let mut sort_expr = PhysicalSortRequirement::to_sort_exprs(sort_requirements);
let mut sort_expr = LexOrdering::from(sort_requirements);
sort_expr.inner.retain(|sort_expr| {
!node
.plan
Expand Down
22 changes: 22 additions & 0 deletions datafusion/physical-expr-common/src/sort_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,14 @@ impl LexOrdering {
}
}

// TODO Remove when https://github.com/apache/datafusion/issues/13220 is
// done. This From clones things non obviously
impl From<LexRequirementRef<'_>> for LexOrdering {
fn from(value: LexRequirementRef) -> Self {
Self::new(value.iter().cloned().map(PhysicalSortExpr::from).collect())
}
}

impl From<Vec<PhysicalSortExpr>> for LexOrdering {
fn from(value: Vec<PhysicalSortExpr>) -> Self {
Self::new(value)
Expand Down Expand Up @@ -547,6 +555,20 @@ impl From<LexOrdering> for LexRequirement {
}
}

// TODO Remove when https://github.com/apache/datafusion/issues/13220 is
// done. This impl clones things non obviously
impl From<LexOrderingRef<'_>> for LexRequirement {
fn from(value: LexOrderingRef) -> Self {
Self::new(
value
.iter()
.cloned()
.map(PhysicalSortRequirement::from)
.collect(),
)
}
}

impl From<Vec<PhysicalSortRequirement>> for LexRequirement {
fn from(value: Vec<PhysicalSortRequirement>) -> Self {
Self::new(value)
Expand Down
4 changes: 2 additions & 2 deletions datafusion/physical-expr/src/equivalence/class.rs
Original file line number Diff line number Diff line change
Expand Up @@ -477,11 +477,11 @@ impl EquivalenceGroup {
/// sort expressions.
pub fn normalize_sort_exprs(&self, sort_exprs: LexOrderingRef) -> LexOrdering {
// Convert sort expressions to sort requirements:
let sort_reqs = PhysicalSortRequirement::from_sort_exprs(sort_exprs.iter());
let sort_reqs = LexRequirement::from(sort_exprs);
// Normalize the requirements:
let normalized_sort_reqs = self.normalize_sort_requirements(&sort_reqs);
// Convert sort requirements back to sort expressions:
PhysicalSortRequirement::to_sort_exprs(normalized_sort_reqs.inner)
LexOrdering::from(normalized_sort_reqs)
}

/// This function applies the `normalize_sort_requirement` function for all
Expand Down
12 changes: 6 additions & 6 deletions datafusion/physical-expr/src/equivalence/properties.rs
Original file line number Diff line number Diff line change
Expand Up @@ -410,11 +410,11 @@ impl EquivalenceProperties {
/// after deduplication.
fn normalize_sort_exprs(&self, sort_exprs: LexOrderingRef) -> LexOrdering {
// Convert sort expressions to sort requirements:
let sort_reqs = PhysicalSortRequirement::from_sort_exprs(sort_exprs.iter());
let sort_reqs = LexRequirement::from(sort_exprs);
// Normalize the requirements:
let normalized_sort_reqs = self.normalize_sort_requirements(&sort_reqs);
// Convert sort requirements back to sort expressions:
PhysicalSortRequirement::to_sort_exprs(normalized_sort_reqs)
LexOrdering::from(normalized_sort_reqs)
}

/// Normalizes the given sort requirements (i.e. `sort_reqs`) using the
Expand Down Expand Up @@ -458,7 +458,7 @@ impl EquivalenceProperties {
/// orderings.
pub fn ordering_satisfy(&self, given: LexOrderingRef) -> bool {
// Convert the given sort expressions to sort requirements:
let sort_requirements = PhysicalSortRequirement::from_sort_exprs(given.iter());
let sort_requirements = LexRequirement::from(given);
self.ordering_satisfy_requirement(&sort_requirements)
}

Expand Down Expand Up @@ -552,11 +552,11 @@ impl EquivalenceProperties {
rhs: LexOrderingRef,
) -> Option<LexOrdering> {
// Convert the given sort expressions to sort requirements:
let lhs = PhysicalSortRequirement::from_sort_exprs(lhs);
let rhs = PhysicalSortRequirement::from_sort_exprs(rhs);
let lhs = LexRequirement::from(lhs);
let rhs = LexRequirement::from(rhs);
let finer = self.get_finer_requirement(&lhs, &rhs);
// Convert the chosen sort requirements back to sort expressions:
finer.map(PhysicalSortRequirement::to_sort_exprs)
finer.map(LexOrdering::from)
}

/// Returns the finer ordering among the requirements `lhs` and `rhs`,
Expand Down
6 changes: 3 additions & 3 deletions datafusion/physical-optimizer/src/output_requirements.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use datafusion_physical_plan::{
use datafusion_common::config::ConfigOptions;
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion_common::{Result, Statistics};
use datafusion_physical_expr::{Distribution, LexRequirement, PhysicalSortRequirement};
use datafusion_physical_expr::{Distribution, LexRequirement};
use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
use datafusion_physical_plan::{ExecutionPlanProperties, PlanProperties};

Expand Down Expand Up @@ -256,13 +256,13 @@ fn require_top_ordering_helper(
// Therefore; we check the sort expression field of the SortExec to assign the requirements.
let req_ordering = sort_exec.expr();
let req_dist = sort_exec.required_input_distribution()[0].clone();
let reqs = PhysicalSortRequirement::from_sort_exprs(req_ordering);
let reqs = LexRequirement::from(req_ordering);
Ok((
Arc::new(OutputRequirementExec::new(plan, Some(reqs), req_dist)) as _,
true,
))
} else if let Some(spm) = plan.as_any().downcast_ref::<SortPreservingMergeExec>() {
let reqs = PhysicalSortRequirement::from_sort_exprs(spm.expr());
let reqs = LexRequirement::from(spm.expr());
Ok((
Arc::new(OutputRequirementExec::new(
plan,
Expand Down
6 changes: 2 additions & 4 deletions datafusion/physical-plan/src/aggregates/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1074,9 +1074,7 @@ pub fn get_finer_aggregate_exprs_requirement(
);
}

Ok(PhysicalSortRequirement::from_sort_exprs(
requirement.inner.iter(),
))
Ok(LexRequirement::from(requirement))
}

/// Returns physical expressions for arguments to evaluate against a batch.
Expand Down Expand Up @@ -2304,7 +2302,7 @@ mod tests {
&eq_properties,
&AggregateMode::Partial,
)?;
let res = PhysicalSortRequirement::to_sort_exprs(res);
let res = LexOrdering::from(res);
assert_eq!(res, common_requirement);
Ok(())
}
Expand Down
10 changes: 3 additions & 7 deletions datafusion/physical-plan/src/joins/sort_merge_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
use datafusion_execution::runtime_env::RuntimeEnv;
use datafusion_execution::TaskContext;
use datafusion_physical_expr::equivalence::join_equivalence_properties;
use datafusion_physical_expr::{PhysicalExprRef, PhysicalSortRequirement};
use datafusion_physical_expr::PhysicalExprRef;
use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexRequirement};
use futures::{Stream, StreamExt};
use hashbrown::HashSet;
Expand Down Expand Up @@ -292,12 +292,8 @@ impl ExecutionPlan for SortMergeJoinExec {

fn required_input_ordering(&self) -> Vec<Option<LexRequirement>> {
vec![
Some(PhysicalSortRequirement::from_sort_exprs(
self.left_sort_exprs.iter(),
)),
Some(PhysicalSortRequirement::from_sort_exprs(
self.right_sort_exprs.iter(),
)),
Some(LexRequirement::from(self.left_sort_exprs.clone())),
Some(LexRequirement::from(self.right_sort_exprs.clone())),
]
}

Expand Down
10 changes: 5 additions & 5 deletions datafusion/physical-plan/src/joins/symmetric_hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ use datafusion_execution::TaskContext;
use datafusion_expr::interval_arithmetic::Interval;
use datafusion_physical_expr::equivalence::join_equivalence_properties;
use datafusion_physical_expr::intervals::cp_solver::ExprIntervalGraph;
use datafusion_physical_expr::{PhysicalExprRef, PhysicalSortRequirement};
use datafusion_physical_expr::PhysicalExprRef;

use ahash::RandomState;
use datafusion_physical_expr_common::sort_expr::{
Expand Down Expand Up @@ -417,12 +417,12 @@ impl ExecutionPlan for SymmetricHashJoinExec {
vec![
self.left_sort_exprs
.as_ref()
.map(LexOrdering::iter)
.map(PhysicalSortRequirement::from_sort_exprs),
.cloned()
.map(LexRequirement::from),
self.right_sort_exprs
.as_ref()
.map(LexOrdering::iter)
.map(PhysicalSortRequirement::from_sort_exprs),
.cloned()
.map(LexRequirement::from),
]
}

Expand Down
Loading

0 comments on commit 50ded20

Please sign in to comment.