diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/hint/PinotHintOptions.java b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/hint/PinotHintOptions.java index 3c676edd18e..d0fd20bb8c1 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/hint/PinotHintOptions.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/hint/PinotHintOptions.java @@ -88,6 +88,11 @@ public static class JoinHintOptions { */ public static final String IS_COLOCATED_BY_JOIN_KEYS = "is_colocated_by_join_keys"; + /** + * Indicates that the semi join right project should be appended with a distinct + */ + public static final String APPEND_DISTINCT_TO_SEMI_JOIN_PROJECT = "append_distinct_to_semi_join_project"; + // TODO: Consider adding a Join implementation with join strategy. public static boolean useLookupJoinStrategy(Join join) { return LOOKUP_JOIN_STRATEGY.equalsIgnoreCase( diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotQueryRuleSets.java b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotQueryRuleSets.java index 80e524e11f0..e6850f26f9a 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotQueryRuleSets.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotQueryRuleSets.java @@ -73,6 +73,7 @@ private PinotQueryRuleSets() { // join and semi-join rules CoreRules.PROJECT_TO_SEMI_JOIN, + PinotSeminJoinDistinctProjectRule.INSTANCE, // convert non-all union into all-union + distinct CoreRules.UNION_TO_DISTINCT, diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotSeminJoinDistinctProjectRule.java b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotSeminJoinDistinctProjectRule.java new file mode 100644 index 00000000000..bdc45a4a9cb --- /dev/null +++ b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotSeminJoinDistinctProjectRule.java @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.calcite.rel.rules; + +import java.util.List; +import org.apache.calcite.plan.RelOptRule; +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.rel.AbstractRelNode; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.JoinRelType; +import org.apache.calcite.rel.logical.LogicalJoin; +import org.apache.calcite.rel.logical.LogicalProject; +import org.apache.calcite.tools.RelBuilder; +import org.apache.calcite.tools.RelBuilderFactory; +import org.apache.pinot.calcite.rel.hint.PinotHintOptions; +import org.apache.pinot.calcite.rel.hint.PinotHintStrategyTable; + + +/** + * Special rule for Pinot, this rule always append a distinct to the + * {@link org.apache.calcite.rel.logical.LogicalProject} on top of a Semi join + * {@link org.apache.calcite.rel.core.Join} to ensure the correctness of the query. + */ +public class PinotSeminJoinDistinctProjectRule extends RelOptRule { + public static final PinotSeminJoinDistinctProjectRule INSTANCE = + new PinotSeminJoinDistinctProjectRule(PinotRuleUtils.PINOT_REL_FACTORY); + + public PinotSeminJoinDistinctProjectRule(RelBuilderFactory factory) { + super(operand(LogicalJoin.class, operand(AbstractRelNode.class, any()), operand(LogicalProject.class, any())), + factory, null); + } + + @Override + public boolean matches(RelOptRuleCall call) { + LogicalJoin join = call.rel(0); + if (join.getJoinType() != JoinRelType.SEMI) { + return false; + } + // Do not apply this rule if join strategy is explicitly set to something other than dynamic broadcast + String hintOption = PinotHintStrategyTable.getHintOption(join.getHints(), PinotHintOptions.JOIN_HINT_OPTIONS, + PinotHintOptions.JoinHintOptions.APPEND_DISTINCT_TO_SEMI_JOIN_PROJECT); + if (!Boolean.parseBoolean(hintOption)) { + return false; + } + return ((LogicalProject) call.rel(2)).getProjects().size() == 1; + } + + @Override + public void onMatch(RelOptRuleCall call) { + LogicalJoin join = call.rel(0); + RelNode newRightProject = insertDistinctToProject(call, call.rel(2)); + call.transformTo(join.copy(join.getTraitSet(), List.of(call.rel(1), newRightProject))); + } + + private RelNode insertDistinctToProject(RelOptRuleCall call, LogicalProject project) { + RelBuilder relBuilder = call.builder(); + relBuilder.push(project); + relBuilder.distinct(); + return relBuilder.build(); + } +} diff --git a/pinot-query-planner/src/test/resources/queries/JoinPlans.json b/pinot-query-planner/src/test/resources/queries/JoinPlans.json index d48795dc30c..f275eca72f4 100644 --- a/pinot-query-planner/src/test/resources/queries/JoinPlans.json +++ b/pinot-query-planner/src/test/resources/queries/JoinPlans.json @@ -251,6 +251,23 @@ "\n" ] }, + { + "description": "Semi join with IN clause and append distinct to semi join project side", + "sql": "EXPLAIN PLAN FOR SELECT /*+ joinOptions(append_distinct_to_semi_join_project = 'true') */ col1, col2 FROM a WHERE col3 IN (SELECT col3 FROM b)", + "output": [ + "Execution Plan", + "\nLogicalProject(col1=[$0], col2=[$1])", + "\n LogicalJoin(condition=[=($2, $3)], joinType=[semi])", + "\n LogicalProject(col1=[$0], col2=[$1], col3=[$2])", + "\n LogicalTableScan(table=[[default, a]])", + "\n PinotLogicalExchange(distribution=[broadcast], relExchangeType=[PIPELINE_BREAKER])", + "\n PinotLogicalAggregate(group=[{0}], aggType=[FINAL])", + "\n PinotLogicalExchange(distribution=[hash[0]])", + "\n PinotLogicalAggregate(group=[{2}], aggType=[LEAF])", + "\n LogicalTableScan(table=[[default, b]])", + "\n" + ] + }, { "description": "Semi join with IN clause on distinct values", "sql": "EXPLAIN PLAN FOR SELECT col1, col2 FROM a WHERE col3 IN (SELECT DISTINCT col3 FROM b)",