Skip to content

Commit

Permalink
SPJ : fix bucket reducer function
Browse files Browse the repository at this point in the history
  • Loading branch information
himadripal committed Oct 19, 2024
1 parent 25b03f9 commit b85847c
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1390,7 +1390,7 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
}
}

test("SPARK-44647: test join key is subset of cluster key " +
test("SPARK-44647: SPJ: test join key is subset of cluster key " +
"with push values and partially-clustered") {
val table1 = "tab1e1"
val table2 = "table2"
Expand Down Expand Up @@ -1487,7 +1487,7 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
}
}

test("SPARK-47094: Support compatible buckets") {
test("SPARK-47094: SPJ: Support compatible buckets") {
val table1 = "tab1e1"
val table2 = "table2"

Expand Down Expand Up @@ -1580,11 +1580,11 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
val shuffles = collectShuffles(df.queryExecution.executedPlan)
assert(shuffles.isEmpty, "SPJ should be triggered")

val scans = collectScans(df.queryExecution.executedPlan).map(_.inputRDD.
val partions = collectScans(df.queryExecution.executedPlan).map(_.inputRDD.
partitions.length)
val expectedBuckets = Math.min(table1buckets1, table2buckets1) *
Math.min(table1buckets2, table2buckets2)
assert(scans == Seq(expectedBuckets, expectedBuckets))
assert(partions == Seq(expectedBuckets, expectedBuckets))

checkAnswer(df, Seq(
Row(0, 0, "aa", "aa"),
Expand Down Expand Up @@ -1647,7 +1647,7 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
}
}

test("SPARK-47094: Support compatible buckets with common divisor") {
test("SPARK-47094: SPJ:Support compatible buckets with common divisor") {
val table1 = "tab1e1"
val table2 = "table2"

Expand Down Expand Up @@ -1744,9 +1744,9 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
partitions.length)

def gcd(a: Int, b: Int): Int = BigInt(a).gcd(BigInt(b)).toInt
val expectedBuckets = gcd(table1buckets1, table2buckets1) *
val expectedPartitions = gcd(table1buckets1, table2buckets1) *
gcd(table1buckets2, table2buckets2)
assert(scans == Seq(expectedBuckets, expectedBuckets))
assert(scans == Seq(expectedPartitions, expectedPartitions))

checkAnswer(df, Seq(
Row(0, 0, "aa", "aa"),
Expand Down Expand Up @@ -1809,6 +1809,56 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
}
}


test("SPARK-47094: SPJ: Does not trigger when incompatible number of buckets on both side") {
val table1 = "tab1e1"
val table2 = "table2"

Seq(
(2, 3),
(3, 4)
).foreach {
case (table1buckets1, table2buckets1) =>
catalog.clearTables()

val partition1 = Array(bucket(table1buckets1, "store_id"))
val partition2 = Array(bucket(table2buckets1, "store_id"))

Seq((table1, partition1), (table2, partition2)).foreach { case (tab, part) =>
createTable(tab, columns2, part)
val insertStr = s"INSERT INTO testcat.ns.$tab VALUES " +
"(0, 0, 'aa'), " +
"(1, 0, 'ab'), " + // duplicate partition key
"(2, 2, 'ac'), " +
"(3, 3, 'ad'), " +
"(4, 2, 'bc') "

sql(insertStr)
}

Seq(true, false).foreach { allowJoinKeysSubsetOfPartitionKeys =>
withSQLConf(
SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_CO_PARTITION.key -> "false",
SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key -> "true",
SQLConf.V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED.key -> "false",
SQLConf.V2_BUCKETING_ALLOW_JOIN_KEYS_SUBSET_OF_PARTITION_KEYS.key ->
allowJoinKeysSubsetOfPartitionKeys.toString,
SQLConf.V2_BUCKETING_ALLOW_COMPATIBLE_TRANSFORMS.key -> "true") {
val df = sql(
s"""
|${selectWithMergeJoinHint("t1", "t2")}
|t1.store_id, t1.dept_id, t1.data, t2.data
|FROM testcat.ns.$table1 t1 JOIN testcat.ns.$table2 t2
|ON t1.store_id = t2.store_id AND t1.dept_id = t2.dept_id
|""".stripMargin)

val shuffles = collectShuffles(df.queryExecution.executedPlan)
assert(shuffles.nonEmpty, "SPJ should not be triggered")
}
}
}
}

test("SPARK-47094: Support compatible buckets with less join keys than partition keys") {
val table1 = "tab1e1"
val table2 = "table2"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,8 @@ object BucketFunction extends ScalarFunction[Int] with ReducibleFunction[Int, In

if (otherFunc == BucketFunction) {
val gcd = this.gcd(thisNumBuckets, otherNumBuckets)
if (gcd != thisNumBuckets) {
return BucketReducer(thisNumBuckets, gcd)
if (gcd > 1 && gcd != thisNumBuckets) {
return BucketReducer(gcd)
}
}
null
Expand All @@ -111,7 +111,7 @@ object BucketFunction extends ScalarFunction[Int] with ReducibleFunction[Int, In
private def gcd(a: Int, b: Int): Int = BigInt(a).gcd(BigInt(b)).toInt
}

case class BucketReducer(thisNumBuckets: Int, divisor: Int) extends Reducer[Int, Int] {
case class BucketReducer(divisor: Int) extends Reducer[Int, Int] {
override def reduce(bucket: Int): Int = bucket % divisor
}

Expand Down

0 comments on commit b85847c

Please sign in to comment.