From 88f480914226281dd19e149ff243ec91e9ccf932 Mon Sep 17 00:00:00 2001 From: Ye Zhou Date: Wed, 6 Oct 2021 15:42:25 +0800 Subject: [PATCH] [SPARK-36892][CORE] Disable batch fetch for a shuffle when push based shuffle is enabled We found an issue where user configured both AQE and push based shuffle, but the job started to hang after running some stages. We took the thread dump from the Executors, which showed the task is still waiting to fetch shuffle blocks. Proposed changes in the PR to fix the issue. ### What changes were proposed in this pull request? Disabled Batch fetch when push based shuffle is enabled. ### Why are the changes needed? Without this patch, enabling AQE and Push based shuffle will have a chance to hang the tasks. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Tested the PR within our PR, with Spark shell and the queries are: sql("""SELECT CASE WHEN rand() < 0.8 THEN 100 ELSE CAST(rand() * 30000000 AS INT) END AS s_item_id, CAST(rand() * 100 AS INT) AS s_quantity, DATE_ADD(current_date(), - CAST(rand() * 360 AS INT)) AS s_date FROM RANGE(1000000000)""").createOrReplaceTempView("sales") // Dynamically coalesce partitions sql("""SELECT s_date, sum(s_quantity) AS q FROM sales GROUP BY s_date ORDER BY q DESC""").collect Unit tests to be added. Closes #34156 from zhouyejoe/SPARK-36892. Authored-by: Ye Zhou Signed-off-by: Gengliang Wang (cherry picked from commit 31b6f614d3173c8a5852243bf7d0b6200788432d) Signed-off-by: Gengliang Wang --- .../org/apache/spark/MapOutputTracker.scala | 62 +++++++-- .../shuffle/sort/SortShuffleManager.scala | 16 ++- .../apache/spark/MapOutputTrackerSuite.scala | 120 +++++++++++++++++- docs/configuration.md | 2 +- 4 files changed, 183 insertions(+), 17 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala index ca1229a737d27..588f7d28155b9 100644 --- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala +++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala @@ -436,6 +436,8 @@ private[spark] case class GetMapOutputMessage(shuffleId: Int, context: RpcCallContext) extends MapOutputTrackerMasterMessage private[spark] case class GetMapAndMergeOutputMessage(shuffleId: Int, context: RpcCallContext) extends MapOutputTrackerMasterMessage +private[spark] case class MapSizesByExecutorId( + iter: Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])], enableBatchFetch: Boolean) /** RpcEndpoint class for MapOutputTrackerMaster */ private[spark] class MapOutputTrackerMasterEndpoint( @@ -512,12 +514,19 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging getMapSizesByExecutorId(shuffleId, 0, Int.MaxValue, reduceId, reduceId + 1) } + // For testing + def getPushBasedShuffleMapSizesByExecutorId(shuffleId: Int, reduceId: Int) + : MapSizesByExecutorId = { + getPushBasedShuffleMapSizesByExecutorId(shuffleId, 0, Int.MaxValue, reduceId, reduceId + 1) + } + /** * Called from executors to get the server URIs and output sizes for each shuffle block that * needs to be read from a given range of map output partitions (startPartition is included but * endPartition is excluded from the range) within a range of mappers (startMapIndex is included - * but endMapIndex is excluded). If endMapIndex=Int.MaxValue, the actual endMapIndex will be - * changed to the length of total map outputs. + * but endMapIndex is excluded) when push based shuffle is not enabled for the specific shuffle + * dependency. If endMapIndex=Int.MaxValue, the actual endMapIndex will be changed to the length + * of total map outputs. * * @return A sequence of 2-item tuples, where the first item in the tuple is a BlockManagerId, * and the second item is a sequence of (shuffle block id, shuffle block size, map index) @@ -529,7 +538,34 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging startMapIndex: Int, endMapIndex: Int, startPartition: Int, - endPartition: Int): Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])] + endPartition: Int): Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])] = { + val mapSizesByExecutorId = getPushBasedShuffleMapSizesByExecutorId( + shuffleId, startMapIndex, endMapIndex, startPartition, endPartition) + assert(mapSizesByExecutorId.enableBatchFetch == true) + mapSizesByExecutorId.iter + } + + /** + * Called from executors to get the server URIs and output sizes for each shuffle block that + * needs to be read from a given range of map output partitions (startPartition is included but + * endPartition is excluded from the range) within a range of mappers (startMapIndex is included + * but endMapIndex is excluded) when push based shuffle is enabled for the specific shuffle + * dependency. If endMapIndex=Int.MaxValue, the actual endMapIndex will be changed to the length + * of total map outputs. + * + * @return A case class object which includes two attributes. The first attribute is a sequence + * of 2-item tuples, where the first item in the tuple is a BlockManagerId, and the + * second item is a sequence of (shuffle block id, shuffle block size, map index) tuples + * tuples describing the shuffle blocks that are stored at that block manager. Note that + * zero-sized blocks are excluded in the result. The second attribute is a boolean flag, + * indicating whether batch fetch can be enabled. + */ + def getPushBasedShuffleMapSizesByExecutorId( + shuffleId: Int, + startMapIndex: Int, + endMapIndex: Int, + startPartition: Int, + endPartition: Int): MapSizesByExecutorId /** * Called from executors upon fetch failure on an entire merged shuffle reduce partition. @@ -1060,12 +1096,12 @@ private[spark] class MapOutputTrackerMaster( } // This method is only called in local-mode. - def getMapSizesByExecutorId( + def getPushBasedShuffleMapSizesByExecutorId( shuffleId: Int, startMapIndex: Int, endMapIndex: Int, startPartition: Int, - endPartition: Int): Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])] = { + endPartition: Int): MapSizesByExecutorId = { logDebug(s"Fetching outputs for shuffle $shuffleId") shuffleStatuses.get(shuffleId) match { case Some(shuffleStatus) => @@ -1077,7 +1113,7 @@ private[spark] class MapOutputTrackerMaster( shuffleId, startPartition, endPartition, statuses, startMapIndex, actualEndMapIndex) } case None => - Iterator.empty + MapSizesByExecutorId(Iterator.empty, true) } } @@ -1138,12 +1174,12 @@ private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTr */ private val fetchingLock = new KeyLock[Int] - override def getMapSizesByExecutorId( + override def getPushBasedShuffleMapSizesByExecutorId( shuffleId: Int, startMapIndex: Int, endMapIndex: Int, startPartition: Int, - endPartition: Int): Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])] = { + endPartition: Int): MapSizesByExecutorId = { logDebug(s"Fetching outputs for shuffle $shuffleId") val (mapOutputStatuses, mergedOutputStatuses) = getStatuses(shuffleId, conf) try { @@ -1439,10 +1475,10 @@ private[spark] object MapOutputTracker extends Logging { mapStatuses: Array[MapStatus], startMapIndex : Int, endMapIndex: Int, - mergeStatuses: Option[Array[MergeStatus]] = None): - Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])] = { + mergeStatuses: Option[Array[MergeStatus]] = None): MapSizesByExecutorId = { assert (mapStatuses != null) val splitsByAddress = new HashMap[BlockManagerId, ListBuffer[(BlockId, Long, Int)]] + var enableBatchFetch = true // Only use MergeStatus for reduce tasks that fetch all map outputs. Since a merged shuffle // partition consists of blocks merged in random order, we are unable to serve map index // subrange requests. However, when a reduce task needs to fetch blocks from a subrange of @@ -1451,8 +1487,10 @@ private[spark] object MapOutputTracker extends Logging { // TODO: SPARK-35036: Instead of reading map blocks in case of AQE with Push based shuffle, // TODO: improve push based shuffle to read partial merged blocks satisfying the start/end // TODO: map indexes - if (mergeStatuses.exists(_.nonEmpty) && startMapIndex == 0 + if (mergeStatuses.exists(_.exists(_ != null)) && startMapIndex == 0 && endMapIndex == mapStatuses.length) { + enableBatchFetch = false + logDebug(s"Disable shuffle batch fetch as Push based shuffle is enabled for $shuffleId.") // We have MergeStatus and full range of mapIds are requested so return a merged block. val numMaps = mapStatuses.length mergeStatuses.get.zipWithIndex.slice(startPartition, endPartition).foreach { @@ -1497,7 +1535,7 @@ private[spark] object MapOutputTracker extends Logging { } } - splitsByAddress.mapValues(_.toSeq).iterator + MapSizesByExecutorId(splitsByAddress.mapValues(_.toSeq).iterator, enableBatchFetch) } /** diff --git a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala index d3cc5ed10799e..e8c7f1f4d91c3 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala @@ -129,11 +129,21 @@ private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager endPartition: Int, context: TaskContext, metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C] = { - val blocksByAddress = SparkEnv.get.mapOutputTracker.getMapSizesByExecutorId( - handle.shuffleId, startMapIndex, endMapIndex, startPartition, endPartition) + val baseShuffleHandle = handle.asInstanceOf[BaseShuffleHandle[K, _, C]] + val (blocksByAddress, canEnableBatchFetch) = + if (baseShuffleHandle.dependency.shuffleMergeEnabled) { + val res = SparkEnv.get.mapOutputTracker.getPushBasedShuffleMapSizesByExecutorId( + handle.shuffleId, startMapIndex, endMapIndex, startPartition, endPartition) + (res.iter, res.enableBatchFetch) + } else { + val address = SparkEnv.get.mapOutputTracker.getMapSizesByExecutorId( + handle.shuffleId, startMapIndex, endMapIndex, startPartition, endPartition) + (address, true) + } new BlockStoreShuffleReader( handle.asInstanceOf[BaseShuffleHandle[K, _, C]], blocksByAddress, context, metrics, - shouldBatchFetch = canUseBatchFetch(startPartition, endPartition, context)) + shouldBatchFetch = + canEnableBatchFetch && canUseBatchFetch(startPartition, endPartition, context)) } /** Get a writer for a given partition. Called on executors by map tasks. */ diff --git a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala index 40511185721d9..8bebecfe14914 100644 --- a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala +++ b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala @@ -362,6 +362,7 @@ class MapOutputTrackerSuite extends SparkFunSuite with LocalSparkContext { test("SPARK-32921: get map sizes with merged shuffle") { conf.set(PUSH_BASED_SHUFFLE_ENABLED, true) conf.set(IS_TESTING, true) + conf.set(SERIALIZER, "org.apache.spark.serializer.KryoSerializer") val hostname = "localhost" val rpcEnv = createRpcEnv("spark", hostname, 0, new SecurityManager(conf)) @@ -391,7 +392,9 @@ class MapOutputTrackerSuite extends SparkFunSuite with LocalSparkContext { bitmap, 3000L)) slaveTracker.updateEpoch(masterTracker.getEpoch) val size1000 = MapStatus.decompressSize(MapStatus.compressSize(1000L)) - assert(slaveTracker.getMapSizesByExecutorId(10, 0).toSeq === + val mapSizesByExecutorId = slaveTracker.getPushBasedShuffleMapSizesByExecutorId(10, 0) + assert(mapSizesByExecutorId.enableBatchFetch === false) + assert(mapSizesByExecutorId.iter.toSeq === Seq((blockMgrId, ArrayBuffer((ShuffleMergedBlockId(10, 0, 0), 3000, -1), (ShuffleBlockId(10, 2, 0), size1000, 2))))) @@ -404,6 +407,7 @@ class MapOutputTrackerSuite extends SparkFunSuite with LocalSparkContext { test("SPARK-32921: get map statuses from merged shuffle") { conf.set(PUSH_BASED_SHUFFLE_ENABLED, true) conf.set(IS_TESTING, true) + conf.set(SERIALIZER, "org.apache.spark.serializer.KryoSerializer") val hostname = "localhost" val rpcEnv = createRpcEnv("spark", hostname, 0, new SecurityManager(conf)) @@ -436,6 +440,8 @@ class MapOutputTrackerSuite extends SparkFunSuite with LocalSparkContext { bitmap, 4000L)) slaveTracker.updateEpoch(masterTracker.getEpoch) val size1000 = MapStatus.decompressSize(MapStatus.compressSize(1000L)) + val mapSizesByExecutorId = slaveTracker.getPushBasedShuffleMapSizesByExecutorId(10, 0) + assert(mapSizesByExecutorId.enableBatchFetch === false) assert(slaveTracker.getMapSizesForMergeResult(10, 0).toSeq === Seq((blockMgrId, ArrayBuffer((ShuffleBlockId(10, 0, 0), size1000, 0), (ShuffleBlockId(10, 1, 0), size1000, 1), (ShuffleBlockId(10, 2, 0), size1000, 2), @@ -449,6 +455,7 @@ class MapOutputTrackerSuite extends SparkFunSuite with LocalSparkContext { test("SPARK-32921: get map statuses for merged shuffle block chunks") { conf.set(PUSH_BASED_SHUFFLE_ENABLED, true) conf.set(IS_TESTING, true) + conf.set(SERIALIZER, "org.apache.spark.serializer.KryoSerializer") val hostname = "localhost" val rpcEnv = createRpcEnv("spark", hostname, 0, new SecurityManager(conf)) @@ -736,4 +743,115 @@ class MapOutputTrackerSuite extends SparkFunSuite with LocalSparkContext { tracker.stop() } } + + test("SPARK-36892: Batch fetch should be enabled in some scenarios with push based shuffle") { + conf.set(PUSH_BASED_SHUFFLE_ENABLED, true) + conf.set(IS_TESTING, true) + conf.set(SERIALIZER, "org.apache.spark.serializer.KryoSerializer") + + val hostname = "localhost" + val rpcEnv = createRpcEnv("spark", hostname, 0, new SecurityManager(conf)) + + val masterTracker = newTrackerMaster() + masterTracker.trackerEndpoint = rpcEnv.setupEndpoint(MapOutputTracker.ENDPOINT_NAME, + new MapOutputTrackerMasterEndpoint(rpcEnv, masterTracker, conf)) + + val slaveRpcEnv = createRpcEnv("spark-slave", hostname, 0, new SecurityManager(conf)) + val slaveTracker = new MapOutputTrackerWorker(conf) + slaveTracker.trackerEndpoint = + slaveRpcEnv.setupEndpointRef(rpcEnv.address, MapOutputTracker.ENDPOINT_NAME) + + masterTracker.registerShuffle(10, 4, 1) + slaveTracker.updateEpoch(masterTracker.getEpoch) + + val blockMgrId = BlockManagerId("a", "hostA", 1000) + masterTracker.registerMapOutput(10, 0, MapStatus(blockMgrId, Array(1000L), 0)) + masterTracker.registerMapOutput(10, 1, MapStatus(blockMgrId, Array(1000L), 1)) + masterTracker.registerMapOutput(10, 2, MapStatus(blockMgrId, Array(1000L), 2)) + masterTracker.registerMapOutput(10, 3, MapStatus(blockMgrId, Array(1000L), 3)) + + slaveTracker.updateEpoch(masterTracker.getEpoch) + val size1000 = MapStatus.decompressSize(MapStatus.compressSize(1000L)) + val mapSizesByExecutorId = slaveTracker.getPushBasedShuffleMapSizesByExecutorId(10, 0) + // Batch fetch should be enabled when there are no merged shuffle files + assert(mapSizesByExecutorId.enableBatchFetch === true) + assert(mapSizesByExecutorId.iter.toSeq === + Seq((blockMgrId, ArrayBuffer((ShuffleBlockId(10, 0, 0), size1000, 0), + (ShuffleBlockId(10, 1, 0), size1000, 1), (ShuffleBlockId(10, 2, 0), size1000, 2), + (ShuffleBlockId(10, 3, 0), size1000, 3))))) + + masterTracker.registerShuffle(11, 4, 1) + slaveTracker.updateEpoch(masterTracker.getEpoch) + + val bitmap = new RoaringBitmap() + bitmap.add(0) + bitmap.add(1) + bitmap.add(3) + + masterTracker.registerMergeResult(11, 0, MergeStatus(blockMgrId, 0, + bitmap, 3000L)) + masterTracker.registerMapOutput(11, 0, MapStatus(blockMgrId, Array(1000L), 0)) + masterTracker.registerMapOutput(11, 1, MapStatus(blockMgrId, Array(1000L), 1)) + masterTracker.registerMapOutput(11, 2, MapStatus(blockMgrId, Array(1000L), 2)) + masterTracker.registerMapOutput(11, 3, MapStatus(blockMgrId, Array(1000L), 3)) + + slaveTracker.updateEpoch(masterTracker.getEpoch) + val mapSizesByExecutorId2 = slaveTracker.getPushBasedShuffleMapSizesByExecutorId(11, 0, 2, 0, 1) + // Batch fetch should be enabled when it only fetches subsets of mapper outputs + assert(mapSizesByExecutorId2.enableBatchFetch === true) + assert(mapSizesByExecutorId2.iter.toSeq === + Seq((blockMgrId, ArrayBuffer((ShuffleBlockId(11, 0, 0), size1000, 0), + (ShuffleBlockId(11, 1, 0), size1000, 1))))) + + masterTracker.unregisterShuffle(10) + masterTracker.unregisterShuffle(11) + masterTracker.stop() + slaveTracker.stop() + rpcEnv.shutdown() + slaveRpcEnv.shutdown() + } + + test("SPARK-36892: Batch fetch should be disabled in some scenarios with push based shuffle") { + conf.set(PUSH_BASED_SHUFFLE_ENABLED, true) + conf.set(IS_TESTING, true) + conf.set(SERIALIZER, "org.apache.spark.serializer.KryoSerializer") + + val hostname = "localhost" + val rpcEnv = createRpcEnv("spark", hostname, 0, new SecurityManager(conf)) + + val masterTracker = newTrackerMaster() + masterTracker.trackerEndpoint = rpcEnv.setupEndpoint(MapOutputTracker.ENDPOINT_NAME, + new MapOutputTrackerMasterEndpoint(rpcEnv, masterTracker, conf)) + + val slaveRpcEnv = createRpcEnv("spark-slave", hostname, 0, new SecurityManager(conf)) + val slaveTracker = new MapOutputTrackerWorker(conf) + slaveTracker.trackerEndpoint = + slaveRpcEnv.setupEndpointRef(rpcEnv.address, MapOutputTracker.ENDPOINT_NAME) + masterTracker.registerShuffle(10, 4, 2) + assert(masterTracker.containsShuffle(10)) + + val blockMgrId = BlockManagerId("a", "hostA", 1000) + masterTracker.registerMapOutput(10, 0, MapStatus(blockMgrId, Array(1000L, 1000L), 0)) + masterTracker.registerMapOutput(10, 1, MapStatus(blockMgrId, Array(1000L, 1000L), 1)) + masterTracker.registerMapOutput(10, 2, MapStatus(blockMgrId, Array(1000L, 1000L), 2)) + masterTracker.registerMapOutput(10, 3, MapStatus(blockMgrId, Array(1000L, 1000L), 3)) + slaveTracker.updateEpoch(masterTracker.getEpoch) + + val bitmap = new RoaringBitmap() + bitmap.add(0) + bitmap.add(1) + masterTracker.registerMergeResult(10, 0, MergeStatus(blockMgrId, 0, + bitmap, 2000L)) + masterTracker.registerMergeResult(10, 1, MergeStatus(blockMgrId, 0, + bitmap, 2000L)) + slaveTracker.updateEpoch(masterTracker.getEpoch) + // Query for all mappers output for multiple reducers, since there are merged shuffles, + // batch fetch should be disabled. + val mapSizesByExecutorId = + slaveTracker.getPushBasedShuffleMapSizesByExecutorId(10, 0, Int.MaxValue, 0, 2) + assert(mapSizesByExecutorId.enableBatchFetch === false) + masterTracker.unregisterShuffle(10) + masterTracker.stop() + rpcEnv.shutdown() + } } diff --git a/docs/configuration.md b/docs/configuration.md index 48555d847ab3f..625cb2390f475 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -3155,7 +3155,7 @@ See the `RDD.withResources` and `ResourceProfileBuilder` API's for using this fe # Push-based shuffle overview -Push-based shuffle helps improve the reliability and performance of spark shuffle. It takes a best-effort approach to push the shuffle blocks generated by the map tasks to remote external shuffle services to be merged per shuffle partition. Reduce tasks fetch a combination of merged shuffle partitions and original shuffle blocks as their input data, resulting in converting small random disk reads by external shuffle services into large sequential reads. Possibility of better data locality for reduce tasks additionally helps minimize network IO. +Push-based shuffle helps improve the reliability and performance of spark shuffle. It takes a best-effort approach to push the shuffle blocks generated by the map tasks to remote external shuffle services to be merged per shuffle partition. Reduce tasks fetch a combination of merged shuffle partitions and original shuffle blocks as their input data, resulting in converting small random disk reads by external shuffle services into large sequential reads. Possibility of better data locality for reduce tasks additionally helps minimize network IO. Push-based shuffle takes priority over batch fetch for some scenarios, like partition coalesce when merged output is available.

Push-based shuffle improves performance for long running jobs/queries which involves large disk I/O during shuffle. Currently it is not well suited for jobs/queries which runs quickly dealing with lesser amount of shuffle data. This will be further improved in the future releases.