Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Convert nth_value builtIn function to User Defined Window Function #13201

Merged
merged 23 commits into from
Nov 13, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions datafusion/functions-window/src/nth_value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,8 @@ pub fn last_value(arg: datafusion_expr::Expr) -> datafusion_expr::Expr {

/// Create an expression to represent the `nth_value` window function
///
pub fn nth_value(arg: datafusion_expr::Expr, n: Option<i64>) -> datafusion_expr::Expr {
let n_lit = n.map(|v| v.lit()).unwrap_or(ScalarValue::Null.lit());
nth_value_udwf().call(vec![arg, n_lit])
pub fn nth_value(arg: datafusion_expr::Expr, n: i64) -> datafusion_expr::Expr {
nth_value_udwf().call(vec![arg, n.lit()])
}

/// Tag to differentiate special use cases of the NTH_VALUE built-in window function.
Expand Down
4 changes: 4 additions & 0 deletions datafusion/proto/tests/cases/roundtrip_logical_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ use datafusion::functions_aggregate::expr_fn::{
};
use datafusion::functions_aggregate::min_max::max_udaf;
use datafusion::functions_nested::map::map;
use datafusion::functions_window;
use datafusion::functions_window::expr_fn::{
cume_dist, dense_rank, lag, lead, ntile, percent_rank, rank, row_number,
};
Expand Down Expand Up @@ -911,6 +912,9 @@ async fn roundtrip_expr_api() -> Result<()> {
count_distinct(lit(1)),
first_value(lit(1), None),
first_value(lit(1), Some(vec![lit(2).sort(true, true)])),
functions_window::nth_value::first_value(lit(1)),
functions_window::nth_value::last_value(lit(1)),
buraksenn marked this conversation as resolved.
Show resolved Hide resolved
functions_window::nth_value::nth_value(lit(1), 1),
avg(lit(1.5)),
covar_samp(lit(1.5), lit(2.2)),
covar_pop(lit(1.5), lit(2.2)),
Expand Down
45 changes: 3 additions & 42 deletions datafusion/proto/tests/cases/roundtrip_physical_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,9 @@ use datafusion::datasource::physical_plan::{
};
use datafusion::execution::FunctionRegistry;
use datafusion::functions_aggregate::sum::sum_udaf;
use datafusion::functions_window::nth_value::first_value_udwf;
use datafusion::logical_expr::{create_udf, JoinType, Operator, Volatility};
use datafusion::physical_expr::expressions::Literal;
use datafusion::physical_expr::window::{BuiltInWindowExpr, SlidingAggregateWindowExpr};
use datafusion::physical_expr::window::SlidingAggregateWindowExpr;
use datafusion::physical_expr::{
LexOrdering, LexRequirement, PhysicalSortRequirement, ScalarFunctionExpr,
};
Expand All @@ -74,9 +73,7 @@ use datafusion::physical_plan::repartition::RepartitionExec;
use datafusion::physical_plan::sorts::sort::SortExec;
use datafusion::physical_plan::union::{InterleaveExec, UnionExec};
use datafusion::physical_plan::unnest::{ListUnnest, UnnestExec};
use datafusion::physical_plan::windows::{
create_udwf_window_expr, PlainAggregateWindowExpr, WindowAggExec,
};
use datafusion::physical_plan::windows::{PlainAggregateWindowExpr, WindowAggExec};
use datafusion::physical_plan::{ExecutionPlan, Partitioning, PhysicalExpr, Statistics};
use datafusion::prelude::SessionContext;
use datafusion::scalar::ScalarValue;
Expand All @@ -88,11 +85,9 @@ use datafusion_common::stats::Precision;
use datafusion_common::{
internal_err, not_impl_err, DataFusionError, Result, UnnestOptions,
};
use datafusion_expr::WindowFunctionDefinition::WindowUDF;
use datafusion_expr::{
Accumulator, AccumulatorFactoryFunction, AggregateUDF, ColumnarValue, ScalarUDF,
Signature, SimpleAggregateUDF, WindowFrame, WindowFrameBound,
WindowFunctionDefinition,
};
use datafusion_functions_aggregate::average::avg_udaf;
use datafusion_functions_aggregate::nth_value::nth_value_udaf;
Expand All @@ -101,7 +96,6 @@ use datafusion_proto::physical_plan::{
AsExecutionPlan, DefaultPhysicalExtensionCodec, PhysicalExtensionCodec,
};
use datafusion_proto::protobuf;
use datafusion_proto::protobuf::logical_expr_node::ExprType::WindowExpr;

/// Perform a serde roundtrip and assert that the string representation of the before and after plans
/// are identical. Note that this often isn't sufficient to guarantee that no information is
Expand Down Expand Up @@ -275,35 +269,6 @@ fn roundtrip_window() -> Result<()> {
let field_b = Field::new("b", DataType::Int64, false);
let schema = Arc::new(Schema::new(vec![field_a, field_b]));

let window_frame = WindowFrame::new_bounds(
datafusion_expr::WindowFrameUnits::Range,
WindowFrameBound::Preceding(ScalarValue::Int64(None)),
WindowFrameBound::CurrentRow,
);

let first_value_window = create_udwf_window_expr(
&first_value_udwf(),
&[col("a", &schema)?],
schema.as_ref(),
"FIRST_VALUE(a) PARTITION BY [b] ORDER BY [a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW".to_string(),
false,
)?;
// "FIRST_VALUE(a) PARTITION BY [b] ORDER BY [a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW",
let builtin_window_expr = Arc::new(BuiltInWindowExpr::new(
first_value_window,
&[col("b", &schema)?],
&LexOrdering {
inner: vec![PhysicalSortExpr {
expr: col("a", &schema)?,
options: SortOptions {
descending: false,
nulls_first: false,
},
}],
},
Arc::new(window_frame),
));

let plain_aggr_window_expr = Arc::new(PlainAggregateWindowExpr::new(
AggregateExprBuilder::new(
avg_udaf(),
Expand Down Expand Up @@ -341,11 +306,7 @@ fn roundtrip_window() -> Result<()> {
let input = Arc::new(EmptyExec::new(schema.clone()));

roundtrip_test(Arc::new(WindowAggExec::try_new(
vec![
plain_aggr_window_expr,
sliding_aggr_window_expr,
builtin_window_expr,
],
vec![plain_aggr_window_expr, sliding_aggr_window_expr],
input,
vec![col("b", &schema)?],
)?))
Expand Down
6 changes: 3 additions & 3 deletions datafusion/sqllogictest/test_files/errors.slt
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ regr_slope(c11, '2') over () as min1
from aggregate_test_100
order by c9

# WindowFunction with BuiltInWindowFunction wrong signature
# WindowFunction wrong signature
statement error
select
c9,
Expand All @@ -132,17 +132,17 @@ DataFusion error: Error during planning: Error during planning: Coercion from [I
nth_value()
nth_value(Any)
nth_value(Any, Any)
from aggregate_test_100
order by c9


# nth_value with wrong name
statement error DataFusion error: Error during planning: Invalid function 'nth_vlue'.\nDid you mean 'nth_value'?
buraksenn marked this conversation as resolved.
Show resolved Hide resolved
SELECT
NTH_VLUE(c4, 2) OVER()
FROM aggregate_test_100
ORDER BY c9
LIMIT 5;

# first_value with wrong name
statement error DataFusion error: Error during planning: Invalid function 'frst_value'.\nDid you mean 'first_value'?
SELECT
FRST_VALUE(c4, 2) OVER()
Expand Down
Loading