From ff89539efa72dc8fc4bdb2470d42261b4cf30265 Mon Sep 17 00:00:00 2001
From: Hongze Zhang
Date: Thu, 2 Jan 2025 17:47:45 +0800
Subject: [PATCH] [VL] Various fixes for gluten-it (#8396)

---
 tools/gluten-it/common/pom.xml                  |  5 +++-
 .../apache/gluten/integration/BaseMixin.java    |  9 ++++---
 .../org/apache/gluten/integration/Suite.scala   | 26 ++++++++++++-------
 .../deploy/history/HistoryServerHelper.scala    |  3 ++-
 tools/gluten-it/pom.xml                         | 24 ++++++++++-------
 5 files changed, 42 insertions(+), 25 deletions(-)

diff --git a/tools/gluten-it/common/pom.xml b/tools/gluten-it/common/pom.xml
index 098b04922b42..e6ec5ffe1992 100644
--- a/tools/gluten-it/common/pom.xml
+++ b/tools/gluten-it/common/pom.xml
@@ -68,7 +68,10 @@
       <scope>provided</scope>
       <type>test-jar</type>
     </dependency>
-
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-simple</artifactId>
+    </dependency>
     <dependency>
       <groupId>io.trino.tpcds</groupId>
       <artifactId>tpcds</artifactId>
diff --git a/tools/gluten-it/common/src/main/java/org/apache/gluten/integration/BaseMixin.java b/tools/gluten-it/common/src/main/java/org/apache/gluten/integration/BaseMixin.java
index b369fffd740c..4ebf6106fd5f 100644
--- a/tools/gluten-it/common/src/main/java/org/apache/gluten/integration/BaseMixin.java
+++ b/tools/gluten-it/common/src/main/java/org/apache/gluten/integration/BaseMixin.java
@@ -21,8 +21,8 @@
 import org.apache.gluten.integration.command.SparkRunModes;
 import org.apache.gluten.integration.ds.TpcdsSuite;
 import org.apache.gluten.integration.h.TpchSuite;
-import org.apache.log4j.Level;
 import org.apache.log4j.LogManager;
+import org.apache.log4j.Level;
 import org.apache.spark.SparkConf;
 import picocli.CommandLine;
 import scala.Predef;
@@ -104,8 +104,6 @@ private SparkConf pickSparkConf(String preset) {
   }

   public Integer runActions(Action[] actions) {
-    final SparkConf baselineConf = pickSparkConf(baselinePreset);
-    final SparkConf testConf = pickSparkConf(preset);
     final Level level;
     switch (logLevel) {
       case 0:
@@ -120,9 +118,12 @@ public Integer runActions(Action[] actions) {
       default:
         throw new IllegalArgumentException("Log level not found: " + logLevel);
     }
-
+    System.setProperty(org.slf4j.impl.SimpleLogger.DEFAULT_LOG_LEVEL_KEY, level.toString());
     LogManager.getRootLogger().setLevel(level);

+    final SparkConf baselineConf = pickSparkConf(baselinePreset);
+    final SparkConf testConf = pickSparkConf(preset);
+
     scala.collection.immutable.Map<String, String> extraSparkConfScala =
         JavaConverters.mapAsScalaMapConverter(
             mergeMapSafe(extraSparkConf, runModeEnumeration.extraSparkConf())).asScala().toMap(
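Note: the BaseMixin change above forwards the chosen log level to slf4j-simple through its default-log-level system property before any SparkConf is built. A minimal sketch of that mechanism outside this patch (the object name and the "warn" level are illustrative, not part of the change; assumes slf4j-api and slf4j-simple 1.7.x on the classpath):

    import org.slf4j.LoggerFactory

    object SimpleLoggerLevelSketch {
      def main(args: Array[String]): Unit = {
        // slf4j-simple reads this property once, when the first logger is created,
        // so it must be set before any LoggerFactory.getLogger call.
        // "org.slf4j.simpleLogger.defaultLogLevel" is the value of
        // org.slf4j.impl.SimpleLogger.DEFAULT_LOG_LEVEL_KEY in slf4j 1.7.x.
        System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "warn")
        val log = LoggerFactory.getLogger(getClass)
        log.info("suppressed at warn level")
        log.warn("printed")
      }
    }
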
.setWarningOnOverriding("spark.sql.files.openCostInBytes", "0") sessionSwitcher .defaultConf() - .setWarningOnOverriding("spark.sql.files.minPartitionNum", s"${(scanPartitions - 1) max 1}") + .setWarningOnOverriding("spark.sql.files.minPartitionNum", s"${(scanPartitions - 1).max(1)}") } - extraSparkConf.toStream.foreach { kv => - sessionSwitcher.defaultConf().setWarningOnOverriding(kv._1, kv._2) + extraSparkConf.toStream.foreach { + kv => sessionSwitcher.defaultConf().setWarningOnOverriding(kv._1, kv._2) } // register sessions sessionSwitcher.registerSession("test", testConf) sessionSwitcher.registerSession("baseline", baselineConf) - def startHistoryServer(): Unit = { + private def startHistoryServer(): Int = { val hsConf = new SparkConf(false) hsConf.setWarningOnOverriding("spark.history.ui.port", s"$hsUiPort") hsConf.setWarningOnOverriding("spark.history.fs.logDirectory", historyWritePath()) @@ -133,13 +137,14 @@ abstract class Suite( // boot up history server if (enableHsUi) { - startHistoryServer() + hsUiBoundPort = startHistoryServer() } def run(): Boolean = { - val succeed = actions.forall { action => - resetLogLevel() // to prevent log level from being set by unknown external codes - action.execute(this) + val succeed = actions.forall { + action => + resetLogLevel() // to prevent log level from being set by unknown external codes + action.execute(this) } succeed } @@ -148,7 +153,7 @@ abstract class Suite( sessionSwitcher.close() // wait for input, if history server was started if (enableHsUi) { - printf("History server was running at port %d. Press enter to exit... \n", hsUiPort) + printf("History server was running at port %d. Press enter to exit... \n", hsUiBoundPort) print("> ") new Scanner(System.in).nextLine } @@ -157,6 +162,7 @@ abstract class Suite( def tableCreator(): TableCreator private def resetLogLevel(): Unit = { + System.setProperty(org.slf4j.impl.SimpleLogger.DEFAULT_LOG_LEVEL_KEY, logLevel.toString) LogManager.getRootLogger.setLevel(logLevel) } diff --git a/tools/gluten-it/common/src/main/scala/org/apache/spark/deploy/history/HistoryServerHelper.scala b/tools/gluten-it/common/src/main/scala/org/apache/spark/deploy/history/HistoryServerHelper.scala index 649ef130ddad..7a9b1cc3af26 100644 --- a/tools/gluten-it/common/src/main/scala/org/apache/spark/deploy/history/HistoryServerHelper.scala +++ b/tools/gluten-it/common/src/main/scala/org/apache/spark/deploy/history/HistoryServerHelper.scala @@ -27,7 +27,7 @@ import org.apache.spark.util.ShutdownHookManager import java.net.ServerSocket object HistoryServerHelper { - def startHistoryServer(conf: SparkConf): Unit = { + def startHistoryServer(conf: SparkConf): Int = { initSecurity() val securityManager = createSecurityManager(conf) val providerName = conf @@ -42,6 +42,7 @@ object HistoryServerHelper { server.bind() provider.start() ShutdownHookManager.addShutdownHook(() => server.stop()) + server.boundPort } private def findFreePort(): Int = { diff --git a/tools/gluten-it/pom.xml b/tools/gluten-it/pom.xml index eb2c3d87e761..8120bc5ea1eb 100644 --- a/tools/gluten-it/pom.xml +++ b/tools/gluten-it/pom.xml @@ -41,31 +41,36 @@ gluten-substrait - runtime org.apache.spark spark-core_${scala.binary.version} ${spark.version} - provided + + + org.apache.avro + avro-mapred + + + org.apache.avro + avro + + org.apache.spark spark-repl_${scala.binary.version} ${spark.version} - provided org.apache.spark spark-catalyst_${scala.binary.version} ${spark.version} - provided org.apache.spark spark-hive_${scala.binary.version} ${spark.version} 
diff --git a/tools/gluten-it/pom.xml b/tools/gluten-it/pom.xml
index eb2c3d87e761..8120bc5ea1eb 100644
--- a/tools/gluten-it/pom.xml
+++ b/tools/gluten-it/pom.xml
@@ -41,31 +41,36 @@
       <artifactId>gluten-substrait</artifactId>
-      <scope>runtime</scope>
     </dependency>
     <dependency>
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-core_${scala.binary.version}</artifactId>
       <version>${spark.version}</version>
-      <scope>provided</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>org.apache.avro</groupId>
+          <artifactId>avro-mapred</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.avro</groupId>
+          <artifactId>avro</artifactId>
+        </exclusion>
+      </exclusions>
     </dependency>
     <dependency>
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-repl_${scala.binary.version}</artifactId>
       <version>${spark.version}</version>
-      <scope>provided</scope>
     </dependency>
     <dependency>
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-catalyst_${scala.binary.version}</artifactId>
       <version>${spark.version}</version>
-      <scope>provided</scope>
     </dependency>
     <dependency>
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-hive_${scala.binary.version}</artifactId>
       <version>${spark.version}</version>
-      <scope>provided</scope>
       <exclusions>
         <exclusion>
           <groupId>jline</groupId>
@@ -77,14 +82,12 @@
     <dependency>
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-core_${scala.binary.version}</artifactId>
       <version>${spark.version}</version>
-      <scope>provided</scope>
       <type>test-jar</type>
     </dependency>
     <dependency>
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-catalyst_${scala.binary.version}</artifactId>
       <version>${spark.version}</version>
-      <scope>provided</scope>
       <type>test-jar</type>
     </dependency>
@@ -97,14 +100,17 @@
       <artifactId>protobuf-java</artifactId>
-      <scope>provided</scope>
     </dependency>
     <dependency>
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-sql_${scala.binary.version}</artifactId>
       <version>${spark.version}</version>
       <type>test-jar</type>
-      <scope>provided</scope>
     </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-simple</artifactId>
+      <version>1.7.30</version>
+    </dependency>
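Note: with slf4j-simple 1.7.30 added to gluten-it's dependencies (and the provided scope dropped from the Spark artifacts), SimpleLogger is available as an slf4j backend at runtime. A quick sketch for checking which backend slf4j actually resolves to, assuming a single slf4j 1.7.x binding on the classpath (object name is illustrative):

    import org.slf4j.LoggerFactory

    object Slf4jBindingSketch {
      def main(args: Array[String]): Unit = {
        // With slf4j-simple bound, the concrete logger class is
        // org.slf4j.impl.SimpleLogger; other bindings report their own class.
        val log = LoggerFactory.getLogger(getClass)
        println(s"slf4j logger implementation: ${log.getClass.getName}")
      }
    }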