diff --git a/datafusion/core/src/dataframe/mod.rs b/datafusion/core/src/dataframe/mod.rs index 2c71cb80d755..bcf803573cdf 100644 --- a/datafusion/core/src/dataframe/mod.rs +++ b/datafusion/core/src/dataframe/mod.rs @@ -1946,12 +1946,12 @@ mod tests { use datafusion_common_runtime::SpawnedTask; use datafusion_expr::expr::WindowFunction; use datafusion_expr::{ - cast, create_udf, lit, BuiltInWindowFunction, ExprFunctionExt, - ScalarFunctionImplementation, Volatility, WindowFrame, WindowFrameBound, - WindowFrameUnits, WindowFunctionDefinition, + cast, create_udf, lit, ExprFunctionExt, ScalarFunctionImplementation, Volatility, + WindowFrame, WindowFrameBound, WindowFrameUnits, WindowFunctionDefinition, }; use datafusion_functions_aggregate::expr_fn::{array_agg, count_distinct}; use datafusion_functions_window::expr_fn::row_number; + use datafusion_functions_window::nth_value::first_value_udwf; use datafusion_physical_expr::expressions::Column; use datafusion_physical_plan::{get_plan_string, ExecutionPlanProperties}; use sqlparser::ast::NullTreatment; @@ -2177,9 +2177,7 @@ mod tests { // build plan using Table API let t = test_table().await?; let first_row = Expr::WindowFunction(WindowFunction::new( - WindowFunctionDefinition::BuiltInWindowFunction( - BuiltInWindowFunction::FirstValue, - ), + WindowFunctionDefinition::WindowUDF(first_value_udwf()), vec![col("aggregate_test_100.c1")], )) .partition_by(vec![col("aggregate_test_100.c2")]) diff --git a/datafusion/core/tests/fuzz_cases/window_fuzz.rs b/datafusion/core/tests/fuzz_cases/window_fuzz.rs index 5bfb4d97ed70..dd1fefa8af13 100644 --- a/datafusion/core/tests/fuzz_cases/window_fuzz.rs +++ b/datafusion/core/tests/fuzz_cases/window_fuzz.rs @@ -34,8 +34,7 @@ use datafusion_common::{Result, ScalarValue}; use datafusion_common_runtime::SpawnedTask; use datafusion_expr::type_coercion::functions::data_types_with_aggregate_udf; use datafusion_expr::{ - BuiltInWindowFunction, WindowFrame, WindowFrameBound, WindowFrameUnits, - WindowFunctionDefinition, + WindowFrame, WindowFrameBound, WindowFrameUnits, WindowFunctionDefinition, }; use datafusion_functions_aggregate::count::count_udaf; use datafusion_functions_aggregate::min_max::{max_udaf, min_udaf}; @@ -46,6 +45,9 @@ use test_utils::add_empty_batches; use datafusion::functions_window::row_number::row_number_udwf; use datafusion_functions_window::lead_lag::{lag_udwf, lead_udwf}; +use datafusion_functions_window::nth_value::{ + first_value_udwf, last_value_udwf, nth_value_udwf, +}; use datafusion_functions_window::rank::{dense_rank_udwf, rank_udwf}; use datafusion_physical_expr_common::sort_expr::LexOrdering; use hashbrown::HashMap; @@ -418,27 +420,21 @@ fn get_random_function( window_fn_map.insert( "first_value", ( - WindowFunctionDefinition::BuiltInWindowFunction( - BuiltInWindowFunction::FirstValue, - ), + WindowFunctionDefinition::WindowUDF(first_value_udwf()), vec![arg.clone()], ), ); window_fn_map.insert( "last_value", ( - WindowFunctionDefinition::BuiltInWindowFunction( - BuiltInWindowFunction::LastValue, - ), + WindowFunctionDefinition::WindowUDF(last_value_udwf()), vec![arg.clone()], ), ); window_fn_map.insert( "nth_value", ( - WindowFunctionDefinition::BuiltInWindowFunction( - BuiltInWindowFunction::NthValue, - ), + WindowFunctionDefinition::WindowUDF(nth_value_udwf()), vec![ arg.clone(), lit(ScalarValue::Int64(Some(rng.gen_range(1..10)))), diff --git a/datafusion/expr/src/expr.rs b/datafusion/expr/src/expr.rs index a9c183952fc7..129edac09666 100644 --- a/datafusion/expr/src/expr.rs +++ b/datafusion/expr/src/expr.rs @@ -21,7 +21,6 @@ use std::collections::{HashMap, HashSet}; use std::fmt::{self, Display, Formatter, Write}; use std::hash::{Hash, Hasher}; use std::mem; -use std::str::FromStr; use std::sync::Arc; use crate::expr_fn::binary_expr; @@ -823,23 +822,6 @@ impl WindowFunction { } } -/// Find DataFusion's built-in window function by name. -pub fn find_df_window_func(name: &str) -> Option { - let name = name.to_lowercase(); - // Code paths for window functions leveraging ordinary aggregators and - // built-in window functions are quite different, and the same function - // may have different implementations for these cases. If the sought - // function is not found among built-in window functions, we search for - // it among aggregate functions. - if let Ok(built_in_function) = BuiltInWindowFunction::from_str(name.as_str()) { - Some(WindowFunctionDefinition::BuiltInWindowFunction( - built_in_function, - )) - } else { - None - } -} - /// EXISTS expression #[derive(Clone, PartialEq, Eq, PartialOrd, Hash, Debug)] pub struct Exists { @@ -2530,77 +2512,6 @@ mod test { use super::*; - #[test] - fn test_first_value_return_type() -> Result<()> { - let fun = find_df_window_func("first_value").unwrap(); - let observed = fun.return_type(&[DataType::Utf8], &[true], "")?; - assert_eq!(DataType::Utf8, observed); - - let observed = fun.return_type(&[DataType::UInt64], &[true], "")?; - assert_eq!(DataType::UInt64, observed); - - Ok(()) - } - - #[test] - fn test_last_value_return_type() -> Result<()> { - let fun = find_df_window_func("last_value").unwrap(); - let observed = fun.return_type(&[DataType::Utf8], &[true], "")?; - assert_eq!(DataType::Utf8, observed); - - let observed = fun.return_type(&[DataType::Float64], &[true], "")?; - assert_eq!(DataType::Float64, observed); - - Ok(()) - } - - #[test] - fn test_nth_value_return_type() -> Result<()> { - let fun = find_df_window_func("nth_value").unwrap(); - let observed = - fun.return_type(&[DataType::Utf8, DataType::UInt64], &[true, true], "")?; - assert_eq!(DataType::Utf8, observed); - - let observed = - fun.return_type(&[DataType::Float64, DataType::UInt64], &[true, true], "")?; - assert_eq!(DataType::Float64, observed); - - Ok(()) - } - - #[test] - fn test_window_function_case_insensitive() -> Result<()> { - let names = vec!["first_value", "last_value", "nth_value"]; - for name in names { - let fun = find_df_window_func(name).unwrap(); - let fun2 = find_df_window_func(name.to_uppercase().as_str()).unwrap(); - assert_eq!(fun, fun2); - if fun.to_string() == "first_value" || fun.to_string() == "last_value" { - assert_eq!(fun.to_string(), name); - } else { - assert_eq!(fun.to_string(), name.to_uppercase()); - } - } - Ok(()) - } - - #[test] - fn test_find_df_window_function() { - assert_eq!( - find_df_window_func("first_value"), - Some(WindowFunctionDefinition::BuiltInWindowFunction( - BuiltInWindowFunction::FirstValue - )) - ); - assert_eq!( - find_df_window_func("LAST_value"), - Some(WindowFunctionDefinition::BuiltInWindowFunction( - BuiltInWindowFunction::LastValue - )) - ); - assert_eq!(find_df_window_func("not_exist"), None) - } - #[test] fn test_display_wildcard() { assert_eq!(format!("{}", wildcard()), "*"); diff --git a/datafusion/expr/src/lib.rs b/datafusion/expr/src/lib.rs index 849d9604808c..d1ec7e8f7bc2 100644 --- a/datafusion/expr/src/lib.rs +++ b/datafusion/expr/src/lib.rs @@ -64,7 +64,6 @@ pub mod type_coercion; pub mod utils; pub mod var_provider; pub mod window_frame; -pub mod window_function; pub mod window_state; pub use built_in_window_function::BuiltInWindowFunction; diff --git a/datafusion/expr/src/window_function.rs b/datafusion/expr/src/window_function.rs deleted file mode 100644 index be2b6575e2e9..000000000000 --- a/datafusion/expr/src/window_function.rs +++ /dev/null @@ -1,26 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use crate::{expr::WindowFunction, BuiltInWindowFunction, Expr, Literal}; - -/// Create an expression to represent the `nth_value` window function -pub fn nth_value(arg: Expr, n: i64) -> Expr { - Expr::WindowFunction(WindowFunction::new( - BuiltInWindowFunction::NthValue, - vec![arg, n.lit()], - )) -} diff --git a/datafusion/functions-window/src/lib.rs b/datafusion/functions-window/src/lib.rs index ff8542838df9..9f8e54a0423b 100644 --- a/datafusion/functions-window/src/lib.rs +++ b/datafusion/functions-window/src/lib.rs @@ -22,6 +22,7 @@ //! //! [DataFusion]: https://crates.io/crates/datafusion //! + use std::sync::Arc; use log::debug; @@ -34,6 +35,7 @@ pub mod macros; pub mod cume_dist; pub mod lead_lag; +pub mod nth_value; pub mod ntile; pub mod rank; pub mod row_number; @@ -44,6 +46,7 @@ pub mod expr_fn { pub use super::cume_dist::cume_dist; pub use super::lead_lag::lag; pub use super::lead_lag::lead; + pub use super::nth_value::{first_value, last_value, nth_value}; pub use super::ntile::ntile; pub use super::rank::{dense_rank, percent_rank, rank}; pub use super::row_number::row_number; @@ -60,6 +63,9 @@ pub fn all_default_window_functions() -> Vec> { rank::dense_rank_udwf(), rank::percent_rank_udwf(), ntile::ntile_udwf(), + nth_value::first_value_udwf(), + nth_value::last_value_udwf(), + nth_value::nth_value_udwf(), ] } /// Registers all enabled packages with a [`FunctionRegistry`] diff --git a/datafusion/functions-window/src/nth_value.rs b/datafusion/functions-window/src/nth_value.rs new file mode 100644 index 000000000000..cff49c1f4c85 --- /dev/null +++ b/datafusion/functions-window/src/nth_value.rs @@ -0,0 +1,554 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! `nth_value` window function implementation + +use crate::utils::{get_scalar_value_from_args, get_signed_integer}; + +use std::any::Any; +use std::cmp::Ordering; +use std::fmt::Debug; +use std::ops::Range; +use std::sync::OnceLock; + +use datafusion_common::arrow::array::ArrayRef; +use datafusion_common::arrow::datatypes::{DataType, Field}; +use datafusion_common::{exec_datafusion_err, exec_err, Result, ScalarValue}; +use datafusion_expr::window_doc_sections::DOC_SECTION_ANALYTICAL; +use datafusion_expr::window_state::WindowAggState; +use datafusion_expr::{ + Documentation, Literal, PartitionEvaluator, ReversedUDWF, Signature, TypeSignature, + Volatility, WindowUDFImpl, +}; +use datafusion_functions_window_common::field; +use datafusion_functions_window_common::partition::PartitionEvaluatorArgs; +use field::WindowUDFFieldArgs; + +get_or_init_udwf!( + First, + first_value, + "returns the first value in the window frame", + NthValue::first +); +get_or_init_udwf!( + Last, + last_value, + "returns the last value in the window frame", + NthValue::last +); +get_or_init_udwf!( + NthValue, + nth_value, + "returns the nth value in the window frame", + NthValue::nth +); + +/// Create an expression to represent the `first_value` window function +/// +pub fn first_value(arg: datafusion_expr::Expr) -> datafusion_expr::Expr { + first_value_udwf().call(vec![arg]) +} + +/// Create an expression to represent the `last_value` window function +/// +pub fn last_value(arg: datafusion_expr::Expr) -> datafusion_expr::Expr { + last_value_udwf().call(vec![arg]) +} + +/// Create an expression to represent the `nth_value` window function +/// +pub fn nth_value(arg: datafusion_expr::Expr, n: i64) -> datafusion_expr::Expr { + nth_value_udwf().call(vec![arg, n.lit()]) +} + +/// Tag to differentiate special use cases of the NTH_VALUE built-in window function. +#[derive(Debug, Copy, Clone)] +pub enum NthValueKind { + First, + Last, + Nth, +} + +impl NthValueKind { + fn name(&self) -> &'static str { + match self { + NthValueKind::First => "first_value", + NthValueKind::Last => "last_value", + NthValueKind::Nth => "nth_value", + } + } +} + +#[derive(Debug)] +pub struct NthValue { + signature: Signature, + kind: NthValueKind, +} + +impl NthValue { + /// Create a new `nth_value` function + pub fn new(kind: NthValueKind) -> Self { + Self { + signature: Signature::one_of( + vec![ + TypeSignature::Any(0), + TypeSignature::Any(1), + TypeSignature::Any(2), + ], + Volatility::Immutable, + ), + kind, + } + } + + pub fn first() -> Self { + Self::new(NthValueKind::First) + } + + pub fn last() -> Self { + Self::new(NthValueKind::Last) + } + pub fn nth() -> Self { + Self::new(NthValueKind::Nth) + } +} + +static FIRST_VALUE_DOCUMENTATION: OnceLock = OnceLock::new(); + +fn get_first_value_doc() -> &'static Documentation { + FIRST_VALUE_DOCUMENTATION.get_or_init(|| { + Documentation::builder() + .with_doc_section(DOC_SECTION_ANALYTICAL) + .with_description( + "Returns value evaluated at the row that is the first row of the window \ + frame.", + ) + .with_syntax_example("first_value(expression)") + .with_argument("expression", "Expression to operate on") + .build() + .unwrap() + }) +} + +static LAST_VALUE_DOCUMENTATION: OnceLock = OnceLock::new(); + +fn get_last_value_doc() -> &'static Documentation { + LAST_VALUE_DOCUMENTATION.get_or_init(|| { + Documentation::builder() + .with_doc_section(DOC_SECTION_ANALYTICAL) + .with_description( + "Returns value evaluated at the row that is the last row of the window \ + frame.", + ) + .with_syntax_example("last_value(expression)") + .with_argument("expression", "Expression to operate on") + .build() + .unwrap() + }) +} + +static NTH_VALUE_DOCUMENTATION: OnceLock = OnceLock::new(); + +fn get_nth_value_doc() -> &'static Documentation { + NTH_VALUE_DOCUMENTATION.get_or_init(|| { + Documentation::builder() + .with_doc_section(DOC_SECTION_ANALYTICAL) + .with_description( + "Returns value evaluated at the row that is the nth row of the window \ + frame (counting from 1); null if no such row.", + ) + .with_syntax_example("nth_value(expression, n)") + .with_argument( + "expression", + "The name the column of which nth \ + value to retrieve", + ) + .with_argument("n", "Integer. Specifies the n in nth") + .build() + .unwrap() + }) +} + +impl WindowUDFImpl for NthValue { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + self.kind.name() + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn partition_evaluator( + &self, + partition_evaluator_args: PartitionEvaluatorArgs, + ) -> Result> { + let state = NthValueState { + finalized_result: None, + kind: self.kind, + }; + + if !matches!(self.kind, NthValueKind::Nth) { + return Ok(Box::new(NthValueEvaluator { + state, + ignore_nulls: partition_evaluator_args.ignore_nulls(), + n: 0, + })); + } + + let n = + match get_scalar_value_from_args(partition_evaluator_args.input_exprs(), 1) + .map_err(|_e| { + exec_datafusion_err!( + "Expected a signed integer literal for the second argument of nth_value") + })? + .map(get_signed_integer) + { + Some(Ok(n)) => { + if partition_evaluator_args.is_reversed() { + -n + } else { + n + } + } + _ => { + return exec_err!( + "Expected a signed integer literal for the second argument of nth_value" + ) + } + }; + + Ok(Box::new(NthValueEvaluator { + state, + ignore_nulls: partition_evaluator_args.ignore_nulls(), + n, + })) + } + + fn field(&self, field_args: WindowUDFFieldArgs) -> Result { + let nullable = true; + let return_type = field_args.input_types().first().unwrap_or(&DataType::Null); + + Ok(Field::new(field_args.name(), return_type.clone(), nullable)) + } + + fn reverse_expr(&self) -> ReversedUDWF { + match self.kind { + NthValueKind::First => ReversedUDWF::Reversed(last_value_udwf()), + NthValueKind::Last => ReversedUDWF::Reversed(first_value_udwf()), + NthValueKind::Nth => ReversedUDWF::Reversed(nth_value_udwf()), + } + } + + fn documentation(&self) -> Option<&Documentation> { + match self.kind { + NthValueKind::First => Some(get_first_value_doc()), + NthValueKind::Last => Some(get_last_value_doc()), + NthValueKind::Nth => Some(get_nth_value_doc()), + } + } +} + +#[derive(Debug, Clone)] +pub struct NthValueState { + // In certain cases, we can finalize the result early. Consider this usage: + // ``` + // FIRST_VALUE(increasing_col) OVER window AS my_first_value + // WINDOW (ORDER BY ts ASC ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING) AS window + // ``` + // The result will always be the first entry in the table. We can store such + // early-finalizing results and then just reuse them as necessary. This opens + // opportunities to prune our datasets. + pub finalized_result: Option, + pub kind: NthValueKind, +} + +#[derive(Debug)] +pub(crate) struct NthValueEvaluator { + state: NthValueState, + ignore_nulls: bool, + n: i64, +} + +impl PartitionEvaluator for NthValueEvaluator { + /// When the window frame has a fixed beginning (e.g UNBOUNDED PRECEDING), + /// for some functions such as FIRST_VALUE, LAST_VALUE and NTH_VALUE, we + /// can memoize the result. Once result is calculated, it will always stay + /// same. Hence, we do not need to keep past data as we process the entire + /// dataset. + fn memoize(&mut self, state: &mut WindowAggState) -> Result<()> { + let out = &state.out_col; + let size = out.len(); + let mut buffer_size = 1; + // Decide if we arrived at a final result yet: + let (is_prunable, is_reverse_direction) = match self.state.kind { + NthValueKind::First => { + let n_range = + state.window_frame_range.end - state.window_frame_range.start; + (n_range > 0 && size > 0, false) + } + NthValueKind::Last => (true, true), + NthValueKind::Nth => { + let n_range = + state.window_frame_range.end - state.window_frame_range.start; + match self.n.cmp(&0) { + Ordering::Greater => ( + n_range >= (self.n as usize) && size > (self.n as usize), + false, + ), + Ordering::Less => { + let reverse_index = (-self.n) as usize; + buffer_size = reverse_index; + // Negative index represents reverse direction. + (n_range >= reverse_index, true) + } + Ordering::Equal => (false, false), + } + } + }; + // Do not memoize results when nulls are ignored. + if is_prunable && !self.ignore_nulls { + if self.state.finalized_result.is_none() && !is_reverse_direction { + let result = ScalarValue::try_from_array(out, size - 1)?; + self.state.finalized_result = Some(result); + } + state.window_frame_range.start = + state.window_frame_range.end.saturating_sub(buffer_size); + } + Ok(()) + } + + fn evaluate( + &mut self, + values: &[ArrayRef], + range: &Range, + ) -> Result { + if let Some(ref result) = self.state.finalized_result { + Ok(result.clone()) + } else { + // FIRST_VALUE, LAST_VALUE, NTH_VALUE window functions take a single column, values will have size 1. + let arr = &values[0]; + let n_range = range.end - range.start; + if n_range == 0 { + // We produce None if the window is empty. + return ScalarValue::try_from(arr.data_type()); + } + + // Extract valid indices if ignoring nulls. + let valid_indices = if self.ignore_nulls { + // Calculate valid indices, inside the window frame boundaries + let slice = arr.slice(range.start, n_range); + let valid_indices = slice + .nulls() + .map(|nulls| { + nulls + .valid_indices() + // Add offset `range.start` to valid indices, to point correct index in the original arr. + .map(|idx| idx + range.start) + .collect::>() + }) + .unwrap_or_default(); + if valid_indices.is_empty() { + return ScalarValue::try_from(arr.data_type()); + } + Some(valid_indices) + } else { + None + }; + match self.state.kind { + NthValueKind::First => { + if let Some(valid_indices) = &valid_indices { + ScalarValue::try_from_array(arr, valid_indices[0]) + } else { + ScalarValue::try_from_array(arr, range.start) + } + } + NthValueKind::Last => { + if let Some(valid_indices) = &valid_indices { + ScalarValue::try_from_array( + arr, + valid_indices[valid_indices.len() - 1], + ) + } else { + ScalarValue::try_from_array(arr, range.end - 1) + } + } + NthValueKind::Nth => { + match self.n.cmp(&0) { + Ordering::Greater => { + // SQL indices are not 0-based. + let index = (self.n as usize) - 1; + if index >= n_range { + // Outside the range, return NULL: + ScalarValue::try_from(arr.data_type()) + } else if let Some(valid_indices) = valid_indices { + if index >= valid_indices.len() { + return ScalarValue::try_from(arr.data_type()); + } + ScalarValue::try_from_array(&arr, valid_indices[index]) + } else { + ScalarValue::try_from_array(arr, range.start + index) + } + } + Ordering::Less => { + let reverse_index = (-self.n) as usize; + if n_range < reverse_index { + // Outside the range, return NULL: + ScalarValue::try_from(arr.data_type()) + } else if let Some(valid_indices) = valid_indices { + if reverse_index > valid_indices.len() { + return ScalarValue::try_from(arr.data_type()); + } + let new_index = + valid_indices[valid_indices.len() - reverse_index]; + ScalarValue::try_from_array(&arr, new_index) + } else { + ScalarValue::try_from_array( + arr, + range.start + n_range - reverse_index, + ) + } + } + Ordering::Equal => ScalarValue::try_from(arr.data_type()), + } + } + } + } + } + + fn supports_bounded_execution(&self) -> bool { + true + } + + fn uses_window_frame(&self) -> bool { + true + } +} + +#[cfg(test)] +mod tests { + use super::*; + use arrow::array::*; + use datafusion_common::cast::as_int32_array; + use datafusion_physical_expr::expressions::{Column, Literal}; + use datafusion_physical_expr_common::physical_expr::PhysicalExpr; + use std::sync::Arc; + + fn test_i32_result( + expr: NthValue, + partition_evaluator_args: PartitionEvaluatorArgs, + expected: Int32Array, + ) -> Result<()> { + let arr: ArrayRef = Arc::new(Int32Array::from(vec![1, -2, 3, -4, 5, -6, 7, 8])); + let values = vec![arr]; + let mut ranges: Vec> = vec![]; + for i in 0..8 { + ranges.push(Range { + start: 0, + end: i + 1, + }) + } + let mut evaluator = expr.partition_evaluator(partition_evaluator_args)?; + let result = ranges + .iter() + .map(|range| evaluator.evaluate(&values, range)) + .collect::>>()?; + let result = ScalarValue::iter_to_array(result.into_iter())?; + let result = as_int32_array(&result)?; + assert_eq!(expected, *result); + Ok(()) + } + + #[test] + fn first_value() -> Result<()> { + let expr = Arc::new(Column::new("c3", 0)) as Arc; + test_i32_result( + NthValue::first(), + PartitionEvaluatorArgs::new(&[expr], &[DataType::Int32], false, false), + Int32Array::from(vec![1; 8]).iter().collect::(), + ) + } + + #[test] + fn last_value() -> Result<()> { + let expr = Arc::new(Column::new("c3", 0)) as Arc; + test_i32_result( + NthValue::last(), + PartitionEvaluatorArgs::new(&[expr], &[DataType::Int32], false, false), + Int32Array::from(vec![ + Some(1), + Some(-2), + Some(3), + Some(-4), + Some(5), + Some(-6), + Some(7), + Some(8), + ]), + ) + } + + #[test] + fn nth_value_1() -> Result<()> { + let expr = Arc::new(Column::new("c3", 0)) as Arc; + let n_value = + Arc::new(Literal::new(ScalarValue::Int32(Some(1)))) as Arc; + + test_i32_result( + NthValue::nth(), + PartitionEvaluatorArgs::new( + &[expr, n_value], + &[DataType::Int32], + false, + false, + ), + Int32Array::from(vec![1; 8]), + )?; + Ok(()) + } + + #[test] + fn nth_value_2() -> Result<()> { + let expr = Arc::new(Column::new("c3", 0)) as Arc; + let n_value = + Arc::new(Literal::new(ScalarValue::Int32(Some(2)))) as Arc; + + test_i32_result( + NthValue::nth(), + PartitionEvaluatorArgs::new( + &[expr, n_value], + &[DataType::Int32], + false, + false, + ), + Int32Array::from(vec![ + None, + Some(-2), + Some(-2), + Some(-2), + Some(-2), + Some(-2), + Some(-2), + Some(-2), + ]), + )?; + Ok(()) + } +} diff --git a/datafusion/physical-expr/src/expressions/mod.rs b/datafusion/physical-expr/src/expressions/mod.rs index 7d71bd9ff17b..f00b49f50314 100644 --- a/datafusion/physical-expr/src/expressions/mod.rs +++ b/datafusion/physical-expr/src/expressions/mod.rs @@ -35,7 +35,6 @@ mod unknown_column; /// Module with some convenient methods used in expression building pub use crate::aggregate::stats::StatsType; -pub use crate::window::nth_value::NthValue; pub use crate::PhysicalSortExpr; pub use binary::{binary, similar_to, BinaryExpr}; diff --git a/datafusion/physical-expr/src/window/mod.rs b/datafusion/physical-expr/src/window/mod.rs index 3c37fff7a1ba..e7a318b860fd 100644 --- a/datafusion/physical-expr/src/window/mod.rs +++ b/datafusion/physical-expr/src/window/mod.rs @@ -18,7 +18,6 @@ mod aggregate; mod built_in; mod built_in_window_function_expr; -pub(crate) mod nth_value; mod sliding_aggregate; mod window_expr; @@ -26,7 +25,6 @@ pub use aggregate::PlainAggregateWindowExpr; pub use built_in::BuiltInWindowExpr; pub use built_in_window_function_expr::BuiltInWindowFunctionExpr; pub use sliding_aggregate::SlidingAggregateWindowExpr; -pub use window_expr::NthValueKind; pub use window_expr::PartitionBatches; pub use window_expr::PartitionKey; pub use window_expr::PartitionWindowAggStates; diff --git a/datafusion/physical-expr/src/window/nth_value.rs b/datafusion/physical-expr/src/window/nth_value.rs deleted file mode 100644 index 6ec3a23fc586..000000000000 --- a/datafusion/physical-expr/src/window/nth_value.rs +++ /dev/null @@ -1,415 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -//! Defines physical expressions for `FIRST_VALUE`, `LAST_VALUE`, and `NTH_VALUE` -//! functions that can be evaluated at run time during query execution. - -use std::any::Any; -use std::cmp::Ordering; -use std::ops::Range; -use std::sync::Arc; - -use crate::window::window_expr::{NthValueKind, NthValueState}; -use crate::window::BuiltInWindowFunctionExpr; -use crate::PhysicalExpr; - -use arrow::array::{Array, ArrayRef}; -use arrow::datatypes::{DataType, Field}; -use datafusion_common::Result; -use datafusion_common::ScalarValue; -use datafusion_expr::window_state::WindowAggState; -use datafusion_expr::PartitionEvaluator; - -/// nth_value expression -#[derive(Debug)] -pub struct NthValue { - name: String, - expr: Arc, - /// Output data type - data_type: DataType, - kind: NthValueKind, - ignore_nulls: bool, -} - -impl NthValue { - /// Create a new FIRST_VALUE window aggregate function - pub fn first( - name: impl Into, - expr: Arc, - data_type: DataType, - ignore_nulls: bool, - ) -> Self { - Self { - name: name.into(), - expr, - data_type, - kind: NthValueKind::First, - ignore_nulls, - } - } - - /// Create a new LAST_VALUE window aggregate function - pub fn last( - name: impl Into, - expr: Arc, - data_type: DataType, - ignore_nulls: bool, - ) -> Self { - Self { - name: name.into(), - expr, - data_type, - kind: NthValueKind::Last, - ignore_nulls, - } - } - - /// Create a new NTH_VALUE window aggregate function - pub fn nth( - name: impl Into, - expr: Arc, - data_type: DataType, - n: i64, - ignore_nulls: bool, - ) -> Result { - Ok(Self { - name: name.into(), - expr, - data_type, - kind: NthValueKind::Nth(n), - ignore_nulls, - }) - } - - /// Get the NTH_VALUE kind - pub fn get_kind(&self) -> NthValueKind { - self.kind - } -} - -impl BuiltInWindowFunctionExpr for NthValue { - /// Return a reference to Any that can be used for downcasting - fn as_any(&self) -> &dyn Any { - self - } - - fn field(&self) -> Result { - let nullable = true; - Ok(Field::new(&self.name, self.data_type.clone(), nullable)) - } - - fn expressions(&self) -> Vec> { - vec![Arc::clone(&self.expr)] - } - - fn name(&self) -> &str { - &self.name - } - - fn create_evaluator(&self) -> Result> { - let state = NthValueState { - finalized_result: None, - kind: self.kind, - }; - Ok(Box::new(NthValueEvaluator { - state, - ignore_nulls: self.ignore_nulls, - })) - } - - fn reverse_expr(&self) -> Option> { - let reversed_kind = match self.kind { - NthValueKind::First => NthValueKind::Last, - NthValueKind::Last => NthValueKind::First, - NthValueKind::Nth(idx) => NthValueKind::Nth(-idx), - }; - Some(Arc::new(Self { - name: self.name.clone(), - expr: Arc::clone(&self.expr), - data_type: self.data_type.clone(), - kind: reversed_kind, - ignore_nulls: self.ignore_nulls, - })) - } -} - -/// Value evaluator for nth_value functions -#[derive(Debug)] -pub(crate) struct NthValueEvaluator { - state: NthValueState, - ignore_nulls: bool, -} - -impl PartitionEvaluator for NthValueEvaluator { - /// When the window frame has a fixed beginning (e.g UNBOUNDED PRECEDING), - /// for some functions such as FIRST_VALUE, LAST_VALUE and NTH_VALUE, we - /// can memoize the result. Once result is calculated, it will always stay - /// same. Hence, we do not need to keep past data as we process the entire - /// dataset. - fn memoize(&mut self, state: &mut WindowAggState) -> Result<()> { - let out = &state.out_col; - let size = out.len(); - let mut buffer_size = 1; - // Decide if we arrived at a final result yet: - let (is_prunable, is_reverse_direction) = match self.state.kind { - NthValueKind::First => { - let n_range = - state.window_frame_range.end - state.window_frame_range.start; - (n_range > 0 && size > 0, false) - } - NthValueKind::Last => (true, true), - NthValueKind::Nth(n) => { - let n_range = - state.window_frame_range.end - state.window_frame_range.start; - match n.cmp(&0) { - Ordering::Greater => { - (n_range >= (n as usize) && size > (n as usize), false) - } - Ordering::Less => { - let reverse_index = (-n) as usize; - buffer_size = reverse_index; - // Negative index represents reverse direction. - (n_range >= reverse_index, true) - } - Ordering::Equal => (false, false), - } - } - }; - // Do not memoize results when nulls are ignored. - if is_prunable && !self.ignore_nulls { - if self.state.finalized_result.is_none() && !is_reverse_direction { - let result = ScalarValue::try_from_array(out, size - 1)?; - self.state.finalized_result = Some(result); - } - state.window_frame_range.start = - state.window_frame_range.end.saturating_sub(buffer_size); - } - Ok(()) - } - - fn evaluate( - &mut self, - values: &[ArrayRef], - range: &Range, - ) -> Result { - if let Some(ref result) = self.state.finalized_result { - Ok(result.clone()) - } else { - // FIRST_VALUE, LAST_VALUE, NTH_VALUE window functions take a single column, values will have size 1. - let arr = &values[0]; - let n_range = range.end - range.start; - if n_range == 0 { - // We produce None if the window is empty. - return ScalarValue::try_from(arr.data_type()); - } - - // Extract valid indices if ignoring nulls. - let valid_indices = if self.ignore_nulls { - // Calculate valid indices, inside the window frame boundaries - let slice = arr.slice(range.start, n_range); - let valid_indices = slice - .nulls() - .map(|nulls| { - nulls - .valid_indices() - // Add offset `range.start` to valid indices, to point correct index in the original arr. - .map(|idx| idx + range.start) - .collect::>() - }) - .unwrap_or_default(); - if valid_indices.is_empty() { - return ScalarValue::try_from(arr.data_type()); - } - Some(valid_indices) - } else { - None - }; - match self.state.kind { - NthValueKind::First => { - if let Some(valid_indices) = &valid_indices { - ScalarValue::try_from_array(arr, valid_indices[0]) - } else { - ScalarValue::try_from_array(arr, range.start) - } - } - NthValueKind::Last => { - if let Some(valid_indices) = &valid_indices { - ScalarValue::try_from_array( - arr, - valid_indices[valid_indices.len() - 1], - ) - } else { - ScalarValue::try_from_array(arr, range.end - 1) - } - } - NthValueKind::Nth(n) => { - match n.cmp(&0) { - Ordering::Greater => { - // SQL indices are not 0-based. - let index = (n as usize) - 1; - if index >= n_range { - // Outside the range, return NULL: - ScalarValue::try_from(arr.data_type()) - } else if let Some(valid_indices) = valid_indices { - if index >= valid_indices.len() { - return ScalarValue::try_from(arr.data_type()); - } - ScalarValue::try_from_array(&arr, valid_indices[index]) - } else { - ScalarValue::try_from_array(arr, range.start + index) - } - } - Ordering::Less => { - let reverse_index = (-n) as usize; - if n_range < reverse_index { - // Outside the range, return NULL: - ScalarValue::try_from(arr.data_type()) - } else if let Some(valid_indices) = valid_indices { - if reverse_index > valid_indices.len() { - return ScalarValue::try_from(arr.data_type()); - } - let new_index = - valid_indices[valid_indices.len() - reverse_index]; - ScalarValue::try_from_array(&arr, new_index) - } else { - ScalarValue::try_from_array( - arr, - range.start + n_range - reverse_index, - ) - } - } - Ordering::Equal => ScalarValue::try_from(arr.data_type()), - } - } - } - } - } - - fn supports_bounded_execution(&self) -> bool { - true - } - - fn uses_window_frame(&self) -> bool { - true - } -} - -#[cfg(test)] -mod tests { - use super::*; - use crate::expressions::Column; - use arrow::{array::*, datatypes::*}; - use datafusion_common::cast::as_int32_array; - - fn test_i32_result(expr: NthValue, expected: Int32Array) -> Result<()> { - let arr: ArrayRef = Arc::new(Int32Array::from(vec![1, -2, 3, -4, 5, -6, 7, 8])); - let values = vec![arr]; - let schema = Schema::new(vec![Field::new("arr", DataType::Int32, false)]); - let batch = RecordBatch::try_new(Arc::new(schema), values.clone())?; - let mut ranges: Vec> = vec![]; - for i in 0..8 { - ranges.push(Range { - start: 0, - end: i + 1, - }) - } - let mut evaluator = expr.create_evaluator()?; - let values = expr.evaluate_args(&batch)?; - let result = ranges - .iter() - .map(|range| evaluator.evaluate(&values, range)) - .collect::>>()?; - let result = ScalarValue::iter_to_array(result.into_iter())?; - let result = as_int32_array(&result)?; - assert_eq!(expected, *result); - Ok(()) - } - - #[test] - fn first_value() -> Result<()> { - let first_value = NthValue::first( - "first_value".to_owned(), - Arc::new(Column::new("arr", 0)), - DataType::Int32, - false, - ); - test_i32_result(first_value, Int32Array::from(vec![1; 8]))?; - Ok(()) - } - - #[test] - fn last_value() -> Result<()> { - let last_value = NthValue::last( - "last_value".to_owned(), - Arc::new(Column::new("arr", 0)), - DataType::Int32, - false, - ); - test_i32_result( - last_value, - Int32Array::from(vec![ - Some(1), - Some(-2), - Some(3), - Some(-4), - Some(5), - Some(-6), - Some(7), - Some(8), - ]), - )?; - Ok(()) - } - - #[test] - fn nth_value_1() -> Result<()> { - let nth_value = NthValue::nth( - "nth_value".to_owned(), - Arc::new(Column::new("arr", 0)), - DataType::Int32, - 1, - false, - )?; - test_i32_result(nth_value, Int32Array::from(vec![1; 8]))?; - Ok(()) - } - - #[test] - fn nth_value_2() -> Result<()> { - let nth_value = NthValue::nth( - "nth_value".to_owned(), - Arc::new(Column::new("arr", 0)), - DataType::Int32, - 2, - false, - )?; - test_i32_result( - nth_value, - Int32Array::from(vec![ - None, - Some(-2), - Some(-2), - Some(-2), - Some(-2), - Some(-2), - Some(-2), - Some(-2), - ]), - )?; - Ok(()) - } -} diff --git a/datafusion/physical-expr/src/window/window_expr.rs b/datafusion/physical-expr/src/window/window_expr.rs index 828e5ad20625..8b130506cdea 100644 --- a/datafusion/physical-expr/src/window/window_expr.rs +++ b/datafusion/physical-expr/src/window/window_expr.rs @@ -530,28 +530,6 @@ pub enum WindowFn { Aggregate(Box), } -/// Tag to differentiate special use cases of the NTH_VALUE built-in window function. -#[derive(Debug, Copy, Clone)] -pub enum NthValueKind { - First, - Last, - Nth(i64), -} - -#[derive(Debug, Clone)] -pub struct NthValueState { - // In certain cases, we can finalize the result early. Consider this usage: - // ``` - // FIRST_VALUE(increasing_col) OVER window AS my_first_value - // WINDOW (ORDER BY ts ASC ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING) AS window - // ``` - // The result will always be the first entry in the table. We can store such - // early-finalizing results and then just reuse them as necessary. This opens - // opportunities to prune our datasets. - pub finalized_result: Option, - pub kind: NthValueKind, -} - /// Key for IndexMap for each unique partition /// /// For instance, if window frame is `OVER(PARTITION BY a,b)`, diff --git a/datafusion/physical-plan/Cargo.toml b/datafusion/physical-plan/Cargo.toml index a9f9b22fafda..64fd0f49a233 100644 --- a/datafusion/physical-plan/Cargo.toml +++ b/datafusion/physical-plan/Cargo.toml @@ -70,6 +70,7 @@ tokio = { workspace = true } [dev-dependencies] criterion = { version = "0.5", features = ["async_futures"] } datafusion-functions-aggregate = { workspace = true } +datafusion-functions-window = { workspace = true } rstest = { workspace = true } rstest_reuse = "0.7.0" tokio = { workspace = true, features = [ diff --git a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs index fd1e1ed6eb3a..20fd53a04f7c 100644 --- a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs +++ b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs @@ -1163,12 +1163,12 @@ mod tests { use std::task::{Context, Poll}; use std::time::Duration; - use crate::common::collect; use crate::expressions::PhysicalSortExpr; - use crate::memory::MemoryExec; use crate::projection::ProjectionExec; use crate::streaming::{PartitionStream, StreamingTableExec}; - use crate::windows::{create_window_expr, BoundedWindowAggExec, InputOrderMode}; + use crate::windows::{ + create_udwf_window_expr, create_window_expr, BoundedWindowAggExec, InputOrderMode, + }; use crate::{execute_stream, get_plan_string, ExecutionPlan}; use arrow_array::builder::{Int64Builder, UInt64Builder}; @@ -1185,12 +1185,14 @@ mod tests { WindowFrame, WindowFrameBound, WindowFrameUnits, WindowFunctionDefinition, }; use datafusion_functions_aggregate::count::count_udaf; - use datafusion_physical_expr::expressions::{col, Column, NthValue}; - use datafusion_physical_expr::window::{ - BuiltInWindowExpr, BuiltInWindowFunctionExpr, - }; + use datafusion_functions_window::nth_value::last_value_udwf; + use datafusion_functions_window::nth_value::nth_value_udwf; + use datafusion_physical_expr::expressions::{col, Column, Literal}; use datafusion_physical_expr::{LexOrdering, PhysicalExpr}; + use crate::common::collect; + use crate::memory::MemoryExec; + use datafusion_physical_expr::window::BuiltInWindowExpr; use futures::future::Shared; use futures::{pin_mut, ready, FutureExt, Stream, StreamExt}; use itertools::Itertools; @@ -1504,7 +1506,7 @@ mod tests { Ok(source) } - // Tests NTH_VALUE(negative index) with memoize feature. + // Tests NTH_VALUE(negative index) with memoize feature // To be able to trigger memoize feature for NTH_VALUE we need to // - feed BoundedWindowAggExec with batch stream data. // - Window frame should contain UNBOUNDED PRECEDING. @@ -1528,30 +1530,39 @@ mod tests { ) .map(|e| Arc::new(e) as Arc)?; let col_a = col("a", &schema)?; - let nth_value_func1 = NthValue::nth( - "nth_value(-1)", - Arc::clone(&col_a), - DataType::Int32, - 1, + let nth_value_func1 = create_udwf_window_expr( + &nth_value_udwf(), + &[ + Arc::clone(&col_a), + Arc::new(Literal::new(ScalarValue::Int32(Some(1)))), + ], + &schema, + "nth_value(-1)".to_string(), false, )? .reverse_expr() .unwrap(); - let nth_value_func2 = NthValue::nth( - "nth_value(-2)", - Arc::clone(&col_a), - DataType::Int32, - 2, + let nth_value_func2 = create_udwf_window_expr( + &nth_value_udwf(), + &[ + Arc::clone(&col_a), + Arc::new(Literal::new(ScalarValue::Int32(Some(2)))), + ], + &schema, + "nth_value(-2)".to_string(), false, )? .reverse_expr() .unwrap(); - let last_value_func = Arc::new(NthValue::last( - "last", - Arc::clone(&col_a), - DataType::Int32, + + let last_value_func = create_udwf_window_expr( + &last_value_udwf(), + &[Arc::clone(&col_a)], + &schema, + "last".to_string(), false, - )) as _; + )?; + let window_exprs = vec![ // LAST_VALUE(a) Arc::new(BuiltInWindowExpr::new( diff --git a/datafusion/physical-plan/src/windows/mod.rs b/datafusion/physical-plan/src/windows/mod.rs index da7f6d79e578..bf34f1fe567c 100644 --- a/datafusion/physical-plan/src/windows/mod.rs +++ b/datafusion/physical-plan/src/windows/mod.rs @@ -21,16 +21,15 @@ use std::borrow::Borrow; use std::sync::Arc; use crate::{ - expressions::{Literal, NthValue, PhysicalSortExpr}, - ExecutionPlan, ExecutionPlanProperties, InputOrderMode, PhysicalExpr, + expressions::PhysicalSortExpr, ExecutionPlan, ExecutionPlanProperties, + InputOrderMode, PhysicalExpr, }; use arrow::datatypes::Schema; use arrow_schema::{DataType, Field, SchemaRef}; -use datafusion_common::{exec_datafusion_err, exec_err, Result, ScalarValue}; +use datafusion_common::{exec_err, Result}; use datafusion_expr::{ - BuiltInWindowFunction, PartitionEvaluator, ReversedUDWF, WindowFrame, - WindowFunctionDefinition, WindowUDF, + PartitionEvaluator, ReversedUDWF, WindowFrame, WindowFunctionDefinition, WindowUDF, }; use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr}; use datafusion_physical_expr::equivalence::collapse_lex_req; @@ -104,13 +103,8 @@ pub fn create_window_expr( ignore_nulls: bool, ) -> Result> { Ok(match fun { - WindowFunctionDefinition::BuiltInWindowFunction(fun) => { - Arc::new(BuiltInWindowExpr::new( - create_built_in_window_expr(fun, args, input_schema, name, ignore_nulls)?, - partition_by, - order_by, - window_frame, - )) + WindowFunctionDefinition::BuiltInWindowFunction(_fun) => { + unreachable!() } WindowFunctionDefinition::AggregateUDF(fun) => { let aggregate = AggregateExprBuilder::new(Arc::clone(fun), args.to_vec()) @@ -163,72 +157,8 @@ fn window_expr_from_aggregate_expr( } } -fn get_signed_integer(value: ScalarValue) -> Result { - if value.is_null() { - return Ok(0); - } - - if !value.data_type().is_integer() { - return exec_err!("Expected an integer value"); - } - - value.cast_to(&DataType::Int64)?.try_into() -} - -fn create_built_in_window_expr( - fun: &BuiltInWindowFunction, - args: &[Arc], - input_schema: &Schema, - name: String, - ignore_nulls: bool, -) -> Result> { - // derive the output datatype from incoming schema - let out_data_type: &DataType = input_schema.field_with_name(&name)?.data_type(); - - Ok(match fun { - BuiltInWindowFunction::NthValue => { - let arg = Arc::clone(&args[0]); - let n = get_signed_integer( - args[1] - .as_any() - .downcast_ref::() - .ok_or_else(|| { - exec_datafusion_err!("Expected a signed integer literal for the second argument of nth_value, got {}", args[1]) - })? - .value() - .clone(), - )?; - Arc::new(NthValue::nth( - name, - arg, - out_data_type.clone(), - n, - ignore_nulls, - )?) - } - BuiltInWindowFunction::FirstValue => { - let arg = Arc::clone(&args[0]); - Arc::new(NthValue::first( - name, - arg, - out_data_type.clone(), - ignore_nulls, - )) - } - BuiltInWindowFunction::LastValue => { - let arg = Arc::clone(&args[0]); - Arc::new(NthValue::last( - name, - arg, - out_data_type.clone(), - ignore_nulls, - )) - } - }) -} - /// Creates a `BuiltInWindowFunctionExpr` suitable for a user defined window function -fn create_udwf_window_expr( +pub fn create_udwf_window_expr( fun: &Arc, args: &[Arc], input_schema: &Schema, @@ -241,14 +171,29 @@ fn create_udwf_window_expr( .map(|arg| arg.data_type(input_schema)) .collect::>()?; - Ok(Arc::new(WindowUDFExpr { + let udwf_expr = Arc::new(WindowUDFExpr { fun: Arc::clone(fun), args: args.to_vec(), input_types, name, is_reversed: false, ignore_nulls, - })) + }); + + // Early validation of input expressions + // We create a partition evaluator because in the user-defined window + // implementation this is where code for parsing input expressions + // exist. The benefits are: + // - If any of the input expressions are invalid we catch them early + // in the planning phase, rather than during execution. + // - Maintains compatibility with built-in (now removed) window + // functions validation behavior. + // - Predictable and reliable error handling. + // See discussion here: + // https://github.com/apache/datafusion/pull/13201#issuecomment-2454209975 + let _ = udwf_expr.create_evaluator()?; + + Ok(udwf_expr) } /// Implements [`BuiltInWindowFunctionExpr`] for [`WindowUDF`] diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto index f6c3b48c5951..8d733e1f83d4 100644 --- a/datafusion/proto/proto/datafusion.proto +++ b/datafusion/proto/proto/datafusion.proto @@ -515,9 +515,9 @@ enum BuiltInWindowFunction { // NTILE = 5; // LAG = 6; // LEAD = 7; - FIRST_VALUE = 8; - LAST_VALUE = 9; - NTH_VALUE = 10; + // FIRST_VALUE = 8; + // LAST_VALUE = 9; + // NTH_VALUE = 10; } message WindowExprNode { diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs index 625ec26885bf..dee8ac8a08db 100644 --- a/datafusion/proto/src/generated/pbjson.rs +++ b/datafusion/proto/src/generated/pbjson.rs @@ -1665,9 +1665,6 @@ impl serde::Serialize for BuiltInWindowFunction { { let variant = match self { Self::Unspecified => "UNSPECIFIED", - Self::FirstValue => "FIRST_VALUE", - Self::LastValue => "LAST_VALUE", - Self::NthValue => "NTH_VALUE", }; serializer.serialize_str(variant) } @@ -1680,9 +1677,6 @@ impl<'de> serde::Deserialize<'de> for BuiltInWindowFunction { { const FIELDS: &[&str] = &[ "UNSPECIFIED", - "FIRST_VALUE", - "LAST_VALUE", - "NTH_VALUE", ]; struct GeneratedVisitor; @@ -1724,9 +1718,6 @@ impl<'de> serde::Deserialize<'de> for BuiltInWindowFunction { { match value { "UNSPECIFIED" => Ok(BuiltInWindowFunction::Unspecified), - "FIRST_VALUE" => Ok(BuiltInWindowFunction::FirstValue), - "LAST_VALUE" => Ok(BuiltInWindowFunction::LastValue), - "NTH_VALUE" => Ok(BuiltInWindowFunction::NthValue), _ => Err(serde::de::Error::unknown_variant(value, FIELDS)), } } diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs index d0d2fde933b0..69d557177a14 100644 --- a/datafusion/proto/src/generated/prost.rs +++ b/datafusion/proto/src/generated/prost.rs @@ -1814,17 +1814,6 @@ pub struct PartitionStats { pub enum BuiltInWindowFunction { /// Unspecified = 0, - /// ROW_NUMBER = 0; - /// RANK = 1; - /// DENSE_RANK = 2; - /// PERCENT_RANK = 3; - /// CUME_DIST = 4; - /// NTILE = 5; - /// LAG = 6; - /// LEAD = 7; - FirstValue = 8, - LastValue = 9, - NthValue = 10, } impl BuiltInWindowFunction { /// String value of the enum field names used in the ProtoBuf definition. @@ -1834,18 +1823,12 @@ impl BuiltInWindowFunction { pub fn as_str_name(&self) -> &'static str { match self { Self::Unspecified => "UNSPECIFIED", - Self::FirstValue => "FIRST_VALUE", - Self::LastValue => "LAST_VALUE", - Self::NthValue => "NTH_VALUE", } } /// Creates an enum from field names used in the ProtoBuf definition. pub fn from_str_name(value: &str) -> ::core::option::Option { match value { "UNSPECIFIED" => Some(Self::Unspecified), - "FIRST_VALUE" => Some(Self::FirstValue), - "LAST_VALUE" => Some(Self::LastValue), - "NTH_VALUE" => Some(Self::NthValue), _ => None, } } diff --git a/datafusion/proto/src/logical_plan/from_proto.rs b/datafusion/proto/src/logical_plan/from_proto.rs index f25fb0bf2561..50ad9ca6a685 100644 --- a/datafusion/proto/src/logical_plan/from_proto.rs +++ b/datafusion/proto/src/logical_plan/from_proto.rs @@ -151,9 +151,6 @@ impl From for BuiltInWindowFunction { fn from(built_in_function: protobuf::BuiltInWindowFunction) -> Self { match built_in_function { protobuf::BuiltInWindowFunction::Unspecified => todo!(), - protobuf::BuiltInWindowFunction::FirstValue => Self::FirstValue, - protobuf::BuiltInWindowFunction::NthValue => Self::NthValue, - protobuf::BuiltInWindowFunction::LastValue => Self::LastValue, } } } diff --git a/datafusion/proto/src/logical_plan/to_proto.rs b/datafusion/proto/src/logical_plan/to_proto.rs index 8af7b19d9091..f442b96c7ac1 100644 --- a/datafusion/proto/src/logical_plan/to_proto.rs +++ b/datafusion/proto/src/logical_plan/to_proto.rs @@ -25,9 +25,9 @@ use datafusion_expr::expr::{ ScalarFunction, Unnest, }; use datafusion_expr::{ - logical_plan::PlanType, logical_plan::StringifiedPlan, BuiltInWindowFunction, Expr, - JoinConstraint, JoinType, SortExpr, TryCast, WindowFrame, WindowFrameBound, - WindowFrameUnits, WindowFunctionDefinition, + logical_plan::PlanType, logical_plan::StringifiedPlan, Expr, JoinConstraint, + JoinType, SortExpr, TryCast, WindowFrame, WindowFrameBound, WindowFrameUnits, + WindowFunctionDefinition, }; use crate::protobuf::RecursionUnnestOption; @@ -121,16 +121,6 @@ impl From<&StringifiedPlan> for protobuf::StringifiedPlan { } } -impl From<&BuiltInWindowFunction> for protobuf::BuiltInWindowFunction { - fn from(value: &BuiltInWindowFunction) -> Self { - match value { - BuiltInWindowFunction::FirstValue => Self::FirstValue, - BuiltInWindowFunction::LastValue => Self::LastValue, - BuiltInWindowFunction::NthValue => Self::NthValue, - } - } -} - impl From for protobuf::WindowFrameUnits { fn from(units: WindowFrameUnits) -> Self { match units { @@ -312,12 +302,7 @@ pub fn serialize_expr( null_treatment: _, }) => { let (window_function, fun_definition) = match fun { - WindowFunctionDefinition::BuiltInWindowFunction(fun) => ( - protobuf::window_expr_node::WindowFunction::BuiltInFunction( - protobuf::BuiltInWindowFunction::from(fun).into(), - ), - None, - ), + WindowFunctionDefinition::BuiltInWindowFunction(_fun) => unreachable!(), WindowFunctionDefinition::AggregateUDF(aggr_udf) => { let mut buf = Vec::new(); let _ = codec.try_encode_udaf(aggr_udf, &mut buf); diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs index dc94ad075c53..183f14d00d54 100644 --- a/datafusion/proto/src/physical_plan/to_proto.rs +++ b/datafusion/proto/src/physical_plan/to_proto.rs @@ -20,14 +20,14 @@ use std::sync::Arc; #[cfg(feature = "parquet")] use datafusion::datasource::file_format::parquet::ParquetSink; -use datafusion::physical_expr::window::{NthValueKind, SlidingAggregateWindowExpr}; +use datafusion::physical_expr::window::SlidingAggregateWindowExpr; use datafusion::physical_expr::{LexOrdering, PhysicalSortExpr, ScalarFunctionExpr}; use datafusion::physical_plan::expressions::{ BinaryExpr, CaseExpr, CastExpr, Column, InListExpr, IsNotNullExpr, IsNullExpr, - Literal, NegativeExpr, NotExpr, NthValue, TryCastExpr, + Literal, NegativeExpr, NotExpr, TryCastExpr, }; use datafusion::physical_plan::udaf::AggregateFunctionExpr; -use datafusion::physical_plan::windows::{BuiltInWindowExpr, PlainAggregateWindowExpr}; +use datafusion::physical_plan::windows::PlainAggregateWindowExpr; use datafusion::physical_plan::{Partitioning, PhysicalExpr, WindowExpr}; use datafusion::{ datasource::{ @@ -102,39 +102,10 @@ pub fn serialize_physical_window_expr( codec: &dyn PhysicalExtensionCodec, ) -> Result { let expr = window_expr.as_any(); - let mut args = window_expr.expressions().to_vec(); + let args = window_expr.expressions().to_vec(); let window_frame = window_expr.get_window_frame(); - let (window_function, fun_definition) = if let Some(built_in_window_expr) = - expr.downcast_ref::() - { - let expr = built_in_window_expr.get_built_in_func_expr(); - let built_in_fn_expr = expr.as_any(); - - let builtin_fn = - if let Some(nth_value_expr) = built_in_fn_expr.downcast_ref::() { - match nth_value_expr.get_kind() { - NthValueKind::First => protobuf::BuiltInWindowFunction::FirstValue, - NthValueKind::Last => protobuf::BuiltInWindowFunction::LastValue, - NthValueKind::Nth(n) => { - args.insert( - 1, - Arc::new(Literal::new( - datafusion_common::ScalarValue::Int64(Some(n)), - )), - ); - protobuf::BuiltInWindowFunction::NthValue - } - } - } else { - return not_impl_err!("BuiltIn function not supported: {expr:?}"); - }; - - ( - physical_window_expr_node::WindowFunction::BuiltInFunction(builtin_fn as i32), - None, - ) - } else if let Some(plain_aggr_window_expr) = + let (window_function, fun_definition) = if let Some(plain_aggr_window_expr) = expr.downcast_ref::() { serialize_physical_window_aggr_expr( diff --git a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs index 14d91913e7cd..e2add03e85e6 100644 --- a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs @@ -47,6 +47,7 @@ use datafusion::functions_aggregate::expr_fn::{ }; use datafusion::functions_aggregate::min_max::max_udaf; use datafusion::functions_nested::map::map; +use datafusion::functions_window; use datafusion::functions_window::expr_fn::{ cume_dist, dense_rank, lag, lead, ntile, percent_rank, rank, row_number, }; @@ -910,6 +911,9 @@ async fn roundtrip_expr_api() -> Result<()> { count_distinct(lit(1)), first_value(lit(1), None), first_value(lit(1), Some(vec![lit(2).sort(true, true)])), + functions_window::nth_value::first_value(lit(1)), + functions_window::nth_value::last_value(lit(1)), + functions_window::nth_value::nth_value(lit(1), 1), avg(lit(1.5)), covar_samp(lit(1.5), lit(2.2)), covar_pop(lit(1.5), lit(2.2)), diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs index 8c8dcccee376..04cd65196b36 100644 --- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs @@ -60,8 +60,7 @@ use datafusion::physical_plan::aggregates::{ use datafusion::physical_plan::analyze::AnalyzeExec; use datafusion::physical_plan::empty::EmptyExec; use datafusion::physical_plan::expressions::{ - binary, cast, col, in_list, like, lit, BinaryExpr, Column, NotExpr, NthValue, - PhysicalSortExpr, + binary, cast, col, in_list, like, lit, BinaryExpr, Column, NotExpr, PhysicalSortExpr, }; use datafusion::physical_plan::filter::FilterExec; use datafusion::physical_plan::insert::DataSinkExec; @@ -75,9 +74,7 @@ use datafusion::physical_plan::repartition::RepartitionExec; use datafusion::physical_plan::sorts::sort::SortExec; use datafusion::physical_plan::union::{InterleaveExec, UnionExec}; use datafusion::physical_plan::unnest::{ListUnnest, UnnestExec}; -use datafusion::physical_plan::windows::{ - BuiltInWindowExpr, PlainAggregateWindowExpr, WindowAggExec, -}; +use datafusion::physical_plan::windows::{PlainAggregateWindowExpr, WindowAggExec}; use datafusion::physical_plan::{ExecutionPlan, Partitioning, PhysicalExpr, Statistics}; use datafusion::prelude::SessionContext; use datafusion::scalar::ScalarValue; @@ -273,32 +270,6 @@ fn roundtrip_window() -> Result<()> { let field_b = Field::new("b", DataType::Int64, false); let schema = Arc::new(Schema::new(vec![field_a, field_b])); - let window_frame = WindowFrame::new_bounds( - datafusion_expr::WindowFrameUnits::Range, - WindowFrameBound::Preceding(ScalarValue::Int64(None)), - WindowFrameBound::CurrentRow, - ); - - let builtin_window_expr = Arc::new(BuiltInWindowExpr::new( - Arc::new(NthValue::first( - "FIRST_VALUE(a) PARTITION BY [b] ORDER BY [a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW", - col("a", &schema)?, - DataType::Int64, - false, - )), - &[col("b", &schema)?], - &LexOrdering{ - inner: vec![PhysicalSortExpr { - expr: col("a", &schema)?, - options: SortOptions { - descending: false, - nulls_first: false, - }, - }] - }, - Arc::new(window_frame), - )); - let plain_aggr_window_expr = Arc::new(PlainAggregateWindowExpr::new( AggregateExprBuilder::new( avg_udaf(), @@ -336,11 +307,7 @@ fn roundtrip_window() -> Result<()> { let input = Arc::new(EmptyExec::new(schema.clone())); roundtrip_test(Arc::new(WindowAggExec::try_new( - vec![ - builtin_window_expr, - plain_aggr_window_expr, - sliding_aggr_window_expr, - ], + vec![plain_aggr_window_expr, sliding_aggr_window_expr], input, vec![col("b", &schema)?], )?)) diff --git a/datafusion/sql/src/expr/function.rs b/datafusion/sql/src/expr/function.rs index 619eadcf0fb8..cb7255bb7873 100644 --- a/datafusion/sql/src/expr/function.rs +++ b/datafusion/sql/src/expr/function.rs @@ -23,20 +23,16 @@ use datafusion_common::{ DFSchema, Dependency, Result, }; use datafusion_expr::expr::WildcardOptions; +use datafusion_expr::expr::{ScalarFunction, Unnest}; use datafusion_expr::planner::PlannerResult; use datafusion_expr::{ expr, Expr, ExprFunctionExt, ExprSchemable, WindowFrame, WindowFunctionDefinition, }; -use datafusion_expr::{ - expr::{ScalarFunction, Unnest}, - BuiltInWindowFunction, -}; use sqlparser::ast::{ DuplicateTreatment, Expr as SQLExpr, Function as SQLFunction, FunctionArg, FunctionArgExpr, FunctionArgumentClause, FunctionArgumentList, FunctionArguments, NullTreatment, ObjectName, OrderByExpr, WindowType, }; -use strum::IntoEnumIterator; /// Suggest a valid function based on an invalid input function name /// @@ -52,7 +48,6 @@ pub fn suggest_valid_function( let mut funcs = Vec::new(); funcs.extend(ctx.udaf_names()); - funcs.extend(BuiltInWindowFunction::iter().map(|func| func.to_string())); funcs.extend(ctx.udwf_names()); funcs @@ -393,12 +388,9 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { }) { Ok(WindowFunctionDefinition::AggregateUDF(udaf.unwrap())) } else { - expr::find_df_window_func(name) - .or_else(|| { - self.context_provider - .get_window_meta(name) - .map(WindowFunctionDefinition::WindowUDF) - }) + self.context_provider + .get_window_meta(name) + .map(WindowFunctionDefinition::WindowUDF) .ok_or_else(|| { plan_datafusion_err!("There is no window function named {name}") }) diff --git a/datafusion/sqllogictest/test_files/errors.slt b/datafusion/sqllogictest/test_files/errors.slt index da46a7e5e679..92eb8cf12300 100644 --- a/datafusion/sqllogictest/test_files/errors.slt +++ b/datafusion/sqllogictest/test_files/errors.slt @@ -119,8 +119,8 @@ regr_slope(c11, '2') over () as min1 from aggregate_test_100 order by c9 -# WindowFunction with BuiltInWindowFunction wrong signature -statement error DataFusion error: Error during planning: No function matches the given name and argument types 'NTH_VALUE\(Int32, Int64, Int64\)'\. You might need to add explicit type casts\.\n\tCandidate functions:\n\tNTH_VALUE\(Any, Any\) +# WindowFunction wrong signature +statement error DataFusion error: Error during planning: Error during planning: Coercion from \[Int32, Int64, Int64\] to the signature OneOf\(\[Any\(0\), Any\(1\), Any\(2\)\]\) failed select c9, nth_value(c5, 2, 3) over (order by c9) as nv1 @@ -128,6 +128,23 @@ from aggregate_test_100 order by c9 +# nth_value with wrong name +statement error DataFusion error: Error during planning: Invalid function 'nth_vlue'.\nDid you mean 'nth_value'? +SELECT + NTH_VLUE(c4, 2) OVER() + FROM aggregate_test_100 + ORDER BY c9 + LIMIT 5; + +# first_value with wrong name +statement error DataFusion error: Error during planning: Invalid function 'frst_value'.\nDid you mean 'first_value'? +SELECT + FRST_VALUE(c4, 2) OVER() + FROM aggregate_test_100 + ORDER BY c9 + LIMIT 5; + + query error DataFusion error: Arrow error: Cast error: Cannot cast string 'foo' to value of Int64 type create table foo as values (1), ('foo'); diff --git a/datafusion/sqllogictest/test_files/window.slt b/datafusion/sqllogictest/test_files/window.slt index d593a985c458..8e3559a32684 100644 --- a/datafusion/sqllogictest/test_files/window.slt +++ b/datafusion/sqllogictest/test_files/window.slt @@ -2660,14 +2660,14 @@ EXPLAIN SELECT ---- logical_plan 01)Sort: annotated_data_finite.ts DESC NULLS FIRST, fetch=5 -02)--Projection: annotated_data_finite.ts, first_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING AS fv1, first_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING AS fv2, last_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING AS lv1, last_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING AS lv2, NTH_VALUE(annotated_data_finite.inc_col,Int64(5)) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING AS nv1, NTH_VALUE(annotated_data_finite.inc_col,Int64(5)) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING AS nv2, row_number() ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING AS rn1, row_number() ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING AS rn2, rank() ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING AS rank1, rank() ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING AS rank2, dense_rank() ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING AS dense_rank1, dense_rank() ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING AS dense_rank2, lag(annotated_data_finite.inc_col,Int64(1),Int64(1001)) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING AS lag1, lag(annotated_data_finite.inc_col,Int64(2),Int64(1002)) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING AS lag2, lead(annotated_data_finite.inc_col,Int64(-1),Int64(1001)) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING AS lead1, lead(annotated_data_finite.inc_col,Int64(4),Int64(1004)) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING AS lead2, first_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING AS fvr1, first_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING AS fvr2, last_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING AS lvr1, last_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING AS lvr2, lag(annotated_data_finite.inc_col,Int64(1),Int64(1001)) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING AS lagr1, lag(annotated_data_finite.inc_col,Int64(2),Int64(1002)) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING AS lagr2, lead(annotated_data_finite.inc_col,Int64(-1),Int64(1001)) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING AS leadr1, lead(annotated_data_finite.inc_col,Int64(4),Int64(1004)) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING AS leadr2 -03)----WindowAggr: windowExpr=[[first_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING, first_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING, last_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING, last_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING, NTH_VALUE(annotated_data_finite.inc_col, Int64(5)) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING, NTH_VALUE(annotated_data_finite.inc_col, Int64(5)) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING, row_number() ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING, row_number() ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING, rank() ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING, rank() ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING, dense_rank() ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING, dense_rank() ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING, lag(annotated_data_finite.inc_col, Int64(1), Int64(1001)) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING, lag(annotated_data_finite.inc_col, Int64(2), Int64(1002)) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING, lead(annotated_data_finite.inc_col, Int64(-1), Int64(1001)) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING, lead(annotated_data_finite.inc_col, Int64(4), Int64(1004)) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING]] +02)--Projection: annotated_data_finite.ts, first_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING AS fv1, first_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING AS fv2, last_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING AS lv1, last_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING AS lv2, nth_value(annotated_data_finite.inc_col,Int64(5)) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING AS nv1, nth_value(annotated_data_finite.inc_col,Int64(5)) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING AS nv2, row_number() ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING AS rn1, row_number() ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING AS rn2, rank() ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING AS rank1, rank() ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING AS rank2, dense_rank() ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING AS dense_rank1, dense_rank() ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING AS dense_rank2, lag(annotated_data_finite.inc_col,Int64(1),Int64(1001)) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING AS lag1, lag(annotated_data_finite.inc_col,Int64(2),Int64(1002)) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING AS lag2, lead(annotated_data_finite.inc_col,Int64(-1),Int64(1001)) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING AS lead1, lead(annotated_data_finite.inc_col,Int64(4),Int64(1004)) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING AS lead2, first_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING AS fvr1, first_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING AS fvr2, last_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING AS lvr1, last_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING AS lvr2, lag(annotated_data_finite.inc_col,Int64(1),Int64(1001)) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING AS lagr1, lag(annotated_data_finite.inc_col,Int64(2),Int64(1002)) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING AS lagr2, lead(annotated_data_finite.inc_col,Int64(-1),Int64(1001)) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING AS leadr1, lead(annotated_data_finite.inc_col,Int64(4),Int64(1004)) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING AS leadr2 +03)----WindowAggr: windowExpr=[[first_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING, first_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING, last_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING, last_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING, nth_value(annotated_data_finite.inc_col, Int64(5)) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING, nth_value(annotated_data_finite.inc_col, Int64(5)) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING, row_number() ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING, row_number() ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING, rank() ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING, rank() ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING, dense_rank() ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING, dense_rank() ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING, lag(annotated_data_finite.inc_col, Int64(1), Int64(1001)) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING, lag(annotated_data_finite.inc_col, Int64(2), Int64(1002)) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING, lead(annotated_data_finite.inc_col, Int64(-1), Int64(1001)) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING, lead(annotated_data_finite.inc_col, Int64(4), Int64(1004)) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING]] 04)------WindowAggr: windowExpr=[[first_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING, first_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING, last_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING, last_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING, lag(annotated_data_finite.inc_col, Int64(1), Int64(1001)) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING, lag(annotated_data_finite.inc_col, Int64(2), Int64(1002)) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING, lead(annotated_data_finite.inc_col, Int64(-1), Int64(1001)) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING, lead(annotated_data_finite.inc_col, Int64(4), Int64(1004)) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING]] 05)--------TableScan: annotated_data_finite projection=[ts, inc_col] physical_plan 01)SortExec: TopK(fetch=5), expr=[ts@0 DESC], preserve_partitioning=[false] -02)--ProjectionExec: expr=[ts@0 as ts, first_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING@10 as fv1, first_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING@11 as fv2, last_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING@12 as lv1, last_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING@13 as lv2, NTH_VALUE(annotated_data_finite.inc_col,Int64(5)) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING@14 as nv1, NTH_VALUE(annotated_data_finite.inc_col,Int64(5)) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING@15 as nv2, row_number() ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING@16 as rn1, row_number() ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING@17 as rn2, rank() ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING@18 as rank1, rank() ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING@19 as rank2, dense_rank() ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING@20 as dense_rank1, dense_rank() ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING@21 as dense_rank2, lag(annotated_data_finite.inc_col,Int64(1),Int64(1001)) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING@22 as lag1, lag(annotated_data_finite.inc_col,Int64(2),Int64(1002)) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING@23 as lag2, lead(annotated_data_finite.inc_col,Int64(-1),Int64(1001)) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING@24 as lead1, lead(annotated_data_finite.inc_col,Int64(4),Int64(1004)) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING@25 as lead2, first_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING@2 as fvr1, first_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING@3 as fvr2, last_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING@4 as lvr1, last_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING@5 as lvr2, lag(annotated_data_finite.inc_col,Int64(1),Int64(1001)) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING@6 as lagr1, lag(annotated_data_finite.inc_col,Int64(2),Int64(1002)) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING@7 as lagr2, lead(annotated_data_finite.inc_col,Int64(-1),Int64(1001)) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING@8 as leadr1, lead(annotated_data_finite.inc_col,Int64(4),Int64(1004)) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING@9 as leadr2] -03)----BoundedWindowAggExec: wdw=[first_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "first_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(10)), end_bound: Following(Int32(1)), is_causal: false }, first_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "first_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(10)), end_bound: Following(UInt64(1)), is_causal: false }, last_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "last_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(10)), end_bound: Following(Int32(1)), is_causal: false }, last_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "last_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(10)), end_bound: Following(UInt64(1)), is_causal: false }, NTH_VALUE(annotated_data_finite.inc_col,Int64(5)) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "NTH_VALUE(annotated_data_finite.inc_col,Int64(5)) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(10)), end_bound: Following(Int32(1)), is_causal: false }, NTH_VALUE(annotated_data_finite.inc_col,Int64(5)) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "NTH_VALUE(annotated_data_finite.inc_col,Int64(5)) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(10)), end_bound: Following(UInt64(1)), is_causal: false }, row_number() ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING: Ok(Field { name: "row_number() ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(1)), end_bound: Following(Int32(10)), is_causal: false }, row_number() ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "row_number() ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(10)), end_bound: Following(UInt64(1)), is_causal: false }, rank() ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING: Ok(Field { name: "rank() ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(1)), end_bound: Following(Int32(10)), is_causal: false }, rank() ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "rank() ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(10)), end_bound: Following(UInt64(1)), is_causal: false }, dense_rank() ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING: Ok(Field { name: "dense_rank() ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(1)), end_bound: Following(Int32(10)), is_causal: false }, dense_rank() ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "dense_rank() ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(10)), end_bound: Following(UInt64(1)), is_causal: false }, lag(annotated_data_finite.inc_col,Int64(1),Int64(1001)) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING: Ok(Field { name: "lag(annotated_data_finite.inc_col,Int64(1),Int64(1001)) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(1)), end_bound: Following(Int32(10)), is_causal: false }, lag(annotated_data_finite.inc_col,Int64(2),Int64(1002)) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "lag(annotated_data_finite.inc_col,Int64(2),Int64(1002)) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(10)), end_bound: Following(UInt64(1)), is_causal: false }, lead(annotated_data_finite.inc_col,Int64(-1),Int64(1001)) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING: Ok(Field { name: "lead(annotated_data_finite.inc_col,Int64(-1),Int64(1001)) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(1)), end_bound: Following(Int32(10)), is_causal: false }, lead(annotated_data_finite.inc_col,Int64(4),Int64(1004)) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "lead(annotated_data_finite.inc_col,Int64(4),Int64(1004)) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(10)), end_bound: Following(UInt64(1)), is_causal: false }], mode=[Sorted] +02)--ProjectionExec: expr=[ts@0 as ts, first_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING@10 as fv1, first_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING@11 as fv2, last_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING@12 as lv1, last_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING@13 as lv2, nth_value(annotated_data_finite.inc_col,Int64(5)) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING@14 as nv1, nth_value(annotated_data_finite.inc_col,Int64(5)) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING@15 as nv2, row_number() ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING@16 as rn1, row_number() ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING@17 as rn2, rank() ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING@18 as rank1, rank() ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING@19 as rank2, dense_rank() ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING@20 as dense_rank1, dense_rank() ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING@21 as dense_rank2, lag(annotated_data_finite.inc_col,Int64(1),Int64(1001)) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING@22 as lag1, lag(annotated_data_finite.inc_col,Int64(2),Int64(1002)) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING@23 as lag2, lead(annotated_data_finite.inc_col,Int64(-1),Int64(1001)) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING@24 as lead1, lead(annotated_data_finite.inc_col,Int64(4),Int64(1004)) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING@25 as lead2, first_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING@2 as fvr1, first_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING@3 as fvr2, last_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING@4 as lvr1, last_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING@5 as lvr2, lag(annotated_data_finite.inc_col,Int64(1),Int64(1001)) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING@6 as lagr1, lag(annotated_data_finite.inc_col,Int64(2),Int64(1002)) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING@7 as lagr2, lead(annotated_data_finite.inc_col,Int64(-1),Int64(1001)) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING@8 as leadr1, lead(annotated_data_finite.inc_col,Int64(4),Int64(1004)) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING@9 as leadr2] +03)----BoundedWindowAggExec: wdw=[first_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "first_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(10)), end_bound: Following(Int32(1)), is_causal: false }, first_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "first_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(10)), end_bound: Following(UInt64(1)), is_causal: false }, last_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "last_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(10)), end_bound: Following(Int32(1)), is_causal: false }, last_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "last_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(10)), end_bound: Following(UInt64(1)), is_causal: false }, nth_value(annotated_data_finite.inc_col,Int64(5)) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "nth_value(annotated_data_finite.inc_col,Int64(5)) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(10)), end_bound: Following(Int32(1)), is_causal: false }, nth_value(annotated_data_finite.inc_col,Int64(5)) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "nth_value(annotated_data_finite.inc_col,Int64(5)) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(10)), end_bound: Following(UInt64(1)), is_causal: false }, row_number() ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING: Ok(Field { name: "row_number() ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(1)), end_bound: Following(Int32(10)), is_causal: false }, row_number() ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "row_number() ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(10)), end_bound: Following(UInt64(1)), is_causal: false }, rank() ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING: Ok(Field { name: "rank() ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(1)), end_bound: Following(Int32(10)), is_causal: false }, rank() ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "rank() ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(10)), end_bound: Following(UInt64(1)), is_causal: false }, dense_rank() ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING: Ok(Field { name: "dense_rank() ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(1)), end_bound: Following(Int32(10)), is_causal: false }, dense_rank() ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "dense_rank() ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(10)), end_bound: Following(UInt64(1)), is_causal: false }, lag(annotated_data_finite.inc_col,Int64(1),Int64(1001)) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING: Ok(Field { name: "lag(annotated_data_finite.inc_col,Int64(1),Int64(1001)) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(1)), end_bound: Following(Int32(10)), is_causal: false }, lag(annotated_data_finite.inc_col,Int64(2),Int64(1002)) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "lag(annotated_data_finite.inc_col,Int64(2),Int64(1002)) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(10)), end_bound: Following(UInt64(1)), is_causal: false }, lead(annotated_data_finite.inc_col,Int64(-1),Int64(1001)) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING: Ok(Field { name: "lead(annotated_data_finite.inc_col,Int64(-1),Int64(1001)) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(1)), end_bound: Following(Int32(10)), is_causal: false }, lead(annotated_data_finite.inc_col,Int64(4),Int64(1004)) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "lead(annotated_data_finite.inc_col,Int64(4),Int64(1004)) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(10)), end_bound: Following(UInt64(1)), is_causal: false }], mode=[Sorted] 04)------BoundedWindowAggExec: wdw=[first_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "first_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(1)), end_bound: Following(Int32(10)), is_causal: false }, first_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "first_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(10)), is_causal: false }, last_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "last_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(1)), end_bound: Following(Int32(10)), is_causal: false }, last_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "last_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(10)), is_causal: false }, lag(annotated_data_finite.inc_col,Int64(1),Int64(1001)) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING: Ok(Field { name: "lag(annotated_data_finite.inc_col,Int64(1),Int64(1001)) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(10)), end_bound: Following(Int32(1)), is_causal: false }, lag(annotated_data_finite.inc_col,Int64(2),Int64(1002)) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "lag(annotated_data_finite.inc_col,Int64(2),Int64(1002)) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(10)), is_causal: false }, lead(annotated_data_finite.inc_col,Int64(-1),Int64(1001)) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING: Ok(Field { name: "lead(annotated_data_finite.inc_col,Int64(-1),Int64(1001)) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(10)), end_bound: Following(Int32(1)), is_causal: false }, lead(annotated_data_finite.inc_col,Int64(4),Int64(1004)) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "lead(annotated_data_finite.inc_col,Int64(4),Int64(1004)) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(10)), is_causal: false }], mode=[Sorted] 05)--------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_1.csv]]}, projection=[ts, inc_col], output_ordering=[ts@0 ASC NULLS LAST], has_header=true @@ -2783,15 +2783,15 @@ EXPLAIN SELECT logical_plan 01)Projection: first_value1, first_value2, last_value1, last_value2, nth_value1 02)--Sort: annotated_data_finite.inc_col ASC NULLS LAST, fetch=5 -03)----Projection: first_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING AS first_value1, first_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING AS first_value2, last_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING AS last_value1, last_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING AS last_value2, NTH_VALUE(annotated_data_finite.inc_col,Int64(2)) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING AS nth_value1, annotated_data_finite.inc_col -04)------WindowAggr: windowExpr=[[first_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING, last_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING, NTH_VALUE(annotated_data_finite.inc_col, Int64(2)) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING]] +03)----Projection: first_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING AS first_value1, first_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING AS first_value2, last_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING AS last_value1, last_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING AS last_value2, nth_value(annotated_data_finite.inc_col,Int64(2)) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING AS nth_value1, annotated_data_finite.inc_col +04)------WindowAggr: windowExpr=[[first_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING, last_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING, nth_value(annotated_data_finite.inc_col, Int64(2)) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING]] 05)--------WindowAggr: windowExpr=[[first_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING, last_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING]] 06)----------TableScan: annotated_data_finite projection=[ts, inc_col] physical_plan 01)ProjectionExec: expr=[first_value1@0 as first_value1, first_value2@1 as first_value2, last_value1@2 as last_value1, last_value2@3 as last_value2, nth_value1@4 as nth_value1] 02)--SortExec: TopK(fetch=5), expr=[inc_col@5 ASC NULLS LAST], preserve_partitioning=[false] -03)----ProjectionExec: expr=[first_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING@4 as first_value1, first_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING@2 as first_value2, last_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING@5 as last_value1, last_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING@3 as last_value2, NTH_VALUE(annotated_data_finite.inc_col,Int64(2)) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING@6 as nth_value1, inc_col@1 as inc_col] -04)------BoundedWindowAggExec: wdw=[first_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING: Ok(Field { name: "first_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(1)), is_causal: false }, last_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING: Ok(Field { name: "last_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(1)), is_causal: false }, NTH_VALUE(annotated_data_finite.inc_col,Int64(2)) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING: Ok(Field { name: "NTH_VALUE(annotated_data_finite.inc_col,Int64(2)) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(1)), is_causal: false }], mode=[Sorted] +03)----ProjectionExec: expr=[first_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING@4 as first_value1, first_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING@2 as first_value2, last_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING@5 as last_value1, last_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING@3 as last_value2, nth_value(annotated_data_finite.inc_col,Int64(2)) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING@6 as nth_value1, inc_col@1 as inc_col] +04)------BoundedWindowAggExec: wdw=[first_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING: Ok(Field { name: "first_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(1)), is_causal: false }, last_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING: Ok(Field { name: "last_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(1)), is_causal: false }, nth_value(annotated_data_finite.inc_col,Int64(2)) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING: Ok(Field { name: "nth_value(annotated_data_finite.inc_col,Int64(2)) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(1)), is_causal: false }], mode=[Sorted] 05)--------BoundedWindowAggExec: wdw=[first_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING: Ok(Field { name: "first_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(3)), is_causal: false }, last_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING: Ok(Field { name: "last_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(3)), is_causal: false }], mode=[Sorted] 06)----------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_1.csv]]}, projection=[ts, inc_col], output_ordering=[ts@0 ASC NULLS LAST], has_header=true @@ -3565,13 +3565,13 @@ EXPLAIN SELECT c, NTH_VALUE(c, 2) OVER(order by c DESC) as nv1 ---- logical_plan 01)Sort: multiple_ordered_table.c ASC NULLS LAST, fetch=5 -02)--Projection: multiple_ordered_table.c, NTH_VALUE(multiple_ordered_table.c,Int64(2)) ORDER BY [multiple_ordered_table.c DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS nv1 -03)----WindowAggr: windowExpr=[[NTH_VALUE(multiple_ordered_table.c, Int64(2)) ORDER BY [multiple_ordered_table.c DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]] +02)--Projection: multiple_ordered_table.c, nth_value(multiple_ordered_table.c,Int64(2)) ORDER BY [multiple_ordered_table.c DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS nv1 +03)----WindowAggr: windowExpr=[[nth_value(multiple_ordered_table.c, Int64(2)) ORDER BY [multiple_ordered_table.c DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]] 04)------TableScan: multiple_ordered_table projection=[c] physical_plan -01)ProjectionExec: expr=[c@0 as c, NTH_VALUE(multiple_ordered_table.c,Int64(2)) ORDER BY [multiple_ordered_table.c DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@1 as nv1] +01)ProjectionExec: expr=[c@0 as c, nth_value(multiple_ordered_table.c,Int64(2)) ORDER BY [multiple_ordered_table.c DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@1 as nv1] 02)--GlobalLimitExec: skip=0, fetch=5 -03)----WindowAggExec: wdw=[NTH_VALUE(multiple_ordered_table.c,Int64(2)) ORDER BY [multiple_ordered_table.c DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name: "NTH_VALUE(multiple_ordered_table.c,Int64(2)) ORDER BY [multiple_ordered_table.c DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: CurrentRow, end_bound: Following(Int32(NULL)), is_causal: false }] +03)----WindowAggExec: wdw=[nth_value(multiple_ordered_table.c,Int64(2)) ORDER BY [multiple_ordered_table.c DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name: "nth_value(multiple_ordered_table.c,Int64(2)) ORDER BY [multiple_ordered_table.c DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: CurrentRow, end_bound: Following(Int32(NULL)), is_causal: false }] 04)------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[c], output_ordering=[c@0 ASC NULLS LAST], has_header=true query II @@ -4864,7 +4864,7 @@ SELECT lag(column2, 1.1) OVER (order by column1) FROM t; query error DataFusion error: Execution error: Expected an integer value SELECT lead(column2, 1.1) OVER (order by column1) FROM t; -query error DataFusion error: Execution error: Expected an integer value +query error DataFusion error: Execution error: Expected a signed integer literal for the second argument of nth_value SELECT nth_value(column2, 1.1) OVER (order by column1) FROM t; statement ok @@ -4892,7 +4892,7 @@ DROP TABLE t1; statement ok CREATE TABLE t1(v1 BIGINT); -query error DataFusion error: Execution error: Expected a signed integer literal for the second argument of nth_value, got v1@0 +query error DataFusion error: Execution error: Expected a signed integer literal for the second argument of nth_value SELECT NTH_VALUE('+Inf'::Double, v1) OVER (PARTITION BY v1) FROM t1; statement ok diff --git a/datafusion/substrait/src/logical_plan/consumer.rs b/datafusion/substrait/src/logical_plan/consumer.rs index 890da7361d7c..1cce228527ec 100644 --- a/datafusion/substrait/src/logical_plan/consumer.rs +++ b/datafusion/substrait/src/logical_plan/consumer.rs @@ -30,8 +30,8 @@ use datafusion::execution::FunctionRegistry; use datafusion::logical_expr::expr::{Exists, InSubquery, Sort}; use datafusion::logical_expr::{ - expr::find_df_window_func, Aggregate, BinaryExpr, Case, EmptyRelation, Expr, - ExprSchemable, LogicalPlan, Operator, Projection, SortExpr, Values, + Aggregate, BinaryExpr, Case, EmptyRelation, Expr, ExprSchemable, LogicalPlan, + Operator, Projection, SortExpr, Values, }; use substrait::proto::aggregate_rel::Grouping; use substrait::proto::expression::subquery::set_predicate::PredicateOp; @@ -1683,8 +1683,6 @@ pub async fn from_substrait_rex( Ok(WindowFunctionDefinition::WindowUDF(udwf)) } else if let Ok(udaf) = ctx.udaf(fn_name) { Ok(WindowFunctionDefinition::AggregateUDF(udaf)) - } else if let Some(fun) = find_df_window_func(fn_name) { - Ok(fun) } else { not_impl_err!( "Window function {} is not supported: function anchor = {:?}", diff --git a/docs/source/user-guide/sql/window_functions_new.md b/docs/source/user-guide/sql/window_functions_new.md index ae3edb832fcb..1727dececbeb 100644 --- a/docs/source/user-guide/sql/window_functions_new.md +++ b/docs/source/user-guide/sql/window_functions_new.md @@ -218,8 +218,23 @@ row_number() ## Analytical Functions +- [first_value](#first_value) - [lag](#lag) +- [last_value](#last_value) - [lead](#lead) +- [nth_value](#nth_value) + +### `first_value` + +Returns value evaluated at the row that is the first row of the window frame. + +``` +first_value(expression) +``` + +#### Arguments + +- **expression**: Expression to operate on ### `lag` @@ -235,6 +250,18 @@ lag(expression, offset, default) - **offset**: Integer. Specifies how many rows back the value of expression should be retrieved. Defaults to 1. - **default**: The default value if the offset is not within the partition. Must be of the same type as expression. +### `last_value` + +Returns value evaluated at the row that is the last row of the window frame. + +``` +last_value(expression) +``` + +#### Arguments + +- **expression**: Expression to operate on + ### `lead` Returns value evaluated at the row that is offset rows after the current row within the partition; if there is no such row, instead return default (which must be of the same type as value). @@ -248,3 +275,16 @@ lead(expression, offset, default) - **expression**: Expression to operate on - **offset**: Integer. Specifies how many rows forward the value of expression should be retrieved. Defaults to 1. - **default**: The default value if the offset is not within the partition. Must be of the same type as expression. + +### `nth_value` + +Returns value evaluated at the row that is the nth row of the window frame (counting from 1); null if no such row. + +``` +nth_value(expression, n) +``` + +#### Arguments + +- **expression**: The name the column of which nth value to retrieve +- **n**: Integer. Specifies the n in nth