From 453a45aa796fe798987057ef70a184a16d454660 Mon Sep 17 00:00:00 2001 From: Lordworms <48054792+Lordworms@users.noreply.github.com> Date: Tue, 20 Feb 2024 01:47:46 -0600 Subject: [PATCH] fix: issue #8838 discard extra sort when sorted element is wrapped (#9127) * fix: issue #8838 discard extra sort when sorted element is wrapped fix: issue #8838 discard extra sort when sorted element is wrapped fix: issue #8838 discard extra sort when sorted element is wrapped * fix bugs * fix bugs * fix bugs * fix:bugs * adding tests * adding cast UTF8 type and diable scalarfunction situation * fix typo --- .../src/equivalence/properties.rs | 93 +++++++++++++++++-- .../physical-expr/src/expressions/cast.rs | 30 +++++- datafusion/physical-plan/src/projection.rs | 16 +++- .../test_files/monotonic_projection_test.slt | 86 +++++++++++++++++ 4 files changed, 209 insertions(+), 16 deletions(-) create mode 100644 datafusion/sqllogictest/test_files/monotonic_projection_test.slt diff --git a/datafusion/physical-expr/src/equivalence/properties.rs b/datafusion/physical-expr/src/equivalence/properties.rs index 386e74b4a6dc..9e60ffe748b0 100644 --- a/datafusion/physical-expr/src/equivalence/properties.rs +++ b/datafusion/physical-expr/src/equivalence/properties.rs @@ -15,6 +15,11 @@ // specific language governing permissions and limitations // under the License. +use crate::expressions::CastExpr; +use arrow_schema::SchemaRef; +use datafusion_common::{JoinSide, JoinType}; +use indexmap::IndexSet; +use itertools::Itertools; use std::collections::{HashMap, HashSet}; use std::hash::{Hash, Hasher}; use std::sync::Arc; @@ -31,12 +36,8 @@ use crate::{ PhysicalSortRequirement, }; -use arrow_schema::{SchemaRef, SortOptions}; +use arrow_schema::SortOptions; use datafusion_common::tree_node::{Transformed, TreeNode}; -use datafusion_common::{JoinSide, JoinType}; - -use indexmap::IndexSet; -use itertools::Itertools; /// A `EquivalenceProperties` object stores useful information related to a schema. 
/// Currently, it keeps track of: @@ -426,6 +427,87 @@ impl EquivalenceProperties { (!meet.is_empty()).then_some(meet) } + /// We substitute the ordering according to the input expression type; this is a simplified version. + /// In this case, we only substitute when the expression satisfies the following conditions: + /// I. it has just one column and is a CAST expression + /// II. it has just one parameter, is a ScalarFunction expression, and is monotonic + /// TODO: we could precompute all the scenarios that are computable, for example: atan(x + 1000) should also be substituted if + /// x is DESC or ASC + pub fn substitute_ordering_component( + matching_exprs: Arc>>, + sort_expr: &[PhysicalSortExpr], + schema: SchemaRef, + ) -> Vec { + sort_expr + .iter() + .filter(|sort_expr| { + matching_exprs.iter().any(|matched| !matched.eq(*sort_expr)) + }) + .map(|sort_expr| { + let referring_exprs: Vec<_> = matching_exprs + .iter() + .filter(|matched| expr_refers(matched, &sort_expr.expr)) + .cloned() + .collect(); + // if it does not refer to any matching component, we just skip it + + if referring_exprs.len() == 1 { + // we check whether this expression is substitutable or not + let r_expr = referring_exprs[0].clone(); + if let Some(cast_expr) = r_expr.as_any().downcast_ref::() { + // we need to know whether the Cast Expr matches or not + let expr_type = + sort_expr.expr.data_type(schema.as_ref()).unwrap(); + if cast_expr.expr.eq(&sort_expr.expr) + && cast_expr.is_bigger_cast(expr_type) + { + PhysicalSortExpr { + expr: r_expr.clone(), + options: sort_expr.options, + } + } else { + sort_expr.clone() + } + } else { + sort_expr.clone() + } + } else { + sort_expr.clone() + } + }) + .collect() + } + /// In projection, suppose we have an input ordering 'A DESC B DESC' and the output shares the same expression + /// with A and B, we could surely use the ordering of the original ordering. However, if the A has been changed, + /// for example, A-> Cast(A, Int64) or any other form, it is 
invalid if we continue using the original ordering + /// Since it would cause bug in dependency constructions, we should substitute the input order in order to get correct + /// dependency map, happen in issue 8838: + pub fn substitute_oeq_class( + &mut self, + exprs: &[(Arc, String)], + mapping: &ProjectionMapping, + schema: SchemaRef, + ) { + let matching_exprs: Arc> = Arc::new( + exprs + .iter() + .filter(|(expr, _)| mapping.iter().any(|(source, _)| source.eq(expr))) + .map(|(source, _)| source) + .collect(), + ); + let orderings = std::mem::take(&mut self.oeq_class.orderings); + let new_order = orderings + .into_iter() + .map(move |order| { + Self::substitute_ordering_component( + matching_exprs.clone(), + &order, + schema.clone(), + ) + }) + .collect(); + self.oeq_class = OrderingEquivalenceClass::new(new_order); + } /// Projects argument `expr` according to `projection_mapping`, taking /// equivalences into account. /// @@ -564,7 +646,6 @@ impl EquivalenceProperties { // Get dependency map for existing orderings: let dependency_map = self.construct_dependency_map(&mapping); - let orderings = mapping.iter().flat_map(|(source, target)| { referred_dependencies(&dependency_map, source) .into_iter() diff --git a/datafusion/physical-expr/src/expressions/cast.rs b/datafusion/physical-expr/src/expressions/cast.rs index 0c4ed3c12549..b0e175e711fe 100644 --- a/datafusion/physical-expr/src/expressions/cast.rs +++ b/datafusion/physical-expr/src/expressions/cast.rs @@ -15,14 +15,14 @@ // specific language governing permissions and limitations // under the License. 
+use crate::physical_expr::down_cast_any_ref; +use crate::sort_properties::SortProperties; +use crate::PhysicalExpr; use std::any::Any; use std::fmt; use std::hash::{Hash, Hasher}; use std::sync::Arc; - -use crate::physical_expr::down_cast_any_ref; -use crate::sort_properties::SortProperties; -use crate::PhysicalExpr; +use DataType::*; use arrow::compute::{can_cast_types, kernels, CastOptions}; use arrow::datatypes::{DataType, Schema}; @@ -41,7 +41,7 @@ const DEFAULT_CAST_OPTIONS: CastOptions<'static> = CastOptions { #[derive(Debug, Clone)] pub struct CastExpr { /// The expression to cast - expr: Arc, + pub expr: Arc, /// The data type to cast to cast_type: DataType, /// Cast options @@ -76,6 +76,26 @@ impl CastExpr { pub fn cast_options(&self) -> &CastOptions<'static> { &self.cast_options } + pub fn is_bigger_cast(&self, src: DataType) -> bool { + if src == self.cast_type { + return true; + } + matches!( + (src, &self.cast_type), + (Int8, Int16 | Int32 | Int64) + | (Int16, Int32 | Int64) + | (Int32, Int64) + | (UInt8, UInt16 | UInt32 | UInt64) + | (UInt16, UInt32 | UInt64) + | (UInt32, UInt64) + | ( + Int8 | Int16 | Int32 | UInt8 | UInt16 | UInt32, + Float32 | Float64 + ) + | (Int64 | UInt64, Float64) + | (Utf8, LargeUtf8) + ) + } } impl fmt::Display for CastExpr { diff --git a/datafusion/physical-plan/src/projection.rs b/datafusion/physical-plan/src/projection.rs index cc2ab62049ed..51423d37e77c 100644 --- a/datafusion/physical-plan/src/projection.rs +++ b/datafusion/physical-plan/src/projection.rs @@ -70,7 +70,6 @@ impl ProjectionExec { input: Arc, ) -> Result { let input_schema = input.schema(); - let fields: Result> = expr .iter() .map(|(e, name)| { @@ -95,7 +94,10 @@ impl ProjectionExec { // construct a map from the input expressions to the output expression of the Projection let projection_mapping = ProjectionMapping::try_new(&expr, &input_schema)?; - let input_eqs = input.equivalence_properties(); + let mut input_eqs = input.equivalence_properties(); + + 
input_eqs.substitute_oeq_class(&expr, &projection_mapping, input_schema.clone()); + let project_eqs = input_eqs.project(&projection_mapping, schema.clone()); let output_ordering = project_eqs.oeq_class().output_ordering(); @@ -201,9 +203,13 @@ impl ExecutionPlan for ProjectionExec { } fn equivalence_properties(&self) -> EquivalenceProperties { - self.input - .equivalence_properties() - .project(&self.projection_mapping, self.schema()) + let mut equi_properties = self.input.equivalence_properties(); + equi_properties.substitute_oeq_class( + &self.expr, + &self.projection_mapping, + self.input.schema().clone(), + ); + equi_properties.project(&self.projection_mapping, self.schema()) } fn with_new_children( diff --git a/datafusion/sqllogictest/test_files/monotonic_projection_test.slt b/datafusion/sqllogictest/test_files/monotonic_projection_test.slt new file mode 100644 index 000000000000..57b810673358 --- /dev/null +++ b/datafusion/sqllogictest/test_files/monotonic_projection_test.slt @@ -0,0 +1,86 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +# prepare the table +statement ok +CREATE EXTERNAL TABLE delta_encoding_required_column ( + c_customer_sk INT NOT NULL, + c_current_cdemo_sk INT NOT NULL +) +STORED AS CSV +WITH ORDER ( + c_customer_sk DESC, + c_current_cdemo_sk DESC +) +LOCATION '../../testing/data/csv/aggregate_test_100.csv'; + +# test for substitute CAST scenario +query TT +EXPLAIN +SELECT + CAST(c_customer_sk AS BIGINT) AS c_customer_sk_big, + c_current_cdemo_sk +FROM delta_encoding_required_column +ORDER BY c_customer_sk_big DESC, c_current_cdemo_sk DESC; +---- +logical_plan +Sort: c_customer_sk_big DESC NULLS FIRST, delta_encoding_required_column.c_current_cdemo_sk DESC NULLS FIRST +--Projection: CAST(delta_encoding_required_column.c_customer_sk AS Int64) AS c_customer_sk_big, delta_encoding_required_column.c_current_cdemo_sk +----TableScan: delta_encoding_required_column projection=[c_customer_sk, c_current_cdemo_sk] +physical_plan +SortPreservingMergeExec: [c_customer_sk_big@0 DESC,c_current_cdemo_sk@1 DESC] +--ProjectionExec: expr=[CAST(c_customer_sk@0 AS Int64) as c_customer_sk_big, c_current_cdemo_sk@1 as c_current_cdemo_sk] +----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c_customer_sk, c_current_cdemo_sk], output_ordering=[c_customer_sk@0 DESC, c_current_cdemo_sk@1 DESC], has_header=false + +# test for common rename +query TT +EXPLAIN +SELECT + c_customer_sk AS c_customer_sk_big, + c_current_cdemo_sk +FROM delta_encoding_required_column +ORDER BY c_customer_sk_big DESC, c_current_cdemo_sk DESC; +---- +logical_plan +Sort: c_customer_sk_big DESC NULLS FIRST, delta_encoding_required_column.c_current_cdemo_sk DESC NULLS FIRST +--Projection: delta_encoding_required_column.c_customer_sk AS c_customer_sk_big, delta_encoding_required_column.c_current_cdemo_sk +----TableScan: delta_encoding_required_column projection=[c_customer_sk, c_current_cdemo_sk] 
+physical_plan +ProjectionExec: expr=[c_customer_sk@0 as c_customer_sk_big, c_current_cdemo_sk@1 as c_current_cdemo_sk] +--CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c_customer_sk, c_current_cdemo_sk], output_ordering=[c_customer_sk@0 DESC, c_current_cdemo_sk@1 DESC], has_header=false + + +# test for cast Utf8 +query TT +EXPLAIN +SELECT + CAST(c_customer_sk AS STRING) AS c_customer_sk_big, + c_current_cdemo_sk +FROM delta_encoding_required_column +ORDER BY c_customer_sk_big DESC, c_current_cdemo_sk DESC; +---- +logical_plan +Sort: c_customer_sk_big DESC NULLS FIRST, delta_encoding_required_column.c_current_cdemo_sk DESC NULLS FIRST +--Projection: CAST(delta_encoding_required_column.c_customer_sk AS Utf8) AS c_customer_sk_big, delta_encoding_required_column.c_current_cdemo_sk +----TableScan: delta_encoding_required_column projection=[c_customer_sk, c_current_cdemo_sk] +physical_plan +SortPreservingMergeExec: [c_customer_sk_big@0 DESC,c_current_cdemo_sk@1 DESC] +--SortExec: expr=[c_customer_sk_big@0 DESC,c_current_cdemo_sk@1 DESC] +----ProjectionExec: expr=[CAST(c_customer_sk@0 AS Utf8) as c_customer_sk_big, c_current_cdemo_sk@1 as c_current_cdemo_sk] +------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +--------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c_customer_sk, c_current_cdemo_sk], output_ordering=[c_customer_sk@0 DESC, c_current_cdemo_sk@1 DESC], has_header=false