From ed3957ca7a51a62444655925922947785f678dd0 Mon Sep 17 00:00:00 2001 From: lyogev Date: Mon, 4 May 2020 22:23:48 +0300 Subject: [PATCH] fix(scheduler): add fair scheduler and a new parallel mode to metorikku --- docker/spark/Dockerfile | 1 + docker/spark/fairscheduler.xml | 7 +++++ docker/spark/scripts/entrypoint-submit.sh | 6 +++-- docker/spark/scripts/entrypoint-worker.sh | 9 +++++-- docker/spark/spark-defaults.conf | 5 +++- .../scala/com/yotpo/metorikku/Metorikku.scala | 27 ++++++++++++++++--- .../configuration/job/Configuration.scala | 1 + .../com/yotpo/metorikku/test/Tester.scala | 2 +- 8 files changed, 48 insertions(+), 10 deletions(-) create mode 100644 docker/spark/fairscheduler.xml diff --git a/docker/spark/Dockerfile b/docker/spark/Dockerfile index b27083846..43e92180c 100644 --- a/docker/spark/Dockerfile +++ b/docker/spark/Dockerfile @@ -61,6 +61,7 @@ RUN rm -f /spark/jars/guava-*.jar ADD log4j.json.properties /spark/conf/ ADD spark-defaults.conf /spark/conf/ ADD spark-env.sh /spark/conf/ +ADD fairscheduler.xml /spark/conf/ ENV PYTHONHASHSEED 1 ENV SPARK_HOME /spark diff --git a/docker/spark/fairscheduler.xml b/docker/spark/fairscheduler.xml new file mode 100644 index 000000000..efce2cdc4 --- /dev/null +++ b/docker/spark/fairscheduler.xml @@ -0,0 +1,7 @@ + + + FAIR + 1 + 2 + + \ No newline at end of file diff --git a/docker/spark/scripts/entrypoint-submit.sh b/docker/spark/scripts/entrypoint-submit.sh index 2dabd5c9c..7fe816022 100755 --- a/docker/spark/scripts/entrypoint-submit.sh +++ b/docker/spark/scripts/entrypoint-submit.sh @@ -7,6 +7,7 @@ SPARK_MASTER_HOST=${SPARK_MASTER_HOST:=spark-master} MAX_RETRIES=${MAX_RETRIES:=300} MIN_WORKERS=${MIN_WORKERS:=1} SPARK_UI_PORT=${SPARK_UI_PORT:=4040} +SHUFFLE_SERVICE_PORT=${SHUFFLE_SERVICE_PORT:=7337} POST_SCRIPT=${POST_SCRIPT:=/scripts/finish-submit.sh} # Atlas @@ -36,8 +37,9 @@ fi # Run command SPARK_MASTER="spark://${SPARK_MASTER_HOST}:${SPARK_MASTER_PORT}" echo -e " -spark.master $SPARK_MASTER -spark.ui.port $SPARK_UI_PORT +spark.master=$SPARK_MASTER +spark.ui.port=$SPARK_UI_PORT +spark.shuffle.service.port=$SHUFFLE_SERVICE_PORT " >> /spark/conf/spark-defaults.conf if [[ ! -z ${HIVE_METASTORE_URI} ]]; then diff --git a/docker/spark/scripts/entrypoint-worker.sh b/docker/spark/scripts/entrypoint-worker.sh index 2c0779b57..1b6f5dd2e 100755 --- a/docker/spark/scripts/entrypoint-worker.sh +++ b/docker/spark/scripts/entrypoint-worker.sh @@ -5,7 +5,8 @@ SPARK_MASTER_PORT=${SPARK_MASTER_PORT:=7077} SPARK_WORKER_WEBUI_PORT=${SPARK_WORKER_WEBUI_PORT:=8081} SPARK_MASTER_HOST=${SPARK_MASTER_HOST:=spark-master} SPARK_WORKER_PORT=${SPARK_WORKER_PORT:=7078} -SPARK_WORKER_OPTS=${SPARK_WORKER_OPTS:="-Dspark.worker.cleanup.enabled=true -Dspark.worker.cleanup.appDataTtl=1800"} +SHUFFLE_SERVICE_PORT=${SHUFFLE_SERVICE_PORT:=7337} +SPARK_WORKER_OPTS=${SPARK_WORKER_OPTS:="-Dspark.shuffle.service.enabled=true -Dspark.worker.cleanup.enabled=true -Dspark.worker.cleanup.appDataTtl=1800"} # Logs /scripts/init-logs-metrics.sh @@ -13,8 +14,12 @@ SPARK_WORKER_OPTS=${SPARK_WORKER_OPTS:="-Dspark.worker.cleanup.enabled=true -Dsp # Monitor Logs /scripts/monitor-executor-logs.sh & +echo -e " +spark.shuffle.service.port=$SHUFFLE_SERVICE_PORT +" >> /spark/conf/spark-defaults.conf + . "/spark/sbin/spark-config.sh" . "/spark/bin/load-spark-env.sh" SPARK_MASTER="spark://$SPARK_MASTER_HOST:$SPARK_MASTER_PORT" /spark/sbin/../bin/spark-class org.apache.spark.deploy.worker.Worker \ - --webui-port ${SPARK_WORKER_WEBUI_PORT} --port ${SPARK_WORKER_PORT} ${SPARK_MASTER} + --webui-port ${SPARK_WORKER_WEBUI_PORT} --port ${SPARK_WORKER_PORT} --properties-file /spark/conf/spark-defaults.conf ${SPARK_MASTER} diff --git a/docker/spark/spark-defaults.conf b/docker/spark/spark-defaults.conf index 55d9dd030..c99611dad 100644 --- a/docker/spark/spark-defaults.conf +++ b/docker/spark/spark-defaults.conf @@ -20,4 +20,7 @@ spark.serializer=org.apache.spark.serializer.KryoSerializer spark.sql.hive.convertMetastoreParquet=false spark.sql.legacy.allowCreatingManagedTableUsingNonemptyLocation=true spark.driver.extraClassPath=/spark/pre-jars/* -spark.executor.extraClassPath=/spark/pre-jars/* \ No newline at end of file +spark.executor.extraClassPath=/spark/pre-jars/* +spark.shuffle.service.enabled=true +spark.dynamicAllocation.enabled=true +spark.scheduler.mode=FAIR \ No newline at end of file diff --git a/src/main/scala/com/yotpo/metorikku/Metorikku.scala b/src/main/scala/com/yotpo/metorikku/Metorikku.scala index 796f381d8..a12943610 100644 --- a/src/main/scala/com/yotpo/metorikku/Metorikku.scala +++ b/src/main/scala/com/yotpo/metorikku/Metorikku.scala @@ -30,12 +30,31 @@ object Metorikku extends App { ex.scheduleAtFixedRate(task, initialDelay, periodic.getTriggerDurationInSeconds(), TimeUnit.SECONDS) } - def runMetrics(job: Job): Unit = { - job.config.metrics match { - case Some(metrics) => metrics.foreach(metricSetPath => { + def runMetricsInParallel(job: Job, metrics: Seq[String]): Unit = { + val threads = metrics.map(metricSetPath => new Thread(new Runnable { + def run() { val metricSet = new MetricSet(metricSetPath) metricSet.run(job) - }) + } + })).toList + + threads.foreach(t => t.start()) + threads.foreach(t => t.join()) + } + + def runMetrics(job: Job): Unit = { + job.config.metrics match { + case Some(metrics) => { + session.config.parallel match { + case Some(true) => runMetricsInParallel(job, metrics) + case _ => { + metrics.foreach(metricSetPath => { + val metricSet = new MetricSet(metricSetPath) + metricSet.run(job) + }) + } + } + } case None => log.warn("No mertics were defined, exiting") } } diff --git a/src/main/scala/com/yotpo/metorikku/configuration/job/Configuration.scala b/src/main/scala/com/yotpo/metorikku/configuration/job/Configuration.scala index 42e7dcf9f..92793d0b2 100644 --- a/src/main/scala/com/yotpo/metorikku/configuration/job/Configuration.scala +++ b/src/main/scala/com/yotpo/metorikku/configuration/job/Configuration.scala @@ -13,6 +13,7 @@ case class Configuration(metrics: Option[Seq[String]], showQuery: Option[Boolean], streaming: Option[Streaming], periodic: Option[Periodic], + parallel: Option[Boolean], var logLevel: Option[String], var showPreviewLines: Option[Int], var explain: Option[Boolean], diff --git a/src/main/scala/com/yotpo/metorikku/test/Tester.scala b/src/main/scala/com/yotpo/metorikku/test/Tester.scala index fccff5d00..0e2018dda 100644 --- a/src/main/scala/com/yotpo/metorikku/test/Tester.scala +++ b/src/main/scala/com/yotpo/metorikku/test/Tester.scala @@ -46,7 +46,7 @@ case class Tester(config: TesterConfig) { val variables = params.variables val inputs = getMockFilesFromDir(config.test.mocks, config.basePath) Configuration(Option(metrics), inputs, variables, None, None, None, None, None, - Option(config.preview > 0), None, None, None, Option(config.preview), None, None, None, None) + Option(config.preview > 0), None, None, None, None, Option(config.preview), None, None, None, None) } private def getMockFilesFromDir(mocks: Option[List[Mock]], testDir: File): Option[Map[String, Input]] = {