Skip to content

Commit

Permalink
feat(optimizer): Enable filter pushdown on window functions (#14026)
Browse files Browse the repository at this point in the history
* feat(optimizer): Enable filter pushdown on window functions

Ensures selections can be pushed past window functions similarly
to what is already done with aggregations, when possible.

* fix: Add missing dependency

* minor(optimizer): Use 'datafusion-functions-window' as a dev dependency

* docs(optimizer): Add example to filter pushdown on LogicalPlan::Window
  • Loading branch information
nuno-faria authored Jan 8, 2025
1 parent da4208b commit ad5a04f
Show file tree
Hide file tree
Showing 3 changed files with 600 additions and 4 deletions.
1 change: 1 addition & 0 deletions datafusion/optimizer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ regex-syntax = "0.8.0"
async-trait = { workspace = true }
ctor = { workspace = true }
datafusion-functions-aggregate = { workspace = true }
datafusion-functions-window = { workspace = true }
datafusion-functions-window-common = { workspace = true }
datafusion-sql = { workspace = true }
env_logger = { workspace = true }
310 changes: 306 additions & 4 deletions datafusion/optimizer/src/push_down_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -985,6 +985,87 @@ impl OptimizerRule for PushDownFilter {
}
})
}
// Tries to push filters based on the partition key(s) of the window function(s) used.
// Example:
// Before:
// Filter: (a > 1) and (b > 1) and (c > 1)
// Window: func() PARTITION BY [a] ...
// ---
// After:
// Filter: (b > 1) and (c > 1)
// Window: func() PARTITION BY [a] ...
// Filter: (a > 1)
LogicalPlan::Window(window) => {
// Retrieve the set of potential partition keys where we can push filters by.
// Unlike aggregations, where there is only one statement per SELECT, there can be
// multiple window functions, each with potentially different partition keys.
// Therefore, we need to ensure that any potential partition key returned is used in
// ALL window functions. Otherwise, filters cannot be pushed by through that column.
let potential_partition_keys = window
.window_expr
.iter()
.map(|e| {
if let Expr::WindowFunction(window_expression) = e {
window_expression
.partition_by
.iter()
.map(|c| {
Column::from_qualified_name(
c.schema_name().to_string(),
)
})
.collect::<HashSet<_>>()
} else {
// window functions expressions are only Expr::WindowFunction
unreachable!()
}
})
// performs the set intersection of the partition keys of all window functions,
// returning only the common ones
.reduce(|a, b| &a & &b)
.unwrap_or_default();

let predicates = split_conjunction_owned(filter.predicate);
let mut keep_predicates = vec![];
let mut push_predicates = vec![];
for expr in predicates {
let cols = expr.column_refs();
if cols.iter().all(|c| potential_partition_keys.contains(c)) {
push_predicates.push(expr);
} else {
keep_predicates.push(expr);
}
}

// Unlike with aggregations, there are no cases where we have to replace, e.g.,
// `a+b` with Column(a)+Column(b). This is because partition expressions are not
// available as standalone columns to the user. For example, while an aggregation on
// `a+b` becomes Column(a + b), in a window partition it becomes
// `func() PARTITION BY [a + b] ...`. Thus, filters on expressions always remain in
// place, so we can use `push_predicates` directly. This is consistent with other
// optimizers, such as the one used by Postgres.

let window_input = Arc::clone(&window.input);
Transformed::yes(LogicalPlan::Window(window))
.transform_data(|new_plan| {
// If we have a filter to push, we push it down to the input of the window
if let Some(predicate) = conjunction(push_predicates) {
let new_filter = make_filter(predicate, window_input)?;
insert_below(new_plan, new_filter)
} else {
Ok(Transformed::no(new_plan))
}
})?
.map_data(|child_plan| {
// if there are any remaining predicates we can't push, add them
// back as a filter
if let Some(predicate) = conjunction(keep_predicates) {
make_filter(predicate, Arc::new(child_plan))
} else {
Ok(child_plan)
}
})
}
LogicalPlan::Join(join) => push_down_join(join, Some(&filter.predicate)),
LogicalPlan::TableScan(scan) => {
let filter_predicates = split_conjunction(&filter.predicate);
Expand Down Expand Up @@ -1289,12 +1370,12 @@ mod tests {
use async_trait::async_trait;

use datafusion_common::{DFSchemaRef, ScalarValue};
use datafusion_expr::expr::ScalarFunction;
use datafusion_expr::expr::{ScalarFunction, WindowFunction};
use datafusion_expr::logical_plan::table_scan;
use datafusion_expr::{
col, in_list, in_subquery, lit, ColumnarValue, Extension, LogicalPlanBuilder,
ScalarUDF, ScalarUDFImpl, Signature, TableSource, TableType,
UserDefinedLogicalNodeCore, Volatility,
col, in_list, in_subquery, lit, ColumnarValue, ExprFunctionExt, Extension,
LogicalPlanBuilder, ScalarUDF, ScalarUDFImpl, Signature, TableSource, TableType,
UserDefinedLogicalNodeCore, Volatility, WindowFunctionDefinition,
};

use crate::optimizer::Optimizer;
Expand Down Expand Up @@ -1442,6 +1523,227 @@ mod tests {
assert_optimized_plan_eq(plan, expected)
}

/// verifies that when partitioning by 'a' and 'b', and filtering by 'b', 'b' is pushed
#[test]
fn filter_move_window() -> Result<()> {
let table_scan = test_table_scan()?;

let window = Expr::WindowFunction(WindowFunction::new(
WindowFunctionDefinition::WindowUDF(
datafusion_functions_window::rank::rank_udwf(),
),
vec![],
))
.partition_by(vec![col("a"), col("b")])
.order_by(vec![col("c").sort(true, true)])
.build()
.unwrap();

let plan = LogicalPlanBuilder::from(table_scan)
.window(vec![window])?
.filter(col("b").gt(lit(10i64)))?
.build()?;

let expected = "\
WindowAggr: windowExpr=[[rank() PARTITION BY [test.a, test.b] ORDER BY [test.c ASC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]\
\n TableScan: test, full_filters=[test.b > Int64(10)]";
assert_optimized_plan_eq(plan, expected)
}

/// verifies that when partitioning by 'a' and 'b', and filtering by 'a' and 'b', both 'a' and
/// 'b' are pushed
#[test]
fn filter_move_complex_window() -> Result<()> {
let table_scan = test_table_scan()?;

let window = Expr::WindowFunction(WindowFunction::new(
WindowFunctionDefinition::WindowUDF(
datafusion_functions_window::rank::rank_udwf(),
),
vec![],
))
.partition_by(vec![col("a"), col("b")])
.order_by(vec![col("c").sort(true, true)])
.build()
.unwrap();

let plan = LogicalPlanBuilder::from(table_scan)
.window(vec![window])?
.filter(and(col("a").gt(lit(10i64)), col("b").eq(lit(1i64))))?
.build()?;

let expected = "\
WindowAggr: windowExpr=[[rank() PARTITION BY [test.a, test.b] ORDER BY [test.c ASC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]\
\n TableScan: test, full_filters=[test.a > Int64(10), test.b = Int64(1)]";
assert_optimized_plan_eq(plan, expected)
}

/// verifies that when partitioning by 'a' and filtering by 'a' and 'b', only 'a' is pushed
#[test]
fn filter_move_partial_window() -> Result<()> {
let table_scan = test_table_scan()?;

let window = Expr::WindowFunction(WindowFunction::new(
WindowFunctionDefinition::WindowUDF(
datafusion_functions_window::rank::rank_udwf(),
),
vec![],
))
.partition_by(vec![col("a")])
.order_by(vec![col("c").sort(true, true)])
.build()
.unwrap();

let plan = LogicalPlanBuilder::from(table_scan)
.window(vec![window])?
.filter(and(col("a").gt(lit(10i64)), col("b").eq(lit(1i64))))?
.build()?;

let expected = "\
Filter: test.b = Int64(1)\
\n WindowAggr: windowExpr=[[rank() PARTITION BY [test.a] ORDER BY [test.c ASC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]\
\n TableScan: test, full_filters=[test.a > Int64(10)]";
assert_optimized_plan_eq(plan, expected)
}

/// verifies that filters on partition expressions are not pushed, as the single expression
/// column is not available to the user, unlike with aggregations
#[test]
fn filter_expression_keep_window() -> Result<()> {
let table_scan = test_table_scan()?;

let window = Expr::WindowFunction(WindowFunction::new(
WindowFunctionDefinition::WindowUDF(
datafusion_functions_window::rank::rank_udwf(),
),
vec![],
))
.partition_by(vec![add(col("a"), col("b"))]) // PARTITION BY a + b
.order_by(vec![col("c").sort(true, true)])
.build()
.unwrap();

let plan = LogicalPlanBuilder::from(table_scan)
.window(vec![window])?
// unlike with aggregations, single partition column "test.a + test.b" is not available
// to the plan, so we use multiple columns when filtering
.filter(add(col("a"), col("b")).gt(lit(10i64)))?
.build()?;

let expected = "\
Filter: test.a + test.b > Int64(10)\
\n WindowAggr: windowExpr=[[rank() PARTITION BY [test.a + test.b] ORDER BY [test.c ASC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]\
\n TableScan: test";
assert_optimized_plan_eq(plan, expected)
}

/// verifies that filters are not pushed on order by columns (that are not used in partitioning)
#[test]
fn filter_order_keep_window() -> Result<()> {
let table_scan = test_table_scan()?;

let window = Expr::WindowFunction(WindowFunction::new(
WindowFunctionDefinition::WindowUDF(
datafusion_functions_window::rank::rank_udwf(),
),
vec![],
))
.partition_by(vec![col("a")])
.order_by(vec![col("c").sort(true, true)])
.build()
.unwrap();

let plan = LogicalPlanBuilder::from(table_scan)
.window(vec![window])?
.filter(col("c").gt(lit(10i64)))?
.build()?;

let expected = "\
Filter: test.c > Int64(10)\
\n WindowAggr: windowExpr=[[rank() PARTITION BY [test.a] ORDER BY [test.c ASC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]\
\n TableScan: test";
assert_optimized_plan_eq(plan, expected)
}

/// verifies that when we use multiple window functions with a common partition key, the filter
/// on that key is pushed
#[test]
fn filter_multiple_windows_common_partitions() -> Result<()> {
let table_scan = test_table_scan()?;

let window1 = Expr::WindowFunction(WindowFunction::new(
WindowFunctionDefinition::WindowUDF(
datafusion_functions_window::rank::rank_udwf(),
),
vec![],
))
.partition_by(vec![col("a")])
.order_by(vec![col("c").sort(true, true)])
.build()
.unwrap();

let window2 = Expr::WindowFunction(WindowFunction::new(
WindowFunctionDefinition::WindowUDF(
datafusion_functions_window::rank::rank_udwf(),
),
vec![],
))
.partition_by(vec![col("b"), col("a")])
.order_by(vec![col("c").sort(true, true)])
.build()
.unwrap();

let plan = LogicalPlanBuilder::from(table_scan)
.window(vec![window1, window2])?
.filter(col("a").gt(lit(10i64)))? // a appears in both window functions
.build()?;

let expected = "\
WindowAggr: windowExpr=[[rank() PARTITION BY [test.a] ORDER BY [test.c ASC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, rank() PARTITION BY [test.b, test.a] ORDER BY [test.c ASC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]\
\n TableScan: test, full_filters=[test.a > Int64(10)]";
assert_optimized_plan_eq(plan, expected)
}

/// verifies that when we use multiple window functions with different partitions keys, the
/// filter cannot be pushed
#[test]
fn filter_multiple_windows_disjoint_partitions() -> Result<()> {
let table_scan = test_table_scan()?;

let window1 = Expr::WindowFunction(WindowFunction::new(
WindowFunctionDefinition::WindowUDF(
datafusion_functions_window::rank::rank_udwf(),
),
vec![],
))
.partition_by(vec![col("a")])
.order_by(vec![col("c").sort(true, true)])
.build()
.unwrap();

let window2 = Expr::WindowFunction(WindowFunction::new(
WindowFunctionDefinition::WindowUDF(
datafusion_functions_window::rank::rank_udwf(),
),
vec![],
))
.partition_by(vec![col("b"), col("a")])
.order_by(vec![col("c").sort(true, true)])
.build()
.unwrap();

let plan = LogicalPlanBuilder::from(table_scan)
.window(vec![window1, window2])?
.filter(col("b").gt(lit(10i64)))? // b only appears in one window function
.build()?;

let expected = "\
Filter: test.b > Int64(10)\
\n WindowAggr: windowExpr=[[rank() PARTITION BY [test.a] ORDER BY [test.c ASC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, rank() PARTITION BY [test.b, test.a] ORDER BY [test.c ASC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]\
\n TableScan: test";
assert_optimized_plan_eq(plan, expected)
}

/// verifies that a filter is pushed to before a projection, the filter expression is correctly re-written
#[test]
fn alias() -> Result<()> {
Expand Down
Loading

0 comments on commit ad5a04f

Please sign in to comment.