Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Convert nth_value builtIn function to User Defined Window Function #13201

Merged
merged 23 commits into from
Nov 13, 2024

Conversation

buraksenn
Copy link
Contributor

@buraksenn buraksenn commented Oct 31, 2024

Which issue does this PR close?

Closes #12649

Rationale for this change

Context: #8709

What changes are included in this PR?

  • cleanup BuiltinWindowFunctions
  • move builtin nth_value function to udwf

Are these changes tested?

  • yes

Are there any user-facing changes?

no

@github-actions github-actions bot added sql SQL Planner logical-expr Logical plan and expressions physical-expr Physical Expressions core Core DataFusion crate sqllogictest SQL Logic Tests (.slt) substrait proto Related to proto crate labels Oct 31, 2024
@alamb
Copy link
Contributor

alamb commented Oct 31, 2024

THis is so exciting. FYI @jonathanc-n and @Omega359

@alamb
Copy link
Contributor

alamb commented Oct 31, 2024

It got pretty out of hand because of nth_value and BuiltinWindowFunction references across the repo so I wanted to open a draft PR to get feedback.

I personally think it would be fine to leave BuiltInWindowFunction initially while we port over the function, and them remove it as a follow on PR

Perhaps you can leave a stub in like

enum BuiltInWindowFunction {
  // Never created, will be removed in a follow on PR
  Stub
};

Then we can focus this PR on making sure that nth_value is working as a user defined function and then move on in the next PR to mechanically ripping out the old BuiltInWindowFunction

@buraksenn
Copy link
Contributor Author

It got pretty out of hand because of nth_value and BuiltinWindowFunction references across the repo so I wanted to open a draft PR to get feedback.

I personally think it would be fine to leave BuiltInWindowFunction initially while we port over the function, and them remove it as a follow on PR

Perhaps you can leave a stub in like

enum BuiltInWindowFunction {
  // Never created, will be removed in a follow on PR
  Stub
};

Then we can focus this PR on making sure that nth_value is working as a user defined function and then move on in the next PR to mechanically ripping out the old BuiltInWindowFunction

Thanks @alamb will continue with what you've said

@github-actions github-actions bot removed the sqllogictest SQL Logic Tests (.slt) label Nov 3, 2024
@github-actions github-actions bot added the sqllogictest SQL Logic Tests (.slt) label Nov 3, 2024
@buraksenn
Copy link
Contributor Author

Wanted to update here. I think I'm almost finished but probably encountered a side effect. This query fails in slt file:

External error: query failed: DataFusion error: Arrow error: Invalid argument error: It is not possible to concatenate arrays of different data types.
[SQL] select
c9,
sum(c5) over (order by c9) as sum1,
avg(c5) over (order by c9) as avg1,
count(c5) over (order by c9) as count1,
max(c5) over (order by c9) as max1,
min(c5) over (order by c9) as min1,
first_value(c5) over (order by c9) as fv1,
last_value(c5) over (order by c9) as lv1,
nth_value(c5, 2) over (order by c9) as nv1
from aggregate_test_100
order by c9
limit 5
at test_files/window.slt:97

Error: Execution("1 failures")
error: test failed, to rerun pass `-p datafusion-sqllogictest --test sqllogictests`

I hope to fix this and make this ready tomorrow

@jcsherin
Copy link
Contributor

jcsherin commented Nov 4, 2024

External error: query failed: DataFusion error: Arrow error: Invalid argument error: It is not possible to concatenate arrays of different data types.

In the built-in (older) version the output field is defined like:

    fn field(&self) -> Result<Field> {
        let nullable = true;
        Ok(Field::new(&self.name, self.data_type.clone(), nullable))
    }

In the current code, the data type of the field is hard-coded as DataType::UInt64:

    fn field(&self, field_args: WindowUDFFieldArgs) -> Result<Field> {
        let nullable = true;

        Ok(Field::new(field_args.name(), DataType::UInt64, nullable))
    }

To fix this use field_args to get the data type of the input expression rather than hard-coding DataType::UInt64.

@buraksenn
Copy link
Contributor Author

External error: query failed: DataFusion error: Arrow error: Invalid argument error: It is not possible to concatenate arrays of different data types.

In the built-in (older) version the output field is defined like:

    fn field(&self) -> Result<Field> {
        let nullable = true;
        Ok(Field::new(&self.name, self.data_type.clone(), nullable))
    }

In the current code, the data type of the field is hard-coded as DataType::UInt64:

    fn field(&self, field_args: WindowUDFFieldArgs) -> Result<Field> {
        let nullable = true;

        Ok(Field::new(field_args.name(), DataType::UInt64, nullable))
    }

To fix this use field_args to get the data type of the input expression rather than hard-coding DataType::UInt64.

Thanks, @jcsherin that was the fix. I've fixed that issue but encountered another one. I return Error from partition evaluator but I think it is not honored.

External error: query is expected to fail, but actually succeed:
[SQL] SELECT NTH_VALUE('+Inf'::Double, v1) OVER (PARTITION BY v1) FROM t1;
at test_files/window.slt:4895

Error: Execution("1 failures")
error: test failed, to rerun pass `-p datafusion-sqllogictest --test sqllogictests`

Caused by:
  process didn't exit successfully: `/Users/buraksen/d/datafusion/target/debug/deps/sqllogictests-267639e9b963ccc2 window` (exit status: 1)

But it should not succeed since:

        let n =
            match get_scalar_value_from_args(partition_evaluator_args.input_exprs(), 1)?
                .map(get_signed_integer)
            {
                Some(Ok(n)) => {
                    if partition_evaluator_args.is_reversed() {
                        -n
                    } else {
                        n
                    }
                }
                _ => {
                    return exec_err!(
                "Expected a signed integer literal for the second argument of nth_value"
            )
                }
            };

@jcsherin
Copy link
Contributor

jcsherin commented Nov 4, 2024

TL;DR

For invalid input expressions, built-in window functions fail early when converting logical plan to physical plan. But user-defined window functions will complete planning, and fail only during physical execution.

Validation of input expressions in user-defined window runs only during physical execution.

In this case is it not better for udwf to fail early when converting to physical plan?

A possible solution is to update WindowUDFImpl API so that we parse input expressions when creating the window expressions (similar to built-in window functions). And hook it here so argument parsing happens earlier:

fn create_udwf_window_expr(
fun: &Arc<WindowUDF>,
args: &[Arc<dyn PhysicalExpr>],
input_schema: &Schema,
name: String,
ignore_nulls: bool,
) -> Result<Arc<dyn BuiltInWindowFunctionExpr>> {

Edge Case: Empty Table

DataFusion CLI v42.2.0
> CREATE TABLE t1(v1 BIGINT);
0 row(s) fetched.
Elapsed 0.020 seconds.

There are currently no rows in t1 so an early return happens in WindowAggStream::compute_aggregates.

if batch.num_rows() == 0 {
return Ok(RecordBatch::new_empty(Arc::clone(&self.schema)));
}

The nth_value window function is never executed when this query is run on the empty table t1.

> SELECT NTH_VALUE('+Inf'::Double, v1) OVER (PARTITION BY v1) FROM t1;
+-------------------------------------------------------------------------------------------------------------+
| nth_value(Utf8("+Inf"),t1.v1) PARTITION BY [t1.v1] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING |
+-------------------------------------------------------------------------------------------------------------+
+-------------------------------------------------------------------------------------------------------------+
0 row(s) fetched.
Elapsed 0.018 seconds.

After we insert a few values into t1 the nth_value aggregation will be computed. Only then do we see the expected error message,

> insert into t1 values (123), (456);
+-------+
| count |
+-------+
| 2     |
+-------+
1 row(s) fetched.
Elapsed 0.007 seconds.

> SELECT NTH_VALUE('+Inf'::Double, v1) OVER (PARTITION BY v1) FROM t1;
This feature is not implemented: There is only support Literal types for field at idx: 1 in Window Function

Planning divergence between built-in & user-defined window functions

In main branch where nth_value is a BuiltInWindowFunction the argument parsing fails early when mapping logical to physical plan. In explain output there is no physical plan.

> EXPLAIN SELECT NTH_VALUE('+Inf'::Double, v1) OVER (PARTITION BY v1) FROM t1;
+--------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type    | plan                                                                                                                                                                                                                                                     |
+--------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Projection: NTH_VALUE(Utf8("+Inf"),t1.v1) PARTITION BY [t1.v1] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING                                                                                                                                  |
|              |   WindowAggr: windowExpr=[[NTH_VALUE(Float64(inf), t1.v1) PARTITION BY [t1.v1] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING AS NTH_VALUE(Utf8("+Inf"),t1.v1) PARTITION BY [t1.v1] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING]] |
|              |     TableScan: t1 projection=[v1]                                                                                                                                                                                                                        |
+--------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
1 row(s) fetched.
Elapsed 0.009 seconds.

But this is not the case for user-defined window functions. In this branch we instead see that a complete plan is built and failure is happening only when the query executes,

> EXPLAIN SELECT NTH_VALUE('+Inf'::Double, v1) OVER (PARTITION BY v1) FROM t1;
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type     | plan                                                                                                                                                                                                                                                                                                                                                                                                                                                                                          |
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan  | Projection: nth_value(Utf8("+Inf"),t1.v1) PARTITION BY [t1.v1] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING                                                                                                                                                                                                                                                                                                                                                                       |
|               |   WindowAggr: windowExpr=[[nth_value(Float64(inf), t1.v1) PARTITION BY [t1.v1] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING AS nth_value(Utf8("+Inf"),t1.v1) PARTITION BY [t1.v1] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING]]                                                                                                                                                                                                                                      |
|               |     TableScan: t1 projection=[v1]                                                                                                                                                                                                                                                                                                                                                                                                                                                             |
| physical_plan | ProjectionExec: expr=[nth_value(Utf8("+Inf"),t1.v1) PARTITION BY [t1.v1] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@1 as nth_value(Utf8("+Inf"),t1.v1) PARTITION BY [t1.v1] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING]                                                                                                                                                                                                                                           |
|               |   WindowAggExec: wdw=[nth_value(Utf8("+Inf"),t1.v1) PARTITION BY [t1.v1] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING: Ok(Field { name: "nth_value(Utf8(\"+Inf\"),t1.v1) PARTITION BY [t1.v1] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)), is_causal: false }] |
|               |     SortExec: expr=[v1@0 ASC NULLS LAST], preserve_partitioning=[false]                                                                                                                                                                                                                                                                                                                                                                                                                       |
|               |       MemoryExec: partitions=1, partition_sizes=[0]                                                                                                                                                                                                                                                                                                                                                                                                                                           |
|               |                                                                                                                                                                                                                                                                                                                                                                                                                                                                                               |
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
2 row(s) fetched.
Elapsed 0.019 seconds.

@buraksenn
Copy link
Contributor Author

@jcsherin thanks for the very detailed explanation. In this case, I think it would be better to update WindowUDFImpl in a followup PR for enhancement right? I can skip this test case in the scope of this PR. Correct me if I'm wrong please

@jcsherin
Copy link
Contributor

jcsherin commented Nov 5, 2024

I think it would be better to update WindowUDFImpl in a followup PR for enhancement right? I can skip this test case in the scope of this PR. Correct me if I'm wrong please

Sure, we can improve the API in another PR.

Here is a workaround that fixes the failing test:

// In datafusion/physical-plan/src/windows/mod.rs
fn create_udwf_window_expr(
    fun: &Arc<WindowUDF>,
    args: &[Arc<dyn PhysicalExpr>],
    input_schema: &Schema,
    name: String,
    ignore_nulls: bool,
) -> Result<Arc<dyn BuiltInWindowFunctionExpr>> {
    // need to get the types into an owned vec for some reason
    let input_types: Vec<_> = args
        .iter()
        .map(|arg| arg.data_type(input_schema))
        .collect::<Result<_>>()?;

    let udwf_expr =
        Arc::new(WindowUDFExpr {
            fun: Arc::clone(fun),
            args: args.to_vec(),
            input_types,
            name,
            is_reversed: false,
            ignore_nulls,
        });

    /// Early validation of input expressions
    ///
    /// We create a partition evaluator because in the user-defined window
    /// implementation this is where code for parsing input expressions
    /// exist. The benefits are:
    /// - If any of the input expressions are invalid we catch them early
    /// in the planning phase, rather than during execution.
    /// - Maintains compatibility with built-in (now removed) window
    /// functions validation behavior.
    /// - Predictable and reliable error handling.
    ///
    /// See discussion here:
    /// https://github.com/apache/datafusion/pull/13201#issuecomment-2454209975
    let _ = udwf_expr.create_evaluator()?;

    Ok(udwf_expr)
}

I verified that this works in your branch.

DataFusion CLI v42.2.0
> CREATE TABLE t1(v1 BIGINT);
0 row(s) fetched.
Elapsed 0.019 seconds.

> EXPLAIN SELECT NTH_VALUE('+Inf'::Double, v1) OVER (PARTITION BY v1) FROM t1;
+--------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type    | plan                                                                                                                                                                                                                                                     |
+--------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Projection: nth_value(Utf8("+Inf"),t1.v1) PARTITION BY [t1.v1] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING                                                                                                                                  |
|              |   WindowAggr: windowExpr=[[nth_value(Float64(inf), t1.v1) PARTITION BY [t1.v1] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING AS nth_value(Utf8("+Inf"),t1.v1) PARTITION BY [t1.v1] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING]] |
|              |     TableScan: t1 projection=[v1]                                                                                                                                                                                                                        |
+--------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
1 row(s) fetched.
Elapsed 0.018 seconds.

> SELECT NTH_VALUE('+Inf'::Double, v1) OVER (PARTITION BY v1) FROM t1;
This feature is not implemented: There is only support Literal types for field at idx: 1 in Window Function

This workaround may not be ideal, but at least we do not have to skip this test. Also please feel free to update the code/comments as you see fit.


/// Create an expression to represent the `nth_value` window function
///
pub fn nth_value(arg: datafusion_expr::Expr, n: Option<i64>) -> datafusion_expr::Expr {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also add a roundtrip logical plan test for this API here:

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed and added test

@jcsherin
Copy link
Contributor

@buraksenn Tremendous effort 🙌. These changes look good to me.

@jcsherin
Copy link
Contributor

@buraksenn and @berkaysynnada Thanks!

@alamb This PR is ready.

@alamb
Copy link
Contributor

alamb commented Nov 13, 2024

Awesome -- thank you so much. I will review this PR hopefully later today

@alamb alamb changed the title Convert nth_value builtIn function to UDWF Convert nth_value builtIn function to User Defined Window Function Nov 13, 2024
Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you so much @buraksenn , @jcsherin -- it is just so beautiful to see this PR now after all the work. It is basically perfect from my perspective 🏆

@@ -70,6 +70,7 @@ tokio = { workspace = true }
[dev-dependencies]
criterion = { version = "0.5", features = ["async_futures"] }
datafusion-functions-aggregate = { workspace = true }
datafusion-functions-window = { workspace = true }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

some day I hope we can remove these dependencies (so we can make testing physical-plan faster, but not a part of this PR

// We create a partition evaluator because in the user-defined window
// implementation this is where code for parsing input expressions
// exist. The benefits are:
// - If any of the input expressions are invalid we catch them early
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💯 for these comments that explain the rationale

@alamb alamb added the api change Changes the API exposed to users of the crate label Nov 13, 2024
@alamb alamb changed the title Convert nth_value builtIn function to User Defined Window Function Convert nth_value builtIn function to User Defined Window Function Nov 13, 2024
@alamb
Copy link
Contributor

alamb commented Nov 13, 2024

I also took the liberty of merging up from main to make sure we haven't hit any logical conflicts with this PR

@alamb
Copy link
Contributor

alamb commented Nov 13, 2024

I don't think there is any reason to wait around for this PR -- people know it is coming, so let's get this in 🚀

@alamb alamb merged commit 54ab128 into apache:main Nov 13, 2024
28 checks passed
mwylde pushed a commit to ArroyoSystems/arrow-datafusion that referenced this pull request Nov 22, 2024
…pache#13201)

* refactored nth_value

* continue

* test

* proto and rustlint

* fix datatype

* cont

* cont

* apply jcsherins early validation

* docs

* doc

* Apply suggestions from code review

Co-authored-by: Sherin Jacob <[email protected]>

* passes lint but does not have tests

* continue

* Update roundtrip_physical_plan.rs

* udwf, not udaf

* fix bounded but not fixed roundtrip

* added

* Update datafusion/sqllogictest/test_files/errors.slt

Co-authored-by: Sherin Jacob <[email protected]>

---------

Co-authored-by: Sherin Jacob <[email protected]>
Co-authored-by: berkaysynnada <[email protected]>
Co-authored-by: Andrew Lamb <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
api change Changes the API exposed to users of the crate core Core DataFusion crate documentation Improvements or additions to documentation logical-expr Logical plan and expressions physical-expr Physical Expressions proto Related to proto crate sql SQL Planner sqllogictest SQL Logic Tests (.slt) substrait
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Convert BuiltInWindowFunction::{NthValue} to a user defined functions
4 participants