Skip to content

Commit

Permalink
fix: issue apache#8838 discard extra sort when sorted element is wrap…
Browse files Browse the repository at this point in the history
…ped (apache#9127)

* fix: issue apache#8838 discard extra sort when sorted element is wrapped

fix: issue apache#8838 discard extra sort when sorted element is wrapped

fix: issue apache#8838 discard extra sort when sorted element is wrapped

* fix bugs

* fix bugs

* fix bugs

* fix:bugs

* adding tests

* adding cast UTF8 type and diable scalarfunction situation

* fix typo
  • Loading branch information
Lordworms authored Feb 20, 2024
1 parent fc84a63 commit 453a45a
Show file tree
Hide file tree
Showing 4 changed files with 209 additions and 16 deletions.
93 changes: 87 additions & 6 deletions datafusion/physical-expr/src/equivalence/properties.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@
// specific language governing permissions and limitations
// under the License.

use crate::expressions::CastExpr;
use arrow_schema::SchemaRef;
use datafusion_common::{JoinSide, JoinType};
use indexmap::IndexSet;
use itertools::Itertools;
use std::collections::{HashMap, HashSet};
use std::hash::{Hash, Hasher};
use std::sync::Arc;
Expand All @@ -31,12 +36,8 @@ use crate::{
PhysicalSortRequirement,
};

use arrow_schema::{SchemaRef, SortOptions};
use arrow_schema::SortOptions;
use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_common::{JoinSide, JoinType};

use indexmap::IndexSet;
use itertools::Itertools;

/// A `EquivalenceProperties` object stores useful information related to a schema.
/// Currently, it keeps track of:
Expand Down Expand Up @@ -426,6 +427,87 @@ impl EquivalenceProperties {
(!meet.is_empty()).then_some(meet)
}

/// we substitute the ordering according to input expression type, this is a simplified version
/// In this case, we just substitute when the expression satisfy the following confition
/// I. just have one column and is a CAST expression
/// II. just have one parameter and is a ScalarFUnctionexpression and it is monotonic
/// TODO: we could precompute all the senario that is computable, for example: atan(x + 1000) should also be substituted if
/// x is DESC or ASC
pub fn substitute_ordering_component(
matching_exprs: Arc<Vec<&Arc<dyn PhysicalExpr>>>,
sort_expr: &[PhysicalSortExpr],
schema: SchemaRef,
) -> Vec<PhysicalSortExpr> {
sort_expr
.iter()
.filter(|sort_expr| {
matching_exprs.iter().any(|matched| !matched.eq(*sort_expr))
})
.map(|sort_expr| {
let referring_exprs: Vec<_> = matching_exprs
.iter()
.filter(|matched| expr_refers(matched, &sort_expr.expr))
.cloned()
.collect();
// does not referring to any matching component, we just skip it

if referring_exprs.len() == 1 {
// we check whether this expression is substitutable or not
let r_expr = referring_exprs[0].clone();
if let Some(cast_expr) = r_expr.as_any().downcast_ref::<CastExpr>() {
// we need to know whether the Cast Expr matches or not
let expr_type =
sort_expr.expr.data_type(schema.as_ref()).unwrap();
if cast_expr.expr.eq(&sort_expr.expr)
&& cast_expr.is_bigger_cast(expr_type)
{
PhysicalSortExpr {
expr: r_expr.clone(),
options: sort_expr.options,
}
} else {
sort_expr.clone()
}
} else {
sort_expr.clone()
}
} else {
sort_expr.clone()
}
})
.collect()
}
/// In projection, supposed we have a input function 'A DESC B DESC' and the output shares the same expression
/// with A and B, we could surely use the ordering of the original ordering, However, if the A has been changed,
/// for example, A-> Cast(A, Int64) or any other form, it is invalid if we continue using the original ordering
/// Since it would cause bug in dependency constructions, we should substitute the input order in order to get correct
/// dependency map, happen in issue 8838: <https://github.com/apache/arrow-datafusion/issues/8838>
pub fn substitute_oeq_class(
&mut self,
exprs: &[(Arc<dyn PhysicalExpr>, String)],
mapping: &ProjectionMapping,
schema: SchemaRef,
) {
let matching_exprs: Arc<Vec<_>> = Arc::new(
exprs
.iter()
.filter(|(expr, _)| mapping.iter().any(|(source, _)| source.eq(expr)))
.map(|(source, _)| source)
.collect(),
);
let orderings = std::mem::take(&mut self.oeq_class.orderings);
let new_order = orderings
.into_iter()
.map(move |order| {
Self::substitute_ordering_component(
matching_exprs.clone(),
&order,
schema.clone(),
)
})
.collect();
self.oeq_class = OrderingEquivalenceClass::new(new_order);
}
/// Projects argument `expr` according to `projection_mapping`, taking
/// equivalences into account.
///
Expand Down Expand Up @@ -564,7 +646,6 @@ impl EquivalenceProperties {

// Get dependency map for existing orderings:
let dependency_map = self.construct_dependency_map(&mapping);

let orderings = mapping.iter().flat_map(|(source, target)| {
referred_dependencies(&dependency_map, source)
.into_iter()
Expand Down
30 changes: 25 additions & 5 deletions datafusion/physical-expr/src/expressions/cast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,14 @@
// specific language governing permissions and limitations
// under the License.

use crate::physical_expr::down_cast_any_ref;
use crate::sort_properties::SortProperties;
use crate::PhysicalExpr;
use std::any::Any;
use std::fmt;
use std::hash::{Hash, Hasher};
use std::sync::Arc;

use crate::physical_expr::down_cast_any_ref;
use crate::sort_properties::SortProperties;
use crate::PhysicalExpr;
use DataType::*;

use arrow::compute::{can_cast_types, kernels, CastOptions};
use arrow::datatypes::{DataType, Schema};
Expand All @@ -41,7 +41,7 @@ const DEFAULT_CAST_OPTIONS: CastOptions<'static> = CastOptions {
#[derive(Debug, Clone)]
pub struct CastExpr {
/// The expression to cast
expr: Arc<dyn PhysicalExpr>,
pub expr: Arc<dyn PhysicalExpr>,
/// The data type to cast to
cast_type: DataType,
/// Cast options
Expand Down Expand Up @@ -76,6 +76,26 @@ impl CastExpr {
pub fn cast_options(&self) -> &CastOptions<'static> {
&self.cast_options
}
pub fn is_bigger_cast(&self, src: DataType) -> bool {
if src == self.cast_type {
return true;
}
matches!(
(src, &self.cast_type),
(Int8, Int16 | Int32 | Int64)
| (Int16, Int32 | Int64)
| (Int32, Int64)
| (UInt8, UInt16 | UInt32 | UInt64)
| (UInt16, UInt32 | UInt64)
| (UInt32, UInt64)
| (
Int8 | Int16 | Int32 | UInt8 | UInt16 | UInt32,
Float32 | Float64
)
| (Int64 | UInt64, Float64)
| (Utf8, LargeUtf8)
)
}
}

impl fmt::Display for CastExpr {
Expand Down
16 changes: 11 additions & 5 deletions datafusion/physical-plan/src/projection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ impl ProjectionExec {
input: Arc<dyn ExecutionPlan>,
) -> Result<Self> {
let input_schema = input.schema();

let fields: Result<Vec<Field>> = expr
.iter()
.map(|(e, name)| {
Expand All @@ -95,7 +94,10 @@ impl ProjectionExec {
// construct a map from the input expressions to the output expression of the Projection
let projection_mapping = ProjectionMapping::try_new(&expr, &input_schema)?;

let input_eqs = input.equivalence_properties();
let mut input_eqs = input.equivalence_properties();

input_eqs.substitute_oeq_class(&expr, &projection_mapping, input_schema.clone());

let project_eqs = input_eqs.project(&projection_mapping, schema.clone());
let output_ordering = project_eqs.oeq_class().output_ordering();

Expand Down Expand Up @@ -201,9 +203,13 @@ impl ExecutionPlan for ProjectionExec {
}

fn equivalence_properties(&self) -> EquivalenceProperties {
self.input
.equivalence_properties()
.project(&self.projection_mapping, self.schema())
let mut equi_properties = self.input.equivalence_properties();
equi_properties.substitute_oeq_class(
&self.expr,
&self.projection_mapping,
self.input.schema().clone(),
);
equi_properties.project(&self.projection_mapping, self.schema())
}

fn with_new_children(
Expand Down
86 changes: 86 additions & 0 deletions datafusion/sqllogictest/test_files/monotonic_projection_test.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at

# http://www.apache.org/licenses/LICENSE-2.0

# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

# prepare the table
statement ok
CREATE EXTERNAL TABLE delta_encoding_required_column (
c_customer_sk INT NOT NULL,
c_current_cdemo_sk INT NOT NULL
)
STORED AS CSV
WITH ORDER (
c_customer_sk DESC,
c_current_cdemo_sk DESC
)
LOCATION '../../testing/data/csv/aggregate_test_100.csv';

# test for substitute CAST senario
query TT
EXPLAIN
SELECT
CAST(c_customer_sk AS BIGINT) AS c_customer_sk_big,
c_current_cdemo_sk
FROM delta_encoding_required_column
ORDER BY c_customer_sk_big DESC, c_current_cdemo_sk DESC;
----
logical_plan
Sort: c_customer_sk_big DESC NULLS FIRST, delta_encoding_required_column.c_current_cdemo_sk DESC NULLS FIRST
--Projection: CAST(delta_encoding_required_column.c_customer_sk AS Int64) AS c_customer_sk_big, delta_encoding_required_column.c_current_cdemo_sk
----TableScan: delta_encoding_required_column projection=[c_customer_sk, c_current_cdemo_sk]
physical_plan
SortPreservingMergeExec: [c_customer_sk_big@0 DESC,c_current_cdemo_sk@1 DESC]
--ProjectionExec: expr=[CAST(c_customer_sk@0 AS Int64) as c_customer_sk_big, c_current_cdemo_sk@1 as c_current_cdemo_sk]
----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c_customer_sk, c_current_cdemo_sk], output_ordering=[c_customer_sk@0 DESC, c_current_cdemo_sk@1 DESC], has_header=false

# test for commom rename
query TT
EXPLAIN
SELECT
c_customer_sk AS c_customer_sk_big,
c_current_cdemo_sk
FROM delta_encoding_required_column
ORDER BY c_customer_sk_big DESC, c_current_cdemo_sk DESC;
----
logical_plan
Sort: c_customer_sk_big DESC NULLS FIRST, delta_encoding_required_column.c_current_cdemo_sk DESC NULLS FIRST
--Projection: delta_encoding_required_column.c_customer_sk AS c_customer_sk_big, delta_encoding_required_column.c_current_cdemo_sk
----TableScan: delta_encoding_required_column projection=[c_customer_sk, c_current_cdemo_sk]
physical_plan
ProjectionExec: expr=[c_customer_sk@0 as c_customer_sk_big, c_current_cdemo_sk@1 as c_current_cdemo_sk]
--CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c_customer_sk, c_current_cdemo_sk], output_ordering=[c_customer_sk@0 DESC, c_current_cdemo_sk@1 DESC], has_header=false


# test for cast Utf8
query TT
EXPLAIN
SELECT
CAST(c_customer_sk AS STRING) AS c_customer_sk_big,
c_current_cdemo_sk
FROM delta_encoding_required_column
ORDER BY c_customer_sk_big DESC, c_current_cdemo_sk DESC;
----
logical_plan
Sort: c_customer_sk_big DESC NULLS FIRST, delta_encoding_required_column.c_current_cdemo_sk DESC NULLS FIRST
--Projection: CAST(delta_encoding_required_column.c_customer_sk AS Utf8) AS c_customer_sk_big, delta_encoding_required_column.c_current_cdemo_sk
----TableScan: delta_encoding_required_column projection=[c_customer_sk, c_current_cdemo_sk]
physical_plan
SortPreservingMergeExec: [c_customer_sk_big@0 DESC,c_current_cdemo_sk@1 DESC]
--SortExec: expr=[c_customer_sk_big@0 DESC,c_current_cdemo_sk@1 DESC]
----ProjectionExec: expr=[CAST(c_customer_sk@0 AS Utf8) as c_customer_sk_big, c_current_cdemo_sk@1 as c_current_cdemo_sk]
------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
--------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c_customer_sk, c_current_cdemo_sk], output_ordering=[c_customer_sk@0 DESC, c_current_cdemo_sk@1 DESC], has_header=false

0 comments on commit 453a45a

Please sign in to comment.