Skip to content

Commit

Permalink
Remove PinotAggregateToSemiJoinRule which can mistakenly remove DISTI…
Browse files Browse the repository at this point in the history
…NCT from IN clause (#14719)
  • Loading branch information
Jackie-Jiang authored Dec 26, 2024
1 parent 2e6e3d6 commit c50ed0d
Show file tree
Hide file tree
Showing 4 changed files with 122 additions and 135 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@ private PinotQueryRuleSets() {

// join and semi-join rules
CoreRules.PROJECT_TO_SEMI_JOIN,
PinotAggregateToSemiJoinRule.INSTANCE,

// convert non-all union into all-union + distinct
CoreRules.UNION_TO_DISTINCT,
Expand Down
72 changes: 70 additions & 2 deletions pinot-query-planner/src/test/resources/queries/JoinPlans.json
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@
},
{
"description": "Inner join with group by",
"sql": "EXPLAIN PLAN FOR SELECT a.col1, AVG(b.col3) FROM a JOIN b ON a.col1 = b.col2 WHERE a.col3 >= 0 AND a.col2 = 'a' AND b.col3 < 0 GROUP BY a.col1",
"sql": "EXPLAIN PLAN FOR SELECT a.col1, AVG(b.col3) FROM a JOIN b ON a.col1 = b.col2 WHERE a.col3 >= 0 AND a.col2 = 'a' AND b.col3 < 0 GROUP BY a.col1",
"output": [
"Execution Plan",
"\nLogicalProject(col1=[$0], EXPR$1=[/(CAST($1):DOUBLE NOT NULL, $2)])",
Expand Down Expand Up @@ -222,6 +222,21 @@
},
{
"description": "Semi join with IN clause",
"sql": "EXPLAIN PLAN FOR SELECT col1, col2 FROM a WHERE col3 IN (SELECT col3 FROM b)",
"output": [
"Execution Plan",
"\nLogicalProject(col1=[$0], col2=[$1])",
"\n LogicalJoin(condition=[=($2, $3)], joinType=[semi])",
"\n LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
"\n LogicalTableScan(table=[[default, a]])",
"\n PinotLogicalExchange(distribution=[broadcast], relExchangeType=[PIPELINE_BREAKER])",
"\n LogicalProject(col3=[$2])",
"\n LogicalTableScan(table=[[default, b]])",
"\n"
]
},
{
"description": "Semi join with IN clause and join strategy override",
"sql": "EXPLAIN PLAN FOR SELECT /*+ joinOptions(join_strategy = 'hash') */ col1, col2 FROM a WHERE col3 IN (SELECT col3 FROM b)",
"output": [
"Execution Plan",
Expand All @@ -237,7 +252,60 @@
]
},
{
"description": "Semi join with multiple IN clause",
"description": "Semi join with IN clause on distinct values",
"sql": "EXPLAIN PLAN FOR SELECT col1, col2 FROM a WHERE col3 IN (SELECT DISTINCT col3 FROM b)",
"output": [
"Execution Plan",
"\nLogicalProject(col1=[$0], col2=[$1])",
"\n LogicalJoin(condition=[=($2, $3)], joinType=[semi])",
"\n LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
"\n LogicalTableScan(table=[[default, a]])",
"\n PinotLogicalExchange(distribution=[broadcast], relExchangeType=[PIPELINE_BREAKER])",
"\n PinotLogicalAggregate(group=[{0}], aggType=[FINAL])",
"\n PinotLogicalExchange(distribution=[hash[0]])",
"\n PinotLogicalAggregate(group=[{2}], aggType=[LEAF])",
"\n LogicalTableScan(table=[[default, b]])",
"\n"
]
},
{
"description": "Semi join with IN clause then aggregate with group by",
"sql": "EXPLAIN PLAN FOR SELECT col1, SUM(col6) FROM a WHERE col3 IN (SELECT col3 FROM b) GROUP BY col1",
"output": [
"Execution Plan",
"\nPinotLogicalAggregate(group=[{0}], agg#0=[$SUM0($1)], aggType=[FINAL])",
"\n PinotLogicalExchange(distribution=[hash[0]])",
"\n PinotLogicalAggregate(group=[{0}], agg#0=[$SUM0($2)], aggType=[LEAF])",
"\n LogicalJoin(condition=[=($1, $3)], joinType=[semi])",
"\n LogicalProject(col1=[$0], col3=[$2], col6=[$5])",
"\n LogicalTableScan(table=[[default, a]])",
"\n PinotLogicalExchange(distribution=[broadcast], relExchangeType=[PIPELINE_BREAKER])",
"\n LogicalProject(col3=[$2])",
"\n LogicalTableScan(table=[[default, b]])",
"\n"
]
},
{
"description": "Semi join with IN clause of distinct values then aggregate with group by",
"sql": "EXPLAIN PLAN FOR SELECT col1, SUM(col6) FROM a WHERE col3 IN (SELECT DISTINCT col3 FROM b) GROUP BY col1",
"output": [
"Execution Plan",
"\nPinotLogicalAggregate(group=[{0}], agg#0=[$SUM0($1)], aggType=[FINAL])",
"\n PinotLogicalExchange(distribution=[hash[0]])",
"\n PinotLogicalAggregate(group=[{0}], agg#0=[$SUM0($2)], aggType=[LEAF])",
"\n LogicalJoin(condition=[=($1, $3)], joinType=[semi])",
"\n LogicalProject(col1=[$0], col3=[$2], col6=[$5])",
"\n LogicalTableScan(table=[[default, a]])",
"\n PinotLogicalExchange(distribution=[broadcast], relExchangeType=[PIPELINE_BREAKER])",
"\n PinotLogicalAggregate(group=[{0}], aggType=[FINAL])",
"\n PinotLogicalExchange(distribution=[hash[0]])",
"\n PinotLogicalAggregate(group=[{2}], aggType=[LEAF])",
"\n LogicalTableScan(table=[[default, b]])",
"\n"
]
},
{
"description": "Semi join with multiple IN clause and join strategy override",
"sql": "EXPLAIN PLAN FOR SELECT /*+ joinOptions(join_strategy = 'hash') */ col1, col2 FROM a WHERE col2 = 'test' AND col3 IN (SELECT col3 FROM b WHERE col1='foo') AND col3 IN (SELECT col3 FROM b WHERE col1='bar') AND col3 IN (SELECT col3 FROM b WHERE col1='foobar')",
"output": [
"Execution Plan",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,58 @@
"\n"
]
},
{
"description": "agg + semi-join on colocated tables then group by on partition column with join and agg hint",
"sql": "EXPLAIN PLAN FOR SELECT /*+ joinOptions(is_colocated_by_join_keys='true'), aggOptions(is_partitioned_by_group_by_keys='true') */ a.col2, SUM(a.col3) FROM a /*+ tableOptions(partition_function='hashcode', partition_key='col2', partition_size='4') */ WHERE a.col2 IN (SELECT col1 FROM b /*+ tableOptions(partition_function='hashcode', partition_key='col1', partition_size='4') */ WHERE b.col3 > 0) GROUP BY 1",
"output": [
"Execution Plan",
"\nPinotLogicalAggregate(group=[{0}], agg#0=[$SUM0($1)], aggType=[DIRECT])",
"\n LogicalJoin(condition=[=($0, $2)], joinType=[semi])",
"\n LogicalProject(col2=[$1], col3=[$2])",
"\n LogicalTableScan(table=[[default, a]])",
"\n PinotLogicalExchange(distribution=[hash[0]], relExchangeType=[PIPELINE_BREAKER])",
"\n LogicalProject(col1=[$0])",
"\n LogicalFilter(condition=[>($2, 0)])",
"\n LogicalTableScan(table=[[default, b]])",
"\n"
]
},
{
"description": "agg + semi-join with distinct values on colocated tables then group by on partition column",
"sql": "EXPLAIN PLAN FOR SELECT a.col2, SUM(a.col3) FROM a /*+ tableOptions(partition_function='hashcode', partition_key='col2', partition_size='4') */ WHERE a.col2 IN (SELECT DISTINCT col1 FROM b /*+ tableOptions(partition_function='hashcode', partition_key='col1', partition_size='4') */ WHERE b.col3 > 0) GROUP BY 1",
"output": [
"Execution Plan",
"\nPinotLogicalAggregate(group=[{0}], agg#0=[$SUM0($1)], aggType=[FINAL])",
"\n PinotLogicalExchange(distribution=[hash[0]])",
"\n PinotLogicalAggregate(group=[{0}], agg#0=[$SUM0($1)], aggType=[LEAF])",
"\n LogicalJoin(condition=[=($0, $2)], joinType=[semi])",
"\n LogicalProject(col2=[$1], col3=[$2])",
"\n LogicalTableScan(table=[[default, a]])",
"\n PinotLogicalExchange(distribution=[broadcast], relExchangeType=[PIPELINE_BREAKER])",
"\n PinotLogicalAggregate(group=[{0}], aggType=[FINAL])",
"\n PinotLogicalExchange(distribution=[hash[0]])",
"\n PinotLogicalAggregate(group=[{0}], aggType=[LEAF])",
"\n LogicalFilter(condition=[>($2, 0)])",
"\n LogicalTableScan(table=[[default, b]])",
"\n"
]
},
{
"description": "agg + semi-join with distinct values on colocated tables then group by on partition column with join and agg hint",
"sql": "EXPLAIN PLAN FOR SELECT /*+ joinOptions(is_colocated_by_join_keys='true'), aggOptions(is_partitioned_by_group_by_keys='true') */ a.col2, SUM(a.col3) FROM a /*+ tableOptions(partition_function='hashcode', partition_key='col2', partition_size='4') */ WHERE a.col2 IN (SELECT DISTINCT col1 FROM b /*+ tableOptions(partition_function='hashcode', partition_key='col1', partition_size='4') */ WHERE b.col3 > 0) GROUP BY 1",
"output": [
"Execution Plan",
"\nPinotLogicalAggregate(group=[{0}], agg#0=[$SUM0($1)], aggType=[DIRECT])",
"\n LogicalJoin(condition=[=($0, $2)], joinType=[semi])",
"\n LogicalProject(col2=[$1], col3=[$2])",
"\n LogicalTableScan(table=[[default, a]])",
"\n PinotLogicalExchange(distribution=[hash[0]], relExchangeType=[PIPELINE_BREAKER])",
"\n PinotLogicalAggregate(group=[{0}], aggType=[DIRECT])",
"\n LogicalFilter(condition=[>($2, 0)])",
"\n LogicalTableScan(table=[[default, b]])",
"\n"
]
},
{
"description": "agg + semi-join on pre-partitioned main tables then group by on partition column",
"sql": "EXPLAIN PLAN FOR SELECT a.col2, SUM(a.col3) FROM a /*+ tableOptions(partition_function='hashcode', partition_key='col2', partition_size='4') */ WHERE a.col2 IN (SELECT col1 FROM b WHERE b.col3 > 0) GROUP BY 1",
Expand Down

0 comments on commit c50ed0d

Please sign in to comment.