Skip to content

Commit

Permalink
Add feature flag for value window function fusion
Browse files Browse the repository at this point in the history
  • Loading branch information
ggevay committed Aug 29, 2024
1 parent cee22a7 commit b1d8778
Show file tree
Hide file tree
Showing 10 changed files with 100 additions and 9 deletions.
3 changes: 3 additions & 0 deletions src/adapter/src/optimize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,9 @@ impl From<&OptimizerConfig> for mz_sql::plan::HirToMirConfig {
enable_new_outer_join_lowering: config.features.enable_new_outer_join_lowering,
enable_variadic_left_join_lowering: config.features.enable_variadic_left_join_lowering,
enable_outer_join_null_filter: config.features.enable_outer_join_null_filter,
enable_value_window_function_fusion: config
.features
.enable_value_window_function_fusion,
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions src/repr/src/optimize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,8 @@ optimizer_feature_flags!({
// Reoptimize imported views when building and optimizing a
// `DataflowDescription` in the global MIR optimization phase.
reoptimize_imported_views: bool,
// Enables the value window function fusion optimization.
enable_value_window_function_fusion: bool,
});

/// A trait used to implement layered config construction.
Expand Down
1 change: 1 addition & 0 deletions src/sql-lexer/src/keywords.txt
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ From
Full
Fullname
Function
Fusion
Generator
Grant
Greatest
Expand Down
8 changes: 6 additions & 2 deletions src/sql-parser/src/ast/defs/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2024,6 +2024,7 @@ pub enum ClusterFeatureName {
EnableVariadicLeftJoinLowering,
EnableLetrecFixpointAnalysis,
EnableOuterJoinNullFilter,
EnableValueWindowFunctionFusion,
}

impl WithOptionName for ClusterFeatureName {
Expand All @@ -2039,7 +2040,8 @@ impl WithOptionName for ClusterFeatureName {
| Self::EnableEagerDeltaJoins
| Self::EnableVariadicLeftJoinLowering
| Self::EnableLetrecFixpointAnalysis
| Self::EnableOuterJoinNullFilter => false,
| Self::EnableOuterJoinNullFilter
| Self::EnableValueWindowFunctionFusion => false,
}
}
}
Expand Down Expand Up @@ -3566,6 +3568,7 @@ pub enum ExplainPlanOptionName {
EnableVariadicLeftJoinLowering,
EnableLetrecFixpointAnalysis,
EnableOuterJoinNullFilter,
EnableValueWindowFunctionFusion,
}

impl WithOptionName for ExplainPlanOptionName {
Expand Down Expand Up @@ -3600,7 +3603,8 @@ impl WithOptionName for ExplainPlanOptionName {
| Self::EnableEagerDeltaJoins
| Self::EnableVariadicLeftJoinLowering
| Self::EnableLetrecFixpointAnalysis
| Self::EnableOuterJoinNullFilter => false,
| Self::EnableOuterJoinNullFilter
| Self::EnableValueWindowFunctionFusion => false,
}
}
}
Expand Down
10 changes: 6 additions & 4 deletions src/sql/src/plan/lowering.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ pub struct Config {
pub enable_variadic_left_join_lowering: bool,
/// Enable the extra null filter implemented in #28018.
pub enable_outer_join_null_filter: bool,
pub enable_value_window_function_fusion: bool,
}

impl From<&SystemVars> for Config {
Expand All @@ -146,17 +147,18 @@ impl From<&SystemVars> for Config {
enable_new_outer_join_lowering: vars.enable_new_outer_join_lowering(),
enable_variadic_left_join_lowering: vars.enable_variadic_left_join_lowering(),
enable_outer_join_null_filter: vars.enable_outer_join_null_filter(),
enable_value_window_function_fusion: vars.enable_value_window_function_fusion(),
}
}
}

/// Context passed to the lowering. This is wired to most parts of the lowering.
struct Context<'a> {
pub(crate) struct Context<'a> {
/// Feature flags affecting the behavior of lowering.
config: &'a Config,
pub config: &'a Config,
/// Optional, because some callers don't have an `OptimizerMetrics` handy. When it's None, we
/// simply don't write metrics.
metrics: Option<&'a OptimizerMetrics>,
pub metrics: Option<&'a OptimizerMetrics>,
}

impl HirRelationExpr {
Expand Down Expand Up @@ -190,7 +192,7 @@ impl HirRelationExpr {
let mut id_gen = mz_ore::id_gen::IdGen::default();
transform_expr::split_subquery_predicates(&mut other);
transform_expr::try_simplify_quantified_comparisons(&mut other);
transform_expr::fuse_window_functions(&mut other)?;
transform_expr::fuse_window_functions(&mut other, &context)?;
MirRelationExpr::constant(vec![vec![]], RelationType::new(vec![]))
.let_in_fallible(&mut id_gen, |id_gen, get_outer| {
other.applied_to(
Expand Down
7 changes: 6 additions & 1 deletion src/sql/src/plan/statement/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3510,7 +3510,8 @@ generate_extracted_config!(
(EnableNewOuterJoinLowering, Option<bool>, Default(None)),
(EnableVariadicLeftJoinLowering, Option<bool>, Default(None)),
(EnableLetrecFixpointAnalysis, Option<bool>, Default(None)),
(EnableOuterJoinNullFilter, Option<bool>, Default(None))
(EnableOuterJoinNullFilter, Option<bool>, Default(None)),
(EnableValueWindowFunctionFusion, Option<bool>, Default(None))
);

/// Convert a [`CreateClusterStatement`] into a [`Plan`].
Expand Down Expand Up @@ -3648,6 +3649,7 @@ pub fn plan_create_cluster_inner(
enable_variadic_left_join_lowering,
enable_letrec_fixpoint_analysis,
enable_outer_join_null_filter,
enable_value_window_function_fusion,
seen: _,
} = ClusterFeatureExtracted::try_from(features)?;
let optimizer_feature_overrides = OptimizerFeatureOverrides {
Expand All @@ -3657,6 +3659,7 @@ pub fn plan_create_cluster_inner(
enable_variadic_left_join_lowering,
enable_letrec_fixpoint_analysis,
enable_outer_join_null_filter,
enable_value_window_function_fusion,
..Default::default()
};

Expand Down Expand Up @@ -3752,6 +3755,7 @@ pub fn unplan_create_cluster(
enable_variadic_left_join_lowering,
enable_letrec_fixpoint_analysis,
enable_outer_join_null_filter,
enable_value_window_function_fusion,
} = optimizer_feature_overrides;
let features_extracted = ClusterFeatureExtracted {
// Seen is ignored when unplanning.
Expand All @@ -3762,6 +3766,7 @@ pub fn unplan_create_cluster(
enable_variadic_left_join_lowering,
enable_letrec_fixpoint_analysis,
enable_outer_join_null_filter,
enable_value_window_function_fusion,
};
let features = features_extracted.into_values(scx.catalog);
let availability_zones = if availability_zones.is_empty() {
Expand Down
4 changes: 3 additions & 1 deletion src/sql/src/plan/statement/dml.rs
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,8 @@ generate_extracted_config!(
(EnableEagerDeltaJoins, Option<bool>, Default(None)),
(EnableVariadicLeftJoinLowering, Option<bool>, Default(None)),
(EnableLetrecFixpointAnalysis, Option<bool>, Default(None)),
(EnableOuterJoinNullFilter, Option<bool>, Default(None))
(EnableOuterJoinNullFilter, Option<bool>, Default(None)),
(EnableValueWindowFunctionFusion, Option<bool>, Default(None))
);

impl TryFrom<ExplainPlanOptionExtracted> for ExplainConfig {
Expand Down Expand Up @@ -417,6 +418,7 @@ impl TryFrom<ExplainPlanOptionExtracted> for ExplainConfig {
enable_cardinality_estimates: Default::default(),
persist_fast_path_limit: Default::default(),
reoptimize_imported_views: v.reoptimize_imported_views,
enable_value_window_function_fusion: v.enable_value_window_function_fusion,
},
})
}
Expand Down
8 changes: 7 additions & 1 deletion src/sql/src/plan/transform_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,13 @@ fn column_type(
/// should be to columns that are earlier than the first element of the group. (No need to check
/// column references in the other direction, i.e., references in other expressions that refer to
/// columns in the group.)
pub fn fuse_window_functions(root: &mut HirRelationExpr) -> Result<(), RecursionLimitError> {
pub fn fuse_window_functions(
root: &mut HirRelationExpr,
context: &crate::plan::lowering::Context,
) -> Result<(), RecursionLimitError> {
if !context.config.enable_value_window_function_fusion {
return Ok(());
}

impl HirScalarExpr {
/// Similar to `MirScalarExpr::support`, but adapted to `HirScalarExpr` in a special way: it
Expand Down
7 changes: 7 additions & 0 deletions src/sql/src/session/vars/definitions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2088,6 +2088,12 @@ feature_flags!(
default: false,
enable_for_item_parsing: false,
},
{
name: enable_value_window_function_fusion,
desc: "Enables the value window function fusion optimization",
default: true,
enable_for_item_parsing: false,
},
);

impl From<&super::SystemVars> for OptimizerFeatures {
Expand All @@ -2101,6 +2107,7 @@ impl From<&super::SystemVars> for OptimizerFeatures {
enable_letrec_fixpoint_analysis: vars.enable_letrec_fixpoint_analysis(),
enable_cardinality_estimates: vars.enable_cardinality_estimates(),
enable_outer_join_null_filter: vars.enable_outer_join_null_filter(),
enable_value_window_function_fusion: vars.enable_value_window_function_fusion(),
persist_fast_path_limit: vars.persist_fast_path_limit(),
reoptimize_imported_views: false,
}
Expand Down
59 changes: 59 additions & 0 deletions test/sqllogictest/window_funcs.slt
Original file line number Diff line number Diff line change
Expand Up @@ -7476,3 +7476,62 @@ ORDER BY x,y;
13 14 1111 182 2222 27 NULL 27
15 16 1111 240 2222 31 14 31
17 18 1111 306 2222 35 16 35

simple conn=mz_system,user=mz_system
ALTER SYSTEM SET enable_value_window_function_fusion = false
----
COMPLETE 0

query T multiline
EXPLAIN
SELECT
*,
lag(x) OVER (),
lag(y) OVER ()
FROM t7
ORDER BY x,y;
----
Explained Query:
Finish order_by=[#0 asc nulls_last, #1 asc nulls_last] output=[#0..=#3]
Project (#3, #4, #6, #5)
Map (record_get[1](#1), record_get[0](#2), record_get[1](#2), record_get[3](#2), record_get[0](#1))
FlatMap unnest_list(#0)
Reduce aggregates=[lag[order_by=[]](row(row(row(record_get[0](record_get[1](#0)), record_get[1](record_get[1](#0)), record_get[0](#0), record_get[0](#0)), row(record_get[0](record_get[1](#0)), 1, null))))]
Project (#1)
FlatMap unnest_list(#0)
Reduce aggregates=[lag[order_by=[]](row(row(row(#0, #1), row(#1, 1, null))))]
ReadStorage materialize.public.t7

Source materialize.public.t7

Target cluster: quickstart

EOF

simple conn=mz_system,user=mz_system
ALTER SYSTEM SET enable_value_window_function_fusion = true
----
COMPLETE 0

query T multiline
EXPLAIN
SELECT
*,
lag(x) OVER (),
lag(y) OVER ()
FROM t7
ORDER BY x,y;
----
Explained Query:
Finish order_by=[#0 asc nulls_last, #1 asc nulls_last] output=[#0..=#3]
Project (#3, #4, #7, #6)
Map (record_get[1](#1), record_get[0](#2), record_get[1](#2), record_get[0](#1), record_get[0](#5), record_get[1](#5))
FlatMap unnest_list(#0)
Reduce aggregates=[fused_value_window_func[lag[order_by=[]], lag[order_by=[]] order_by=[]](row(row(row(#0, #1), row(row(#1, 1, null), row(#0, 1, null)))))]
ReadStorage materialize.public.t7

Source materialize.public.t7

Target cluster: quickstart

EOF

0 comments on commit b1d8778

Please sign in to comment.