diff --git a/src/adapter/src/optimize.rs b/src/adapter/src/optimize.rs index 12fb7d1563943..4634c40164164 100644 --- a/src/adapter/src/optimize.rs +++ b/src/adapter/src/optimize.rs @@ -247,6 +247,9 @@ impl From<&OptimizerConfig> for mz_sql::plan::HirToMirConfig { enable_new_outer_join_lowering: config.features.enable_new_outer_join_lowering, enable_variadic_left_join_lowering: config.features.enable_variadic_left_join_lowering, enable_outer_join_null_filter: config.features.enable_outer_join_null_filter, + enable_value_window_function_fusion: config + .features + .enable_value_window_function_fusion, } } } diff --git a/src/repr/src/optimize.rs b/src/repr/src/optimize.rs index 64db1cb09229c..168e29dace50f 100644 --- a/src/repr/src/optimize.rs +++ b/src/repr/src/optimize.rs @@ -117,6 +117,8 @@ optimizer_feature_flags!({ // Reoptimize imported views when building and optimizing a // `DataflowDescription` in the global MIR optimization phase. reoptimize_imported_views: bool, + // Enables the value window function fusion optimization. + enable_value_window_function_fusion: bool, }); /// A trait used to implement layered config construction. diff --git a/src/sql-lexer/src/keywords.txt b/src/sql-lexer/src/keywords.txt index ec72dcb6470f3..9d8fa8a22b91e 100644 --- a/src/sql-lexer/src/keywords.txt +++ b/src/sql-lexer/src/keywords.txt @@ -178,6 +178,7 @@ From Full Fullname Function +Fusion Generator Grant Greatest diff --git a/src/sql-parser/src/ast/defs/statement.rs b/src/sql-parser/src/ast/defs/statement.rs index c4aa98d6a078e..d983c114c8d8a 100644 --- a/src/sql-parser/src/ast/defs/statement.rs +++ b/src/sql-parser/src/ast/defs/statement.rs @@ -2024,6 +2024,7 @@ pub enum ClusterFeatureName { EnableVariadicLeftJoinLowering, EnableLetrecFixpointAnalysis, EnableOuterJoinNullFilter, + EnableValueWindowFunctionFusion, } impl WithOptionName for ClusterFeatureName { @@ -2039,7 +2040,8 @@ impl WithOptionName for ClusterFeatureName { | Self::EnableEagerDeltaJoins | Self::EnableVariadicLeftJoinLowering | Self::EnableLetrecFixpointAnalysis - | Self::EnableOuterJoinNullFilter => false, + | Self::EnableOuterJoinNullFilter + | Self::EnableValueWindowFunctionFusion => false, } } } @@ -3566,6 +3568,7 @@ pub enum ExplainPlanOptionName { EnableVariadicLeftJoinLowering, EnableLetrecFixpointAnalysis, EnableOuterJoinNullFilter, + EnableValueWindowFunctionFusion, } impl WithOptionName for ExplainPlanOptionName { @@ -3600,7 +3603,8 @@ impl WithOptionName for ExplainPlanOptionName { | Self::EnableEagerDeltaJoins | Self::EnableVariadicLeftJoinLowering | Self::EnableLetrecFixpointAnalysis - | Self::EnableOuterJoinNullFilter => false, + | Self::EnableOuterJoinNullFilter + | Self::EnableValueWindowFunctionFusion => false, } } } diff --git a/src/sql/src/plan/lowering.rs b/src/sql/src/plan/lowering.rs index 79cb6d66faafb..030bcbc8bc2b8 100644 --- a/src/sql/src/plan/lowering.rs +++ b/src/sql/src/plan/lowering.rs @@ -138,6 +138,7 @@ pub struct Config { pub enable_variadic_left_join_lowering: bool, /// Enable the extra null filter implemented in #28018. pub enable_outer_join_null_filter: bool, + pub enable_value_window_function_fusion: bool, } impl From<&SystemVars> for Config { @@ -146,17 +147,18 @@ impl From<&SystemVars> for Config { enable_new_outer_join_lowering: vars.enable_new_outer_join_lowering(), enable_variadic_left_join_lowering: vars.enable_variadic_left_join_lowering(), enable_outer_join_null_filter: vars.enable_outer_join_null_filter(), + enable_value_window_function_fusion: vars.enable_value_window_function_fusion(), } } } /// Context passed to the lowering. This is wired to most parts of the lowering. -struct Context<'a> { +pub(crate) struct Context<'a> { /// Feature flags affecting the behavior of lowering. - config: &'a Config, + pub config: &'a Config, /// Optional, because some callers don't have an `OptimizerMetrics` handy. When it's None, we /// simply don't write metrics. - metrics: Option<&'a OptimizerMetrics>, + pub metrics: Option<&'a OptimizerMetrics>, } impl HirRelationExpr { @@ -190,7 +192,7 @@ impl HirRelationExpr { let mut id_gen = mz_ore::id_gen::IdGen::default(); transform_expr::split_subquery_predicates(&mut other); transform_expr::try_simplify_quantified_comparisons(&mut other); - transform_expr::fuse_window_functions(&mut other)?; + transform_expr::fuse_window_functions(&mut other, &context)?; MirRelationExpr::constant(vec![vec![]], RelationType::new(vec![])) .let_in_fallible(&mut id_gen, |id_gen, get_outer| { other.applied_to( diff --git a/src/sql/src/plan/statement/ddl.rs b/src/sql/src/plan/statement/ddl.rs index 3604f917ed08c..4f61ce1665e7b 100644 --- a/src/sql/src/plan/statement/ddl.rs +++ b/src/sql/src/plan/statement/ddl.rs @@ -3510,7 +3510,8 @@ generate_extracted_config!( (EnableNewOuterJoinLowering, Option, Default(None)), (EnableVariadicLeftJoinLowering, Option, Default(None)), (EnableLetrecFixpointAnalysis, Option, Default(None)), - (EnableOuterJoinNullFilter, Option, Default(None)) + (EnableOuterJoinNullFilter, Option, Default(None)), + (EnableValueWindowFunctionFusion, Option, Default(None)) ); /// Convert a [`CreateClusterStatement`] into a [`Plan`]. @@ -3648,6 +3649,7 @@ pub fn plan_create_cluster_inner( enable_variadic_left_join_lowering, enable_letrec_fixpoint_analysis, enable_outer_join_null_filter, + enable_value_window_function_fusion, seen: _, } = ClusterFeatureExtracted::try_from(features)?; let optimizer_feature_overrides = OptimizerFeatureOverrides { @@ -3657,6 +3659,7 @@ pub fn plan_create_cluster_inner( enable_variadic_left_join_lowering, enable_letrec_fixpoint_analysis, enable_outer_join_null_filter, + enable_value_window_function_fusion, ..Default::default() }; @@ -3752,6 +3755,7 @@ pub fn unplan_create_cluster( enable_variadic_left_join_lowering, enable_letrec_fixpoint_analysis, enable_outer_join_null_filter, + enable_value_window_function_fusion, } = optimizer_feature_overrides; let features_extracted = ClusterFeatureExtracted { // Seen is ignored when unplanning. @@ -3762,6 +3766,7 @@ pub fn unplan_create_cluster( enable_variadic_left_join_lowering, enable_letrec_fixpoint_analysis, enable_outer_join_null_filter, + enable_value_window_function_fusion, }; let features = features_extracted.into_values(scx.catalog); let availability_zones = if availability_zones.is_empty() { diff --git a/src/sql/src/plan/statement/dml.rs b/src/sql/src/plan/statement/dml.rs index 5a8fd3bf9ac5e..8ecd690acbabf 100644 --- a/src/sql/src/plan/statement/dml.rs +++ b/src/sql/src/plan/statement/dml.rs @@ -365,7 +365,8 @@ generate_extracted_config!( (EnableEagerDeltaJoins, Option, Default(None)), (EnableVariadicLeftJoinLowering, Option, Default(None)), (EnableLetrecFixpointAnalysis, Option, Default(None)), - (EnableOuterJoinNullFilter, Option, Default(None)) + (EnableOuterJoinNullFilter, Option, Default(None)), + (EnableValueWindowFunctionFusion, Option, Default(None)) ); impl TryFrom for ExplainConfig { @@ -417,6 +418,7 @@ impl TryFrom for ExplainConfig { enable_cardinality_estimates: Default::default(), persist_fast_path_limit: Default::default(), reoptimize_imported_views: v.reoptimize_imported_views, + enable_value_window_function_fusion: v.enable_value_window_function_fusion, }, }) } diff --git a/src/sql/src/plan/transform_expr.rs b/src/sql/src/plan/transform_expr.rs index 5eac918920679..e8e2f50d82f58 100644 --- a/src/sql/src/plan/transform_expr.rs +++ b/src/sql/src/plan/transform_expr.rs @@ -353,7 +353,13 @@ fn column_type( /// should be to columns that are earlier than the first element of the group. (No need to check /// column references in the other direction, i.e., references in other expressions that refer to /// columns in the group.) -pub fn fuse_window_functions(root: &mut HirRelationExpr) -> Result<(), RecursionLimitError> { +pub fn fuse_window_functions( + root: &mut HirRelationExpr, + context: &crate::plan::lowering::Context, +) -> Result<(), RecursionLimitError> { + if !context.config.enable_value_window_function_fusion { + return Ok(()); + } impl HirScalarExpr { /// Similar to `MirScalarExpr::support`, but adapted to `HirScalarExpr` in a special way: it diff --git a/src/sql/src/session/vars/definitions.rs b/src/sql/src/session/vars/definitions.rs index 4cd57d78d6ed7..0e9adf1c86a9f 100644 --- a/src/sql/src/session/vars/definitions.rs +++ b/src/sql/src/session/vars/definitions.rs @@ -2088,6 +2088,12 @@ feature_flags!( default: false, enable_for_item_parsing: false, }, + { + name: enable_value_window_function_fusion, + desc: "Enables the value window function fusion optimization", + default: true, + enable_for_item_parsing: false, + }, ); impl From<&super::SystemVars> for OptimizerFeatures { @@ -2101,6 +2107,7 @@ impl From<&super::SystemVars> for OptimizerFeatures { enable_letrec_fixpoint_analysis: vars.enable_letrec_fixpoint_analysis(), enable_cardinality_estimates: vars.enable_cardinality_estimates(), enable_outer_join_null_filter: vars.enable_outer_join_null_filter(), + enable_value_window_function_fusion: vars.enable_value_window_function_fusion(), persist_fast_path_limit: vars.persist_fast_path_limit(), reoptimize_imported_views: false, } diff --git a/test/sqllogictest/window_funcs.slt b/test/sqllogictest/window_funcs.slt index ee05ebbef0b8a..f84c5b7cd4725 100644 --- a/test/sqllogictest/window_funcs.slt +++ b/test/sqllogictest/window_funcs.slt @@ -7476,3 +7476,62 @@ ORDER BY x,y; 13 14 1111 182 2222 27 NULL 27 15 16 1111 240 2222 31 14 31 17 18 1111 306 2222 35 16 35 + +simple conn=mz_system,user=mz_system +ALTER SYSTEM SET enable_value_window_function_fusion = false +---- +COMPLETE 0 + +query T multiline +EXPLAIN +SELECT + *, + lag(x) OVER (), + lag(y) OVER () +FROM t7 +ORDER BY x,y; +---- +Explained Query: + Finish order_by=[#0 asc nulls_last, #1 asc nulls_last] output=[#0..=#3] + Project (#3, #4, #6, #5) + Map (record_get[1](#1), record_get[0](#2), record_get[1](#2), record_get[3](#2), record_get[0](#1)) + FlatMap unnest_list(#0) + Reduce aggregates=[lag[order_by=[]](row(row(row(record_get[0](record_get[1](#0)), record_get[1](record_get[1](#0)), record_get[0](#0), record_get[0](#0)), row(record_get[0](record_get[1](#0)), 1, null))))] + Project (#1) + FlatMap unnest_list(#0) + Reduce aggregates=[lag[order_by=[]](row(row(row(#0, #1), row(#1, 1, null))))] + ReadStorage materialize.public.t7 + +Source materialize.public.t7 + +Target cluster: quickstart + +EOF + +simple conn=mz_system,user=mz_system +ALTER SYSTEM SET enable_value_window_function_fusion = true +---- +COMPLETE 0 + +query T multiline +EXPLAIN +SELECT + *, + lag(x) OVER (), + lag(y) OVER () +FROM t7 +ORDER BY x,y; +---- +Explained Query: + Finish order_by=[#0 asc nulls_last, #1 asc nulls_last] output=[#0..=#3] + Project (#3, #4, #7, #6) + Map (record_get[1](#1), record_get[0](#2), record_get[1](#2), record_get[0](#1), record_get[0](#5), record_get[1](#5)) + FlatMap unnest_list(#0) + Reduce aggregates=[fused_value_window_func[lag[order_by=[]], lag[order_by=[]] order_by=[]](row(row(row(#0, #1), row(row(#1, 1, null), row(#0, 1, null)))))] + ReadStorage materialize.public.t7 + +Source materialize.public.t7 + +Target cluster: quickstart + +EOF