feat: support group by unnest
JasonLi-cn committed Jul 15, 2024
1 parent bfd8156 commit f45f1b0
Showing 3 changed files with 243 additions and 13 deletions.
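This change allows UNNEST expressions to appear in the GROUP BY clause. A minimal example of the newly supported query shape, taken from the sqllogictest cases added below (they reuse the existing unnest_table fixture):

select unnest(column1) c1 from unnest_table group by c1 order by c1;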
108 changes: 99 additions & 9 deletions datafusion/sql/src/select.rs
@@ -22,10 +22,12 @@ use crate::planner::{
idents_to_table_reference, ContextProvider, PlannerContext, SqlToRel,
};
use crate::utils::{
check_columns_satisfy_exprs, extract_aliases, rebase_expr, resolve_aliases_to_exprs,
resolve_columns, resolve_positions_to_exprs, transform_bottom_unnest,
check_columns_satisfy_exprs, extract_aliases, rebase_expr, rebase_expr_by_name,
resolve_aliases_to_exprs, resolve_columns, resolve_positions_to_exprs,
transform_bottom_unnest,
};

use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion};
use datafusion_common::{not_impl_err, plan_err, DataFusionError, Result};
use datafusion_common::{Column, UnnestOptions};
use datafusion_expr::expr::Alias;
@@ -611,6 +613,9 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
group_by_exprs: &[Expr],
aggr_exprs: &[Expr],
) -> Result<(LogicalPlan, Vec<Expr>, Option<Expr>)> {
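// If the GROUP BY expressions contain UNNEST calls, rewrite them into
// Projection/Unnest nodes below the aggregate before building the plan.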
let (input, group_by_exprs, has_unnest) =
self.try_process_group_by_unnest(input, group_by_exprs, aggr_exprs)?;

// create the aggregate plan
let plan = LogicalPlanBuilder::from(input.clone())
.aggregate(group_by_exprs.to_vec(), aggr_exprs.to_vec())?
@@ -651,21 +656,34 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
// now attempt to resolve columns and replace with fully-qualified columns
let aggr_projection_exprs = aggr_projection_exprs
.iter()
.map(|expr| resolve_columns(expr, input))
.map(|expr| resolve_columns(expr, &input))
.collect::<Result<Vec<Expr>>>()?;

// next we replace any expressions that are not a column with a column referencing
// an output column from the aggregate schema
let column_exprs_post_aggr = aggr_projection_exprs
.iter()
.map(|expr| expr_as_column_expr(expr, input))
.map(|expr| expr_as_column_expr(expr, &input))
.collect::<Result<Vec<Expr>>>()?;

// next we re-write the projection
let select_exprs_post_aggr = select_exprs
.iter()
.map(|expr| rebase_expr(expr, &aggr_projection_exprs, input))
.collect::<Result<Vec<Expr>>>()?;
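// When the GROUP BY contained UNNEST, the aggregate's group expressions are
// the unnest output columns rather than the original UNNEST calls, so the
// SELECT expressions are rebased by display name instead of by equality.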
let select_exprs_post_aggr = if has_unnest {
let aggr_projection_expr_names = aggr_projection_exprs
.iter()
.map(|expr| expr.display_name())
.collect::<Result<Vec<_>>>()?;
select_exprs
.iter()
.map(|expr| {
rebase_expr_by_name(expr, &aggr_projection_expr_names, &input)
})
.collect::<Result<Vec<Expr>>>()?
} else {
select_exprs
.iter()
.map(|expr| rebase_expr(expr, &aggr_projection_exprs, &input))
.collect::<Result<Vec<Expr>>>()?
};

// finally, we have some validation that the re-written projection can be resolved
// from the aggregate output columns
@@ -679,7 +697,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
// aggregation.
let having_expr_post_aggr = if let Some(having_expr) = having_expr_opt {
let having_expr_post_aggr =
rebase_expr(having_expr, &aggr_projection_exprs, input)?;
rebase_expr(having_expr, &aggr_projection_exprs, &input)?;

check_columns_satisfy_exprs(
&column_exprs_post_aggr,
@@ -694,6 +712,78 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {

Ok((plan, select_exprs_post_aggr, having_expr_post_aggr))
}

/// Try to rewrite UNNEST calls in the GROUP BY expressions into
/// Projection -> Unnest plans stacked on top of `input`. Returns the possibly
/// rewritten input plan, the rewritten group-by expressions, and a flag
/// indicating whether any unnest rewrite was applied.
fn try_process_group_by_unnest(
&self,
input: &LogicalPlan,
group_by_exprs: &[Expr],
aggr_exprs: &[Expr],
) -> Result<(LogicalPlan, Vec<Expr>, bool)> {
let mut aggr_expr_using_columns: Option<HashSet<Expr>> = None;

// Rewrite any UNNEST calls in the group by expressions, bottom-up; nested
// unnest such as unnest(unnest(col)) is handled by looping until no unnest
// remains.
let mut intermediate_plan = input.clone();
let mut intermediate_select_exprs = group_by_exprs.to_vec();
let mut has_unnest = false;

loop {
let mut unnest_columns = vec![];
let mut inner_projection_exprs = vec![];

let outer_projection_exprs: Vec<Expr> = intermediate_select_exprs
.iter()
.map(|expr| {
transform_bottom_unnest(
&intermediate_plan,
&mut unnest_columns,
&mut inner_projection_exprs,
expr,
)
})
.collect::<Result<Vec<_>>>()?
.into_iter()
.flatten()
.collect();

if unnest_columns.is_empty() {
break;
} else {
let columns = unnest_columns.into_iter().map(|col| col.into()).collect();
let unnest_options = UnnestOptions::new().with_preserve_nulls(false);

let mut projection_exprs = match &aggr_expr_using_columns {
Some(exprs) => (*exprs).clone(),
None => {
let mut columns = HashSet::new();
for expr in aggr_exprs {
expr.apply(|expr| {
if let Expr::Column(c) = expr {
columns.insert(Expr::Column(c.clone()));
}
Ok(TreeNodeRecursion::Continue)
})
// As the closure always returns Ok, this "can't" error
.expect("Unexpected error");
}
aggr_expr_using_columns = Some(columns.clone());
columns
}
};
projection_exprs.extend(inner_projection_exprs);

let plan = LogicalPlanBuilder::from(intermediate_plan.clone())
.project(projection_exprs)?
.unnest_columns_with_options(columns, unnest_options)?
.build()?;

intermediate_plan = plan;
intermediate_select_exprs = outer_projection_exprs;
has_unnest = true;
}
}

Ok((intermediate_plan, intermediate_select_exprs, has_unnest))
}
}

// If there are any multiple-defined windows, we raise an error.
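The rewrite in try_process_group_by_unnest loops because a GROUP BY expression may nest several UNNEST calls: each pass projects the columns referenced by the aggregate expressions, peels the bottom-most UNNEST into a Projection + Unnest pair, and repeats until no UNNEST remains. A recursive case exercised by the new tests below:

select unnest(unnest(column2)) c2 from recursive_unnest_table group by c2 order by c2;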
16 changes: 16 additions & 0 deletions datafusion/sql/src/utils.rs
@@ -85,6 +85,22 @@ pub(crate) fn rebase_expr(
.data()
}

/// Like `rebase_expr`, but matches sub-expressions against `base_expr_names`
/// by display name rather than by expression equality, replacing each match
/// with a column that references the corresponding output column of `plan`.
pub(crate) fn rebase_expr_by_name(
expr: &Expr,
base_expr_names: &[String],
plan: &LogicalPlan,
) -> Result<Expr> {
expr.clone()
.transform_down(|nested_expr| {
if base_expr_names.contains(&nested_expr.display_name()?) {
Ok(Transformed::yes(expr_as_column_expr(&nested_expr, plan)?))
} else {
Ok(Transformed::no(nested_expr))
}
})
.data()
}

/// Determines if the set of `Expr`'s are a valid projection on the input
/// `Expr::Column`'s.
pub(crate) fn check_columns_satisfy_exprs(
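rebase_expr_by_name is needed because, once a GROUP BY UNNEST(...) has been rewritten, the aggregate's group expressions are the columns produced by the Unnest node and no longer compare structurally equal to the UNNEST calls in the SELECT list. Matching on the display name lets a query such as the following (one of the new test cases below) resolve its projection against the aggregate output:

select unnest(column1) c1, unnest(column2) c2, column3 c3, count(column4) from unnest_table group by c1, c2, c3 order by c1, c2, c3;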
132 changes: 128 additions & 4 deletions datafusion/sqllogictest/test_files/unnest.slt
@@ -496,12 +496,10 @@ select unnest(column1) from (select * from (values([1,2,3]), ([4,5,6])) limit 1
5
6

## FIXME: https://github.com/apache/datafusion/issues/11198
query error DataFusion error: Error during planning: Projections require unique expression names but the expression "UNNEST\(Column\(Column \{ relation: Some\(Bare \{ table: "unnest_table" \}\), name: "column1" \}\)\)" at position 0 and "UNNEST\(Column\(Column \{ relation: Some\(Bare \{ table: "unnest_table" \}\), name: "column1" \}\)\)" at position 1 have the same name. Consider aliasing \("AS"\) one of them.
select unnest(column1), unnest(column1) from unnest_table;

statement ok
drop table unnest_table;

## unnest list followed by unnest struct
query ???
@@ -556,4 +554,130 @@ physical_plan
05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
06)----------UnnestExec
07)------------ProjectionExec: expr=[column3@0 as unnest(recursive_unnest_table.column3), column3@0 as column3]
08)--------------MemoryExec: partitions=1, partition_sizes=[1]


## group by unnest

### without agg exprs
query I
select unnest(column1) c1 from unnest_table group by c1 order by c1;
----
1
2
3
4
5
6
12

query II
select unnest(column1) c1, unnest(column2) c2 from unnest_table group by c1, c2 order by c1, c2;
----
1 7
2 NULL
3 NULL
4 8
5 9
6 11
12 NULL
NULL 10
NULL 12
NULL 42
NULL NULL

query III
select unnest(column1) c1, unnest(column2) c2, column3 c3 from unnest_table group by c1, c2, c3 order by c1, c2, c3;
----
1 7 1
2 NULL 1
3 NULL 1
4 8 2
5 9 2
6 11 3
12 NULL NULL
NULL 10 2
NULL 12 3
NULL 42 NULL
NULL NULL NULL

### with agg exprs

query IIII
select unnest(column1) c1, unnest(column2) c2, column3 c3, count(1) from unnest_table group by c1, c2, c3 order by c1, c2, c3;
----
1 7 1 1
2 NULL 1 1
3 NULL 1 1
4 8 2 1
5 9 2 1
6 11 3 1
12 NULL NULL 1
NULL 10 2 1
NULL 12 3 1
NULL 42 NULL 1
NULL NULL NULL 1

query IIII
select unnest(column1) c1, unnest(column2) c2, column3 c3, count(column4) from unnest_table group by c1, c2, c3 order by c1, c2, c3;
----
1 7 1 1
2 NULL 1 1
3 NULL 1 1
4 8 2 1
5 9 2 1
6 11 3 0
12 NULL NULL 0
NULL 10 2 1
NULL 12 3 0
NULL 42 NULL 0
NULL NULL NULL 0

query IIIII
select unnest(column1) c1, unnest(column2) c2, column3 c3, count(column4), sum(column3) from unnest_table group by c1, c2, c3 order by c1, c2, c3;
----
1 7 1 1 1
2 NULL 1 1 1
3 NULL 1 1 1
4 8 2 1 2
5 9 2 1 2
6 11 3 0 3
12 NULL NULL 0 NULL
NULL 10 2 1 2
NULL 12 3 0 3
NULL 42 NULL 0 NULL
NULL NULL NULL 0 NULL

### group by recursive unnest list

query ?
select unnest(unnest(column2)) c2 from recursive_unnest_table group by c2 order by c2;
----
[1]
[1, 1]
[2]
[3, 4]
[5]
[7, 8]
[, 6]
NULL

query ?I
select unnest(unnest(column2)) c2, count(column3) from recursive_unnest_table group by c2 order by c2;
----
[1] 1
[1, 1] 1
[2] 1
[3, 4] 1
[5] 1
[7, 8] 1
[, 6] 1
NULL 1

### TODO: group by unnest struct

query error
select unnest(column1) c1 from nested_unnest_table group by c1.c0;
----
DataFusion error: Internal error: unnest on struct can ony be applied at the root level of select expression.
This was likely caused by a bug in DataFusion's code and we would welcome that you file an bug report in our issue tracker
