diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala
index 14598f243785c..291d045b2ecda 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala
@@ -1390,7 +1390,7 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
     }
   }
 
-  test("SPARK-44647: test join key is subset of cluster key " +
+  test("SPARK-44647: SPJ: test join key is subset of cluster key " +
     "with push values and partially-clustered") {
     val table1 = "tab1e1"
     val table2 = "table2"
@@ -1487,7 +1487,7 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
     }
   }
 
-  test("SPARK-47094: Support compatible buckets") {
+  test("SPARK-47094: SPJ: Support compatible buckets") {
     val table1 = "tab1e1"
     val table2 = "table2"
 
@@ -1580,11 +1580,11 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
           val shuffles = collectShuffles(df.queryExecution.executedPlan)
           assert(shuffles.isEmpty, "SPJ should be triggered")
 
-          val scans = collectScans(df.queryExecution.executedPlan).map(_.inputRDD.
+          val partitions = collectScans(df.queryExecution.executedPlan).map(_.inputRDD.
             partitions.length)
           val expectedBuckets = Math.min(table1buckets1, table2buckets1) *
             Math.min(table1buckets2, table2buckets2)
-          assert(scans == Seq(expectedBuckets, expectedBuckets))
+          assert(partitions == Seq(expectedBuckets, expectedBuckets))
 
           checkAnswer(df, Seq(
             Row(0, 0, "aa", "aa"),
@@ -1647,7 +1647,7 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
     }
   }
 
-  test("SPARK-47094: Support compatible buckets with common divisor") {
+  test("SPARK-47094: SPJ: Support compatible buckets with common divisor") {
     val table1 = "tab1e1"
     val table2 = "table2"
 
@@ -1744,9 +1744,9 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
             partitions.length)
 
           def gcd(a: Int, b: Int): Int = BigInt(a).gcd(BigInt(b)).toInt
-          val expectedBuckets = gcd(table1buckets1, table2buckets1) *
+          val expectedPartitions = gcd(table1buckets1, table2buckets1) *
             gcd(table1buckets2, table2buckets2)
-          assert(scans == Seq(expectedBuckets, expectedBuckets))
+          assert(scans == Seq(expectedPartitions, expectedPartitions))
 
           checkAnswer(df, Seq(
             Row(0, 0, "aa", "aa"),
@@ -1809,6 +1809,56 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
     }
   }
 
+
+  test("SPARK-47094: SPJ: Does not trigger when incompatible number of buckets on both sides") {
+    val table1 = "tab1e1"
+    val table2 = "table2"
+
+    Seq(
+      (2, 3),
+      (3, 4)
+    ).foreach {
+      case (table1buckets1, table2buckets1) =>
+        catalog.clearTables()
+
+        val partition1 = Array(bucket(table1buckets1, "store_id"))
+        val partition2 = Array(bucket(table2buckets1, "store_id"))
+
+        Seq((table1, partition1), (table2, partition2)).foreach { case (tab, part) =>
+          createTable(tab, columns2, part)
+          val insertStr = s"INSERT INTO testcat.ns.$tab VALUES " +
+            "(0, 0, 'aa'), " +
+            "(1, 0, 'ab'), " + // duplicate partition key
+            "(2, 2, 'ac'), " +
+            "(3, 3, 'ad'), " +
+            "(4, 2, 'bc') "
+
+          sql(insertStr)
+        }
+
+        Seq(true, false).foreach { allowJoinKeysSubsetOfPartitionKeys =>
+          withSQLConf(
+            SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_CO_PARTITION.key -> "false",
+            SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key -> "true",
+            SQLConf.V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED.key -> "false",
+            SQLConf.V2_BUCKETING_ALLOW_JOIN_KEYS_SUBSET_OF_PARTITION_KEYS.key ->
+              allowJoinKeysSubsetOfPartitionKeys.toString,
+            SQLConf.V2_BUCKETING_ALLOW_COMPATIBLE_TRANSFORMS.key -> "true") {
+            val df = sql(
+              s"""
+                 |${selectWithMergeJoinHint("t1", "t2")}
+                 |t1.store_id, t1.dept_id, t1.data, t2.data
+                 |FROM testcat.ns.$table1 t1 JOIN testcat.ns.$table2 t2
+                 |ON t1.store_id = t2.store_id AND t1.dept_id = t2.dept_id
+                 |""".stripMargin)
+
+            val shuffles = collectShuffles(df.queryExecution.executedPlan)
+            assert(shuffles.nonEmpty, "SPJ should not be triggered")
+          }
+        }
+    }
+  }
+
   test("SPARK-47094: Support compatible buckets with less join keys than partition keys") {
     val table1 = "tab1e1"
     val table2 = "table2"
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/catalog/functions/transformFunctions.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/catalog/functions/transformFunctions.scala
index 5364fc5d62423..b82cc2392e1fc 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/catalog/functions/transformFunctions.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/catalog/functions/transformFunctions.scala
@@ -101,8 +101,8 @@ object BucketFunction extends ScalarFunction[Int] with ReducibleFunction[Int, In
 
     if (otherFunc == BucketFunction) {
       val gcd = this.gcd(thisNumBuckets, otherNumBuckets)
-      if (gcd != thisNumBuckets) {
-        return BucketReducer(thisNumBuckets, gcd)
+      if (gcd > 1 && gcd != thisNumBuckets) {
+        return BucketReducer(gcd)
       }
     }
     null
@@ -111,7 +111,7 @@ object BucketFunction extends ScalarFunction[Int] with ReducibleFunction[Int, In
   private def gcd(a: Int, b: Int): Int = BigInt(a).gcd(BigInt(b)).toInt
 }
 
-case class BucketReducer(thisNumBuckets: Int, divisor: Int) extends Reducer[Int, Int] {
+case class BucketReducer(divisor: Int) extends Reducer[Int, Int] {
   override def reduce(bucket: Int): Int = bucket % divisor
 }
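For context, a rough Scala sketch (not part of the patch) of the arithmetic that the `gcd > 1` guard and the new negative test rely on: reducing each side's bucket id modulo the common divisor only lines the two tables up when that divisor is greater than one. The object and helper names below are invented for illustration.

object SpjGcdSketch {
  // Stand-in for a hash-based bucket transform: bucket = hash(key) mod numBuckets.
  private def bucketOf(hashKey: Int, numBuckets: Int): Int =
    ((hashKey % numBuckets) + numBuckets) % numBuckets

  private def gcd(a: Int, b: Int): Int = BigInt(a).gcd(BigInt(b)).toInt

  def main(args: Array[String]): Unit = {
    val (leftBuckets, rightBuckets) = (4, 6)
    val divisor = gcd(leftBuckets, rightBuckets) // 2

    // Because the divisor divides both bucket counts, reducing each side's bucket id
    // modulo the divisor sends rows with the same key to the same reduced partition.
    for (hashKey <- 0 until 12) {
      val leftReduced = bucketOf(hashKey, leftBuckets) % divisor
      val rightReduced = bucketOf(hashKey, rightBuckets) % divisor
      assert(leftReduced == rightReduced)
    }

    // With bucket counts such as (2, 3) or (3, 4) the gcd is 1, so reducing would
    // collapse every row into a single partition. That is why the reducer now bails
    // out for gcd == 1 and the new test expects a shuffle instead of SPJ.
    println(s"gcd(2, 3) = ${gcd(2, 3)}, gcd(3, 4) = ${gcd(3, 4)}")
  }
}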