[VL] Various fixes for gluten-it (#8396)
zhztheplayer authored Jan 2, 2025
1 parent 7047532 commit ff89539
Showing 5 changed files with 42 additions and 25 deletions.
tools/gluten-it/common/pom.xml (4 additions, 1 deletion)
@@ -68,7 +68,10 @@
<scope>provided</scope>
<type>test-jar</type>
</dependency>

<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
</dependency>
<dependency>
<groupId>io.trino.tpcds</groupId>
<artifactId>tpcds</artifactId>
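The slf4j-simple artifact added above gives SLF4J a concrete backend on gluten-it's classpath; without any binding, SLF4J 1.7.x warns that org.slf4j.impl.StaticLoggerBinder cannot be loaded and routes everything to a no-op logger. A minimal sketch of the difference (the logger name is arbitrary):

    import org.slf4j.LoggerFactory

    object BindingDemo {
      def main(args: Array[String]): Unit = {
        // Without a binding such as slf4j-simple, this logger is a no-op.
        // With slf4j-simple present, info and above go to stderr by default.
        val logger = LoggerFactory.getLogger("BindingDemo")
        logger.info("visible only when an SLF4J binding is on the classpath")
      }
    }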
@@ -21,8 +21,8 @@
import org.apache.gluten.integration.command.SparkRunModes;
import org.apache.gluten.integration.ds.TpcdsSuite;
import org.apache.gluten.integration.h.TpchSuite;
import org.apache.log4j.Level;
import org.apache.log4j.LogManager;
import org.apache.log4j.Level;
import org.apache.spark.SparkConf;
import picocli.CommandLine;
import scala.Predef;
@@ -104,8 +104,6 @@ private SparkConf pickSparkConf(String preset) {
}

public Integer runActions(Action[] actions) {
final SparkConf baselineConf = pickSparkConf(baselinePreset);
final SparkConf testConf = pickSparkConf(preset);
final Level level;
switch (logLevel) {
case 0:
@@ -120,9 +118,12 @@ public Integer runActions(Action[] actions) {
default:
throw new IllegalArgumentException("Log level not found: " + logLevel);
}

System.setProperty(org.slf4j.impl.SimpleLogger.DEFAULT_LOG_LEVEL_KEY, level.toString());
LogManager.getRootLogger().setLevel(level);

final SparkConf baselineConf = pickSparkConf(baselinePreset);
final SparkConf testConf = pickSparkConf(preset);

scala.collection.immutable.Map<String, String> extraSparkConfScala =
JavaConverters.mapAsScalaMapConverter(
mergeMapSafe(extraSparkConf, runModeEnumeration.extraSparkConf())).asScala().toMap(
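Moving the pickSparkConf calls below the System.setProperty call matters because slf4j-simple reads its configuration, including org.slf4j.impl.SimpleLogger.DEFAULT_LOG_LEVEL_KEY ("org.slf4j.simpleLogger.defaultLogLevel"), once, when the first logger is created; any code that logs before the property is set freezes the default level. A sketch of that pitfall, assuming slf4j-simple 1.7.x is the bound backend:

    import org.slf4j.LoggerFactory

    object TooLateDemo {
      def main(args: Array[String]): Unit = {
        System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "error")
        val logger = LoggerFactory.getLogger("demo") // config snapshot happens here
        logger.info("suppressed: default level is error")

        // Raising verbosity now has no effect; the snapshot was already taken.
        System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "trace")
        logger.trace("still suppressed for the rest of this JVM")
      }
    }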
@@ -17,13 +17,15 @@
package org.apache.gluten.integration

import org.apache.gluten.integration.action.Action
import org.apache.log4j.{Level, LogManager}

import org.apache.spark.SparkConf
import org.apache.spark.deploy.history.HistoryServerHelper
import org.apache.spark.network.util.ByteUnit
import org.apache.spark.sql.ConfUtils.ConfImplicits._
import org.apache.spark.sql.SparkSessionSwitcher

import org.apache.log4j.{Level, LogManager}

import java.io.File
import java.util.Scanner

@@ -46,6 +48,8 @@ abstract class Suite(

resetLogLevel()

private var hsUiBoundPort: Int = -1

private[integration] val sessionSwitcher: SparkSessionSwitcher =
new SparkSessionSwitcher(masterUrl, logLevel.toString)

@@ -113,18 +117,18 @@
.setWarningOnOverriding("spark.sql.files.openCostInBytes", "0")
sessionSwitcher
.defaultConf()
.setWarningOnOverriding("spark.sql.files.minPartitionNum", s"${(scanPartitions - 1) max 1}")
.setWarningOnOverriding("spark.sql.files.minPartitionNum", s"${(scanPartitions - 1).max(1)}")
}

extraSparkConf.toStream.foreach { kv =>
sessionSwitcher.defaultConf().setWarningOnOverriding(kv._1, kv._2)
extraSparkConf.toStream.foreach {
kv => sessionSwitcher.defaultConf().setWarningOnOverriding(kv._1, kv._2)
}

// register sessions
sessionSwitcher.registerSession("test", testConf)
sessionSwitcher.registerSession("baseline", baselineConf)

def startHistoryServer(): Unit = {
private def startHistoryServer(): Int = {
val hsConf = new SparkConf(false)
hsConf.setWarningOnOverriding("spark.history.ui.port", s"$hsUiPort")
hsConf.setWarningOnOverriding("spark.history.fs.logDirectory", historyWritePath())
@@ -133,13 +137,14 @@

// boot up history server
if (enableHsUi) {
startHistoryServer()
hsUiBoundPort = startHistoryServer()
}

def run(): Boolean = {
val succeed = actions.forall { action =>
resetLogLevel() // to prevent log level from being set by unknown external codes
action.execute(this)
val succeed = actions.forall {
action =>
resetLogLevel() // to prevent log level from being set by unknown external codes
action.execute(this)
}
succeed
}
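The reshaped run() keeps forall's short-circuit semantics: actions execute in order, and the first failure skips everything after it. A toy illustration:

    object ForallDemo {
      def main(args: Array[String]): Unit = {
        val actions: Seq[() => Boolean] =
          Seq(() => true, () => false, () => { println("never reached"); true })
        // forall evaluates left to right and stops at the first false,
        // so the third action above is never executed.
        val succeed = actions.forall(action => action())
        println(succeed) // false
      }
    }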
@@ -148,7 +153,7 @@
sessionSwitcher.close()
// wait for input, if history server was started
if (enableHsUi) {
printf("History server was running at port %d. Press enter to exit... \n", hsUiPort)
printf("History server was running at port %d. Press enter to exit... \n", hsUiBoundPort)
print("> ")
new Scanner(System.in).nextLine
}
@@ -157,6 +162,7 @@
def tableCreator(): TableCreator

private def resetLogLevel(): Unit = {
System.setProperty(org.slf4j.impl.SimpleLogger.DEFAULT_LOG_LEVEL_KEY, logLevel.toString)
LogManager.getRootLogger.setLevel(logLevel)
}

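resetLogLevel now has to reset two independent logging stacks: Spark and its dependencies log through log4j 1.x, while gluten-it's own code logs via SLF4J, which slf4j-simple backs after this change. A hypothetical helper mirroring that pairing:

    import org.apache.log4j.{Level, LogManager}

    object LogLevels {
      def resetBothBackends(level: Level): Unit = {
        // log4j (Spark internals): takes effect immediately on the root logger.
        LogManager.getRootLogger.setLevel(level)
        // slf4j-simple (gluten-it loggers): consulted only through this system
        // property, and only for loggers that have not been initialized yet.
        System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", level.toString)
      }
    }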
@@ -27,7 +27,7 @@ import org.apache.spark.util.ShutdownHookManager
import java.net.ServerSocket

object HistoryServerHelper {
def startHistoryServer(conf: SparkConf): Unit = {
def startHistoryServer(conf: SparkConf): Int = {
initSecurity()
val securityManager = createSecurityManager(conf)
val providerName = conf
@@ -42,6 +42,7 @@
server.bind()
provider.start()
ShutdownHookManager.addShutdownHook(() => server.stop())
server.boundPort
}

private def findFreePort(): Int = {
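Returning server.boundPort instead of Unit lets the caller report the port the history server actually bound, which can differ from the requested one (for example, when Spark retries on successive ports because the requested port is busy). The underlying JDK pattern, the same ServerSocket trick findFreePort relies on, shown standalone:

    import java.net.ServerSocket

    object BoundPortDemo {
      def main(args: Array[String]): Unit = {
        // Port 0 asks the OS for any free ephemeral port; the effective port
        // is only known after the bind and must be read back, just as the
        // suite now stores startHistoryServer()'s result in hsUiBoundPort.
        val socket = new ServerSocket(0)
        try println(s"bound to ${socket.getLocalPort}")
        finally socket.close()
      }
    }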
tools/gluten-it/pom.xml (15 additions, 9 deletions)
@@ -41,31 +41,36 @@
<artifactId>gluten-substrait</artifactId>
</exclusion>
</exclusions>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<groupId>org.apache.avro</groupId>
<artifactId>avro-mapred</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-repl_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-catalyst_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<groupId>jline</groupId>
@@ -77,14 +82,12 @@
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-catalyst_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
<type>test-jar</type>
</dependency>
<dependency>
@@ -97,14 +100,17 @@
<artifactId>protobuf-java</artifactId>
</exclusion>
</exclusions>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<type>test-jar</type>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.30</version>
</dependency>
</dependencies>
</dependencyManagement>
