Skip to content

Commit

Permalink
[SPARK-36892][CORE] Disable batch fetch for a shuffle when push based…
Browse files Browse the repository at this point in the history
… shuffle is enabled

We found an issue where user configured both AQE and push based shuffle, but the job started to hang after running some  stages. We took the thread dump from the Executors, which showed the task is still waiting to fetch shuffle blocks.
Proposed changes in the PR to fix the issue.

### What changes were proposed in this pull request?
Disabled Batch fetch when push based shuffle is enabled.

### Why are the changes needed?
Without this patch, enabling AQE and Push based shuffle will have a chance to hang the tasks.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Tested the PR within our PR, with Spark shell and the queries are:

sql("""SELECT CASE WHEN rand() < 0.8 THEN 100 ELSE CAST(rand() * 30000000 AS INT) END AS s_item_id, CAST(rand() * 100 AS INT) AS s_quantity, DATE_ADD(current_date(), - CAST(rand() * 360 AS INT)) AS s_date FROM RANGE(1000000000)""").createOrReplaceTempView("sales")
// Dynamically coalesce partitions
sql("""SELECT s_date, sum(s_quantity) AS q FROM sales GROUP BY s_date ORDER BY q DESC""").collect

Unit tests to be added.

Closes apache#34156 from zhouyejoe/SPARK-36892.

Authored-by: Ye Zhou <[email protected]>
Signed-off-by: Gengliang Wang <[email protected]>
(cherry picked from commit 31b6f61)
Signed-off-by: Gengliang Wang <[email protected]>
  • Loading branch information
zhouyejoe authored and gengliangwang committed Oct 6, 2021
1 parent 6888089 commit 88f4809
Show file tree
Hide file tree
Showing 4 changed files with 183 additions and 17 deletions.
62 changes: 50 additions & 12 deletions core/src/main/scala/org/apache/spark/MapOutputTracker.scala
Original file line number Diff line number Diff line change
Expand Up @@ -436,6 +436,8 @@ private[spark] case class GetMapOutputMessage(shuffleId: Int,
context: RpcCallContext) extends MapOutputTrackerMasterMessage
private[spark] case class GetMapAndMergeOutputMessage(shuffleId: Int,
context: RpcCallContext) extends MapOutputTrackerMasterMessage
private[spark] case class MapSizesByExecutorId(
iter: Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])], enableBatchFetch: Boolean)

/** RpcEndpoint class for MapOutputTrackerMaster */
private[spark] class MapOutputTrackerMasterEndpoint(
Expand Down Expand Up @@ -512,12 +514,19 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
getMapSizesByExecutorId(shuffleId, 0, Int.MaxValue, reduceId, reduceId + 1)
}

// For testing
def getPushBasedShuffleMapSizesByExecutorId(shuffleId: Int, reduceId: Int)
: MapSizesByExecutorId = {
getPushBasedShuffleMapSizesByExecutorId(shuffleId, 0, Int.MaxValue, reduceId, reduceId + 1)
}

/**
* Called from executors to get the server URIs and output sizes for each shuffle block that
* needs to be read from a given range of map output partitions (startPartition is included but
* endPartition is excluded from the range) within a range of mappers (startMapIndex is included
* but endMapIndex is excluded). If endMapIndex=Int.MaxValue, the actual endMapIndex will be
* changed to the length of total map outputs.
* but endMapIndex is excluded) when push based shuffle is not enabled for the specific shuffle
* dependency. If endMapIndex=Int.MaxValue, the actual endMapIndex will be changed to the length
* of total map outputs.
*
* @return A sequence of 2-item tuples, where the first item in the tuple is a BlockManagerId,
* and the second item is a sequence of (shuffle block id, shuffle block size, map index)
Expand All @@ -529,7 +538,34 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
startMapIndex: Int,
endMapIndex: Int,
startPartition: Int,
endPartition: Int): Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])]
endPartition: Int): Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])] = {
val mapSizesByExecutorId = getPushBasedShuffleMapSizesByExecutorId(
shuffleId, startMapIndex, endMapIndex, startPartition, endPartition)
assert(mapSizesByExecutorId.enableBatchFetch == true)
mapSizesByExecutorId.iter
}

/**
* Called from executors to get the server URIs and output sizes for each shuffle block that
* needs to be read from a given range of map output partitions (startPartition is included but
* endPartition is excluded from the range) within a range of mappers (startMapIndex is included
* but endMapIndex is excluded) when push based shuffle is enabled for the specific shuffle
* dependency. If endMapIndex=Int.MaxValue, the actual endMapIndex will be changed to the length
* of total map outputs.
*
* @return A case class object which includes two attributes. The first attribute is a sequence
* of 2-item tuples, where the first item in the tuple is a BlockManagerId, and the
* second item is a sequence of (shuffle block id, shuffle block size, map index) tuples
* tuples describing the shuffle blocks that are stored at that block manager. Note that
* zero-sized blocks are excluded in the result. The second attribute is a boolean flag,
* indicating whether batch fetch can be enabled.
*/
def getPushBasedShuffleMapSizesByExecutorId(
shuffleId: Int,
startMapIndex: Int,
endMapIndex: Int,
startPartition: Int,
endPartition: Int): MapSizesByExecutorId

/**
* Called from executors upon fetch failure on an entire merged shuffle reduce partition.
Expand Down Expand Up @@ -1060,12 +1096,12 @@ private[spark] class MapOutputTrackerMaster(
}

// This method is only called in local-mode.
def getMapSizesByExecutorId(
def getPushBasedShuffleMapSizesByExecutorId(
shuffleId: Int,
startMapIndex: Int,
endMapIndex: Int,
startPartition: Int,
endPartition: Int): Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])] = {
endPartition: Int): MapSizesByExecutorId = {
logDebug(s"Fetching outputs for shuffle $shuffleId")
shuffleStatuses.get(shuffleId) match {
case Some(shuffleStatus) =>
Expand All @@ -1077,7 +1113,7 @@ private[spark] class MapOutputTrackerMaster(
shuffleId, startPartition, endPartition, statuses, startMapIndex, actualEndMapIndex)
}
case None =>
Iterator.empty
MapSizesByExecutorId(Iterator.empty, true)
}
}

Expand Down Expand Up @@ -1138,12 +1174,12 @@ private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTr
*/
private val fetchingLock = new KeyLock[Int]

override def getMapSizesByExecutorId(
override def getPushBasedShuffleMapSizesByExecutorId(
shuffleId: Int,
startMapIndex: Int,
endMapIndex: Int,
startPartition: Int,
endPartition: Int): Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])] = {
endPartition: Int): MapSizesByExecutorId = {
logDebug(s"Fetching outputs for shuffle $shuffleId")
val (mapOutputStatuses, mergedOutputStatuses) = getStatuses(shuffleId, conf)
try {
Expand Down Expand Up @@ -1439,10 +1475,10 @@ private[spark] object MapOutputTracker extends Logging {
mapStatuses: Array[MapStatus],
startMapIndex : Int,
endMapIndex: Int,
mergeStatuses: Option[Array[MergeStatus]] = None):
Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])] = {
mergeStatuses: Option[Array[MergeStatus]] = None): MapSizesByExecutorId = {
assert (mapStatuses != null)
val splitsByAddress = new HashMap[BlockManagerId, ListBuffer[(BlockId, Long, Int)]]
var enableBatchFetch = true
// Only use MergeStatus for reduce tasks that fetch all map outputs. Since a merged shuffle
// partition consists of blocks merged in random order, we are unable to serve map index
// subrange requests. However, when a reduce task needs to fetch blocks from a subrange of
Expand All @@ -1451,8 +1487,10 @@ private[spark] object MapOutputTracker extends Logging {
// TODO: SPARK-35036: Instead of reading map blocks in case of AQE with Push based shuffle,
// TODO: improve push based shuffle to read partial merged blocks satisfying the start/end
// TODO: map indexes
if (mergeStatuses.exists(_.nonEmpty) && startMapIndex == 0
if (mergeStatuses.exists(_.exists(_ != null)) && startMapIndex == 0
&& endMapIndex == mapStatuses.length) {
enableBatchFetch = false
logDebug(s"Disable shuffle batch fetch as Push based shuffle is enabled for $shuffleId.")
// We have MergeStatus and full range of mapIds are requested so return a merged block.
val numMaps = mapStatuses.length
mergeStatuses.get.zipWithIndex.slice(startPartition, endPartition).foreach {
Expand Down Expand Up @@ -1497,7 +1535,7 @@ private[spark] object MapOutputTracker extends Logging {
}
}

splitsByAddress.mapValues(_.toSeq).iterator
MapSizesByExecutorId(splitsByAddress.mapValues(_.toSeq).iterator, enableBatchFetch)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,11 +129,21 @@ private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager
endPartition: Int,
context: TaskContext,
metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C] = {
val blocksByAddress = SparkEnv.get.mapOutputTracker.getMapSizesByExecutorId(
handle.shuffleId, startMapIndex, endMapIndex, startPartition, endPartition)
val baseShuffleHandle = handle.asInstanceOf[BaseShuffleHandle[K, _, C]]
val (blocksByAddress, canEnableBatchFetch) =
if (baseShuffleHandle.dependency.shuffleMergeEnabled) {
val res = SparkEnv.get.mapOutputTracker.getPushBasedShuffleMapSizesByExecutorId(
handle.shuffleId, startMapIndex, endMapIndex, startPartition, endPartition)
(res.iter, res.enableBatchFetch)
} else {
val address = SparkEnv.get.mapOutputTracker.getMapSizesByExecutorId(
handle.shuffleId, startMapIndex, endMapIndex, startPartition, endPartition)
(address, true)
}
new BlockStoreShuffleReader(
handle.asInstanceOf[BaseShuffleHandle[K, _, C]], blocksByAddress, context, metrics,
shouldBatchFetch = canUseBatchFetch(startPartition, endPartition, context))
shouldBatchFetch =
canEnableBatchFetch && canUseBatchFetch(startPartition, endPartition, context))
}

/** Get a writer for a given partition. Called on executors by map tasks. */
Expand Down
120 changes: 119 additions & 1 deletion core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,7 @@ class MapOutputTrackerSuite extends SparkFunSuite with LocalSparkContext {
test("SPARK-32921: get map sizes with merged shuffle") {
conf.set(PUSH_BASED_SHUFFLE_ENABLED, true)
conf.set(IS_TESTING, true)
conf.set(SERIALIZER, "org.apache.spark.serializer.KryoSerializer")
val hostname = "localhost"
val rpcEnv = createRpcEnv("spark", hostname, 0, new SecurityManager(conf))

Expand Down Expand Up @@ -391,7 +392,9 @@ class MapOutputTrackerSuite extends SparkFunSuite with LocalSparkContext {
bitmap, 3000L))
slaveTracker.updateEpoch(masterTracker.getEpoch)
val size1000 = MapStatus.decompressSize(MapStatus.compressSize(1000L))
assert(slaveTracker.getMapSizesByExecutorId(10, 0).toSeq ===
val mapSizesByExecutorId = slaveTracker.getPushBasedShuffleMapSizesByExecutorId(10, 0)
assert(mapSizesByExecutorId.enableBatchFetch === false)
assert(mapSizesByExecutorId.iter.toSeq ===
Seq((blockMgrId, ArrayBuffer((ShuffleMergedBlockId(10, 0, 0), 3000, -1),
(ShuffleBlockId(10, 2, 0), size1000, 2)))))

Expand All @@ -404,6 +407,7 @@ class MapOutputTrackerSuite extends SparkFunSuite with LocalSparkContext {
test("SPARK-32921: get map statuses from merged shuffle") {
conf.set(PUSH_BASED_SHUFFLE_ENABLED, true)
conf.set(IS_TESTING, true)
conf.set(SERIALIZER, "org.apache.spark.serializer.KryoSerializer")
val hostname = "localhost"
val rpcEnv = createRpcEnv("spark", hostname, 0, new SecurityManager(conf))

Expand Down Expand Up @@ -436,6 +440,8 @@ class MapOutputTrackerSuite extends SparkFunSuite with LocalSparkContext {
bitmap, 4000L))
slaveTracker.updateEpoch(masterTracker.getEpoch)
val size1000 = MapStatus.decompressSize(MapStatus.compressSize(1000L))
val mapSizesByExecutorId = slaveTracker.getPushBasedShuffleMapSizesByExecutorId(10, 0)
assert(mapSizesByExecutorId.enableBatchFetch === false)
assert(slaveTracker.getMapSizesForMergeResult(10, 0).toSeq ===
Seq((blockMgrId, ArrayBuffer((ShuffleBlockId(10, 0, 0), size1000, 0),
(ShuffleBlockId(10, 1, 0), size1000, 1), (ShuffleBlockId(10, 2, 0), size1000, 2),
Expand All @@ -449,6 +455,7 @@ class MapOutputTrackerSuite extends SparkFunSuite with LocalSparkContext {
test("SPARK-32921: get map statuses for merged shuffle block chunks") {
conf.set(PUSH_BASED_SHUFFLE_ENABLED, true)
conf.set(IS_TESTING, true)
conf.set(SERIALIZER, "org.apache.spark.serializer.KryoSerializer")
val hostname = "localhost"
val rpcEnv = createRpcEnv("spark", hostname, 0, new SecurityManager(conf))

Expand Down Expand Up @@ -736,4 +743,115 @@ class MapOutputTrackerSuite extends SparkFunSuite with LocalSparkContext {
tracker.stop()
}
}

test("SPARK-36892: Batch fetch should be enabled in some scenarios with push based shuffle") {
conf.set(PUSH_BASED_SHUFFLE_ENABLED, true)
conf.set(IS_TESTING, true)
conf.set(SERIALIZER, "org.apache.spark.serializer.KryoSerializer")

val hostname = "localhost"
val rpcEnv = createRpcEnv("spark", hostname, 0, new SecurityManager(conf))

val masterTracker = newTrackerMaster()
masterTracker.trackerEndpoint = rpcEnv.setupEndpoint(MapOutputTracker.ENDPOINT_NAME,
new MapOutputTrackerMasterEndpoint(rpcEnv, masterTracker, conf))

val slaveRpcEnv = createRpcEnv("spark-slave", hostname, 0, new SecurityManager(conf))
val slaveTracker = new MapOutputTrackerWorker(conf)
slaveTracker.trackerEndpoint =
slaveRpcEnv.setupEndpointRef(rpcEnv.address, MapOutputTracker.ENDPOINT_NAME)

masterTracker.registerShuffle(10, 4, 1)
slaveTracker.updateEpoch(masterTracker.getEpoch)

val blockMgrId = BlockManagerId("a", "hostA", 1000)
masterTracker.registerMapOutput(10, 0, MapStatus(blockMgrId, Array(1000L), 0))
masterTracker.registerMapOutput(10, 1, MapStatus(blockMgrId, Array(1000L), 1))
masterTracker.registerMapOutput(10, 2, MapStatus(blockMgrId, Array(1000L), 2))
masterTracker.registerMapOutput(10, 3, MapStatus(blockMgrId, Array(1000L), 3))

slaveTracker.updateEpoch(masterTracker.getEpoch)
val size1000 = MapStatus.decompressSize(MapStatus.compressSize(1000L))
val mapSizesByExecutorId = slaveTracker.getPushBasedShuffleMapSizesByExecutorId(10, 0)
// Batch fetch should be enabled when there are no merged shuffle files
assert(mapSizesByExecutorId.enableBatchFetch === true)
assert(mapSizesByExecutorId.iter.toSeq ===
Seq((blockMgrId, ArrayBuffer((ShuffleBlockId(10, 0, 0), size1000, 0),
(ShuffleBlockId(10, 1, 0), size1000, 1), (ShuffleBlockId(10, 2, 0), size1000, 2),
(ShuffleBlockId(10, 3, 0), size1000, 3)))))

masterTracker.registerShuffle(11, 4, 1)
slaveTracker.updateEpoch(masterTracker.getEpoch)

val bitmap = new RoaringBitmap()
bitmap.add(0)
bitmap.add(1)
bitmap.add(3)

masterTracker.registerMergeResult(11, 0, MergeStatus(blockMgrId, 0,
bitmap, 3000L))
masterTracker.registerMapOutput(11, 0, MapStatus(blockMgrId, Array(1000L), 0))
masterTracker.registerMapOutput(11, 1, MapStatus(blockMgrId, Array(1000L), 1))
masterTracker.registerMapOutput(11, 2, MapStatus(blockMgrId, Array(1000L), 2))
masterTracker.registerMapOutput(11, 3, MapStatus(blockMgrId, Array(1000L), 3))

slaveTracker.updateEpoch(masterTracker.getEpoch)
val mapSizesByExecutorId2 = slaveTracker.getPushBasedShuffleMapSizesByExecutorId(11, 0, 2, 0, 1)
// Batch fetch should be enabled when it only fetches subsets of mapper outputs
assert(mapSizesByExecutorId2.enableBatchFetch === true)
assert(mapSizesByExecutorId2.iter.toSeq ===
Seq((blockMgrId, ArrayBuffer((ShuffleBlockId(11, 0, 0), size1000, 0),
(ShuffleBlockId(11, 1, 0), size1000, 1)))))

masterTracker.unregisterShuffle(10)
masterTracker.unregisterShuffle(11)
masterTracker.stop()
slaveTracker.stop()
rpcEnv.shutdown()
slaveRpcEnv.shutdown()
}

test("SPARK-36892: Batch fetch should be disabled in some scenarios with push based shuffle") {
conf.set(PUSH_BASED_SHUFFLE_ENABLED, true)
conf.set(IS_TESTING, true)
conf.set(SERIALIZER, "org.apache.spark.serializer.KryoSerializer")

val hostname = "localhost"
val rpcEnv = createRpcEnv("spark", hostname, 0, new SecurityManager(conf))

val masterTracker = newTrackerMaster()
masterTracker.trackerEndpoint = rpcEnv.setupEndpoint(MapOutputTracker.ENDPOINT_NAME,
new MapOutputTrackerMasterEndpoint(rpcEnv, masterTracker, conf))

val slaveRpcEnv = createRpcEnv("spark-slave", hostname, 0, new SecurityManager(conf))
val slaveTracker = new MapOutputTrackerWorker(conf)
slaveTracker.trackerEndpoint =
slaveRpcEnv.setupEndpointRef(rpcEnv.address, MapOutputTracker.ENDPOINT_NAME)
masterTracker.registerShuffle(10, 4, 2)
assert(masterTracker.containsShuffle(10))

val blockMgrId = BlockManagerId("a", "hostA", 1000)
masterTracker.registerMapOutput(10, 0, MapStatus(blockMgrId, Array(1000L, 1000L), 0))
masterTracker.registerMapOutput(10, 1, MapStatus(blockMgrId, Array(1000L, 1000L), 1))
masterTracker.registerMapOutput(10, 2, MapStatus(blockMgrId, Array(1000L, 1000L), 2))
masterTracker.registerMapOutput(10, 3, MapStatus(blockMgrId, Array(1000L, 1000L), 3))
slaveTracker.updateEpoch(masterTracker.getEpoch)

val bitmap = new RoaringBitmap()
bitmap.add(0)
bitmap.add(1)
masterTracker.registerMergeResult(10, 0, MergeStatus(blockMgrId, 0,
bitmap, 2000L))
masterTracker.registerMergeResult(10, 1, MergeStatus(blockMgrId, 0,
bitmap, 2000L))
slaveTracker.updateEpoch(masterTracker.getEpoch)
// Query for all mappers output for multiple reducers, since there are merged shuffles,
// batch fetch should be disabled.
val mapSizesByExecutorId =
slaveTracker.getPushBasedShuffleMapSizesByExecutorId(10, 0, Int.MaxValue, 0, 2)
assert(mapSizesByExecutorId.enableBatchFetch === false)
masterTracker.unregisterShuffle(10)
masterTracker.stop()
rpcEnv.shutdown()
}
}
2 changes: 1 addition & 1 deletion docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -3155,7 +3155,7 @@ See the `RDD.withResources` and `ResourceProfileBuilder` API's for using this fe

# Push-based shuffle overview

Push-based shuffle helps improve the reliability and performance of spark shuffle. It takes a best-effort approach to push the shuffle blocks generated by the map tasks to remote external shuffle services to be merged per shuffle partition. Reduce tasks fetch a combination of merged shuffle partitions and original shuffle blocks as their input data, resulting in converting small random disk reads by external shuffle services into large sequential reads. Possibility of better data locality for reduce tasks additionally helps minimize network IO.
Push-based shuffle helps improve the reliability and performance of spark shuffle. It takes a best-effort approach to push the shuffle blocks generated by the map tasks to remote external shuffle services to be merged per shuffle partition. Reduce tasks fetch a combination of merged shuffle partitions and original shuffle blocks as their input data, resulting in converting small random disk reads by external shuffle services into large sequential reads. Possibility of better data locality for reduce tasks additionally helps minimize network IO. Push-based shuffle takes priority over batch fetch for some scenarios, like partition coalesce when merged output is available.

<p> Push-based shuffle improves performance for long running jobs/queries which involves large disk I/O during shuffle. Currently it is not well suited for jobs/queries which runs quickly dealing with lesser amount of shuffle data. This will be further improved in the future releases.</p>

Expand Down

0 comments on commit 88f4809

Please sign in to comment.