From 36e3415215227c3ae2a69993bf6ab08f4d8ca74e Mon Sep 17 00:00:00 2001 From: Albert Skalt <133099191+askalt@users.noreply.github.com> Date: Sun, 10 Nov 2024 12:02:10 +0300 Subject: [PATCH] Optimize `replace_params_with_values` (#13308) * Add benchmark for `with_param_values` Target of this benchmark: control that placeholders replacing does not get slower, if the query does not contain placeholders at all. * Do not save original expression name if it does not contain placeholders Prior this patch, the original expression name was unconditionally preserved during the replacement of placeholders. However, in some cases, this is unnecessary. If the expression does not contain any placeholders, its name remains unchanged, and we do not need to preserve it. This patch adds the following optimization: - If during first pass (placeholders type inference) was investigated that there are no placeholders, do not save the original name, do not apply second pass (replacing). Just leave the expression as it is and return `Transformed::no(...)`. --- datafusion/core/benches/sql_planner.rs | 29 ++++++++++++++++++++++++ datafusion/expr/src/expr.rs | 10 +++++++- datafusion/expr/src/logical_plan/plan.rs | 17 ++++++++++---- datafusion/sql/src/expr/mod.rs | 2 +- 4 files changed, 51 insertions(+), 7 deletions(-) diff --git a/datafusion/core/benches/sql_planner.rs b/datafusion/core/benches/sql_planner.rs index 140e266a0272..44320e7a287a 100644 --- a/datafusion/core/benches/sql_planner.rs +++ b/datafusion/core/benches/sql_planner.rs @@ -24,8 +24,10 @@ mod data_utils; use crate::criterion::Criterion; use arrow::datatypes::{DataType, Field, Fields, Schema}; +use criterion::Bencher; use datafusion::datasource::MemTable; use datafusion::execution::context::SessionContext; +use datafusion_common::ScalarValue; use itertools::Itertools; use std::fs::File; use std::io::{BufRead, BufReader}; @@ -122,6 +124,29 @@ fn register_clickbench_hits_table() -> SessionContext { ctx } +/// Target of this benchmark: control that placeholders replacing does not get slower, +/// if the query does not contain placeholders at all. +fn benchmark_with_param_values_many_columns(ctx: &SessionContext, b: &mut Bencher) { + const COLUMNS_NUM: usize = 200; + let mut aggregates = String::new(); + for i in 0..COLUMNS_NUM { + if i > 0 { + aggregates.push_str(", "); + } + aggregates.push_str(format!("MAX(a{})", i).as_str()); + } + // SELECT max(attr0), ..., max(attrN) FROM t1. + let query = format!("SELECT {} FROM t1", aggregates); + let statement = ctx.state().sql_to_statement(&query, "Generic").unwrap(); + let rt = Runtime::new().unwrap(); + let plan = + rt.block_on(async { ctx.state().statement_to_plan(statement).await.unwrap() }); + b.iter(|| { + let plan = plan.clone(); + criterion::black_box(plan.with_param_values(vec![ScalarValue::from(1)]).unwrap()); + }); +} + fn criterion_benchmark(c: &mut Criterion) { // verify that we can load the clickbench data prior to running the benchmark if !PathBuf::from(format!("{BENCHMARKS_PATH_1}{CLICKBENCH_DATA_PATH}")).exists() @@ -388,6 +413,10 @@ fn criterion_benchmark(c: &mut Criterion) { } }) }); + + c.bench_function("with_param_values_many_columns", |b| { + benchmark_with_param_values_many_columns(&ctx, b); + }); } criterion_group!(benches, criterion_benchmark); diff --git a/datafusion/expr/src/expr.rs b/datafusion/expr/src/expr.rs index 0cdfa055216b..aee3c9b42fb0 100644 --- a/datafusion/expr/src/expr.rs +++ b/datafusion/expr/src/expr.rs @@ -1598,7 +1598,11 @@ impl Expr { /// /// For example, gicen an expression like ` = $0` will infer `$0` to /// have type `int32`. - pub fn infer_placeholder_types(self, schema: &DFSchema) -> Result { + /// + /// Returns transformed expression and flag that is true if expression contains + /// at least one placeholder. + pub fn infer_placeholder_types(self, schema: &DFSchema) -> Result<(Expr, bool)> { + let mut has_placeholder = false; self.transform(|mut expr| { // Default to assuming the arguments are the same type if let Expr::BinaryExpr(BinaryExpr { left, op: _, right }) = &mut expr { @@ -1615,9 +1619,13 @@ impl Expr { rewrite_placeholder(low.as_mut(), expr.as_ref(), schema)?; rewrite_placeholder(high.as_mut(), expr.as_ref(), schema)?; } + if let Expr::Placeholder(_) = &expr { + has_placeholder = true; + } Ok(Transformed::yes(expr)) }) .data() + .map(|data| (data, has_placeholder)) } /// Returns true if some of this `exprs` subexpressions may not be evaluated diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index 56e13b356b47..6ee99b22c7f3 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -1413,9 +1413,15 @@ impl LogicalPlan { let schema = Arc::clone(plan.schema()); let name_preserver = NamePreserver::new(&plan); plan.map_expressions(|e| { - let original_name = name_preserver.save(&e); - let transformed_expr = - e.infer_placeholder_types(&schema)?.transform_up(|e| { + let (e, has_placeholder) = e.infer_placeholder_types(&schema)?; + if !has_placeholder { + // Performance optimization: + // avoid NamePreserver copy and second pass over expression + // if no placeholders. + Ok(Transformed::no(e)) + } else { + let original_name = name_preserver.save(&e); + let transformed_expr = e.transform_up(|e| { if let Expr::Placeholder(Placeholder { id, .. }) = e { let value = param_values.get_placeholders_with_values(&id)?; Ok(Transformed::yes(Expr::Literal(value))) @@ -1423,8 +1429,9 @@ impl LogicalPlan { Ok(Transformed::no(e)) } })?; - // Preserve name to avoid breaking column references to this expression - Ok(transformed_expr.update_data(|expr| original_name.restore(expr))) + // Preserve name to avoid breaking column references to this expression + Ok(transformed_expr.update_data(|expr| original_name.restore(expr))) + } }) }) .map(|res| res.data) diff --git a/datafusion/sql/src/expr/mod.rs b/datafusion/sql/src/expr/mod.rs index 432e8668c52e..b68be90b03e1 100644 --- a/datafusion/sql/src/expr/mod.rs +++ b/datafusion/sql/src/expr/mod.rs @@ -140,7 +140,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { let mut expr = self.sql_expr_to_logical_expr(sql, schema, planner_context)?; expr = self.rewrite_partial_qualifier(expr, schema); self.validate_schema_satisfies_exprs(schema, &[expr.clone()])?; - let expr = expr.infer_placeholder_types(schema)?; + let (expr, _) = expr.infer_placeholder_types(schema)?; Ok(expr) }