From e765bb142b058a6e16d1083e3507e0984a6fc5e8 Mon Sep 17 00:00:00 2001 From: Krishnan R Date: Tue, 29 May 2018 23:24:19 +0530 Subject: [PATCH] Fixed byte/str issue #9 in serverextension Removed stray print statements in scala SparkListener. --- extension/MANIFEST.in | 3 +- extension/VERSION | 2 +- extension/scalalistener/CustomListener.scala | 42 ++++++++++---------- extension/sparkmonitor/serverextension.py | 7 ++-- 4 files changed, 27 insertions(+), 27 deletions(-) diff --git a/extension/MANIFEST.in b/extension/MANIFEST.in index d31b9184..2d72b59c 100644 --- a/extension/MANIFEST.in +++ b/extension/MANIFEST.in @@ -1,2 +1,3 @@ graft sparkmonitor/static -recursive-include sparkmonitor *.jar \ No newline at end of file +recursive-include sparkmonitor *.jar +include VERSION diff --git a/extension/VERSION b/extension/VERSION index 3ce186fb..38494260 100644 --- a/extension/VERSION +++ b/extension/VERSION @@ -1 +1 @@ -v0.0.8 +v0.0.9 diff --git a/extension/scalalistener/CustomListener.scala b/extension/scalalistener/CustomListener.scala index 2aa9bf01..2f04a50d 100644 --- a/extension/scalalistener/CustomListener.scala +++ b/extension/scalalistener/CustomListener.scala @@ -30,9 +30,9 @@ import java.io._ */ class JupyterSparkMonitorListener(conf: SparkConf) extends SparkListener { - println("SPARKLISTENER: Started SparkListener for Jupyter Notebook") + println("SPARKMONITOR_LISTENER: Started SparkListener for Jupyter Notebook") val port = scala.util.Properties.envOrElse("SPARKMONITOR_KERNEL_PORT", "ERRORNOTFOUND") - println("SPARKLISTENER: Port obtained from environment: " + port) + println("SPARKMONITOR_LISTENER: Port obtained from environment: " + port) var socket: Socket = null var out: OutputStreamWriter = null // Open the socket to the kernel. The kernel is the server already waiting for connections. @@ -40,24 +40,24 @@ class JupyterSparkMonitorListener(conf: SparkConf) extends SparkListener { socket = new Socket("localhost", port.toInt) out = new OutputStreamWriter(socket.getOutputStream()) } catch { - case exception: Throwable => println("\nSPARKLISTENER: Exception creating socket:" + exception + "\n") + case exception: Throwable => println("\nSPARKMONITOR_LISTENER: Exception creating socket:" + exception + "\n") } /** Send a string message to the kernel using the open socket.*/ def send(msg: String): Unit = { try { - //println("\nSPARKLISTENER: --------------Sending Message:------------------\n"+msg+ - // "\nSPARKLISTENER: -------------------------------------------------\n") // Uncomment to see all events + //println("\nSPARKMONITOR_LISTENER: --------------Sending Message:------------------\n"+msg+ + // "\nSPARKMONITOR_LISTENER: -------------------------------------------------\n") // Uncomment to see all events out.write(msg + ";EOD:") out.flush() } catch { - case exception: Throwable => println("\nSPARKLISTENER: Exception sending socket message:" + exception + "\n") + case exception: Throwable => println("\nSPARKMONITOR_LISTENER: Exception sending socket message:" + exception + "\n") } } /** Close the socket connection to the kernel.*/ def closeConnection(): Unit = { - println("SPARKLISTNER: Closing Connection") + println("SPARKMONITOR_LISTENER: Closing Connection") out.close() socket.close() } @@ -112,7 +112,7 @@ class JupyterSparkMonitorListener(conf: SparkConf) extends SparkListener { override def onApplicationStart(appStarted: SparkListenerApplicationStart): Unit = { startTime = appStarted.time appId = appStarted.appId.getOrElse("null") - println("SPARKLISTENER Application Started: " + appId + " ...Start Time: " + appStarted.time) + println("SPARKMONITOR_LISTENER: Application Started: " + appId + " ...Start Time: " + appStarted.time) val json = ("msgtype" -> "sparkApplicationStart") ~ ("startTime" -> startTime) ~ ("appId" -> appId) ~ @@ -129,7 +129,7 @@ class JupyterSparkMonitorListener(conf: SparkConf) extends SparkListener { * Closes the socket connection to the kernel. */ override def onApplicationEnd(appEnded: SparkListenerApplicationEnd): Unit = { - println("SPARKLISTENER Application ending...End Time: " + appEnded.time) + println("SPARKMONITOR_LISTENER: Application ending...End Time: " + appEnded.time) endTime = appEnded.time val json = ("msgtype" -> "sparkApplicationEnd") ~ ("endTime" -> endTime) @@ -195,7 +195,7 @@ class JupyterSparkMonitorListener(conf: SparkConf) extends SparkListener { stageIdToData.getOrElseUpdate((stageInfo.stageId, stageInfo.attemptId), new StageUIData) } val name = jobStart.properties.getProperty("callSite.short", "null") - println("Num Executors" + numExecutors.toInt) + // println("Num Executors" + numExecutors.toInt) val json = ("msgtype" -> "sparkJobStart") ~ ("jobGroup" -> jobGroup.getOrElse("null")) ~ ("jobId" -> jobStart.jobId) ~ @@ -208,14 +208,14 @@ class JupyterSparkMonitorListener(conf: SparkConf) extends SparkListener { ("appId" -> appId) ~ ("numExecutors" -> numExecutors) ~ ("name" -> name) - println("SPARKLISTENER JobStart: \n" + pretty(render(json)) + "\n") + // println("SPARKMONITOR_LISTENER: JobStart: \n" + pretty(render(json)) + "\n") send(pretty(render(json))) } /** Called when a job ends. */ override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = synchronized { val jobData = activeJobs.remove(jobEnd.jobId).getOrElse { - println("SPARKLISTENER:Job completed for unknown job: " + jobEnd.jobId) + println("SPARKMONITOR_LISTENER: Job completed for unknown job: " + jobEnd.jobId) new JobUIData(jobId = jobEnd.jobId) } jobData.completionTime = Option(jobEnd.time).filter(_ >= 0) @@ -266,7 +266,7 @@ class JupyterSparkMonitorListener(conf: SparkConf) extends SparkListener { val stage = stageCompleted.stageInfo stageIdToInfo(stage.stageId) = stage val stageData = stageIdToData.getOrElseUpdate((stage.stageId, stage.attemptId), { - println("SPARKLISTENER: Stage completed for unknown stage " + stage.stageId) + println("SPARKMONITOR_LISTENER: Stage completed for unknown stage " + stage.stageId) new StageUIData }) var status = "UNKNOWN" @@ -306,7 +306,7 @@ class JupyterSparkMonitorListener(conf: SparkConf) extends SparkListener { ("numTasks" -> stage.numTasks) ~ ("status" -> status) - println("SPARKLISTENER Stage Completed: \n" + pretty(render(json)) + "\n") + // println("SPARKMONITOR_LISTENER: Stage Completed: \n" + pretty(render(json)) + "\n") send(pretty(render(json))) } @@ -342,7 +342,7 @@ class JupyterSparkMonitorListener(conf: SparkConf) extends SparkListener { ("parentIds" -> stage.parentIds) ~ ("submissionTime" -> submissionTime) ~ ("jobIds" -> jobIds) - println("SPARKLISTENER Stage Submitted: \n" + pretty(render(json)) + "\n") + // println("SPARKMONITOR_LISTENER Stage Submitted: \n" + pretty(render(json)) + "\n") send(pretty(render(json))) } @@ -351,7 +351,7 @@ class JupyterSparkMonitorListener(conf: SparkConf) extends SparkListener { val taskInfo = taskStart.taskInfo if (taskInfo != null) { val stageData = stageIdToData.getOrElseUpdate((taskStart.stageId, taskStart.stageAttemptId), { - println("SPARKLISTENER: Task start for unknown stage " + taskStart.stageId) + println("SPARKMONITOR_LISTENER: Task start for unknown stage " + taskStart.stageId) new StageUIData }) stageData.numActiveTasks += 1 @@ -387,7 +387,7 @@ class JupyterSparkMonitorListener(conf: SparkConf) extends SparkListener { ("status" -> taskInfo.status) ~ ("speculative" -> taskInfo.speculative) - //println("SPARKLISTENER Task Started: \n"+ pretty(render(json)) + "\n") + //println("SPARKMONITOR_LISTENER: Task Started: \n"+ pretty(render(json)) + "\n") send(pretty(render(json))) } @@ -400,7 +400,7 @@ class JupyterSparkMonitorListener(conf: SparkConf) extends SparkListener { var errorMessage: Option[String] = None if (info != null && taskEnd.stageAttemptId != -1) { val stageData = stageIdToData.getOrElseUpdate((taskEnd.stageId, taskEnd.stageAttemptId), { - println("SPARKLISTENER: Task end for unknown stage " + taskEnd.stageId) + println("SPARKMONITOR_LISTENER: Task end for unknown stage " + taskEnd.stageId) new StageUIData }) stageData.numActiveTasks -= 1 @@ -517,7 +517,7 @@ class JupyterSparkMonitorListener(conf: SparkConf) extends SparkListener { ("errorMessage" -> errorMessage) ~ ("metrics" -> jsonMetrics) - println("SPARKLISTENER Task Ended: \n" + pretty(render(json)) + "\n") + // println("SPARKMONITOR_LISTENER: Task Ended: \n" + pretty(render(json)) + "\n") send(pretty(render(json))) } @@ -573,7 +573,7 @@ class JupyterSparkMonitorListener(conf: SparkConf) extends SparkListener { ("numCores" -> executorAdded.executorInfo.totalCores) ~ ("totalCores" -> totalCores) // Sending this as browser data can be lost during reloads - println("SPARKLISTENER Executor Added: \n" + pretty(render(json)) + "\n") + // println("SPARKMONITOR_LISTENER: Executor Added: \n" + pretty(render(json)) + "\n") send(pretty(render(json))) } @@ -586,7 +586,7 @@ class JupyterSparkMonitorListener(conf: SparkConf) extends SparkListener { ("time" -> executorRemoved.time) ~ ("totalCores" -> totalCores) // Sending this as browser data can be lost during reloads - println("SPARKLISTENER Executor Removed: \n" + pretty(render(json)) + "\n") + // println("SPARKMONITOR_LISTENER: Executor Removed: \n" + pretty(render(json)) + "\n") send(pretty(render(json))) } } diff --git a/extension/sparkmonitor/serverextension.py b/extension/sparkmonitor/serverextension.py index 5fdea0c3..2943c9ad 100644 --- a/extension/sparkmonitor/serverextension.py +++ b/extension/sparkmonitor/serverextension.py @@ -42,8 +42,7 @@ def get(self): self.request.uri.index(proxy_root) + len(proxy_root) + 1):] self.replace_path = self.request.uri[:self.request.uri.index( proxy_root) + len(proxy_root)] - print("SPARKMONITOR_SERVER: Request_path " + - request_path + " \n Replace_path:" + self.replace_path) + # print("SPARKMONITOR_SERVER: Request_path " + request_path + " \n Replace_path:" + self.replace_path) backendurl = url_path_join(url, request_path) self.debug_url = url self.backendurl = backendurl @@ -64,8 +63,8 @@ def handle_response(self, response): if "text/html" in content_type: content = replace(response.body, self.replace_path) elif "javascript" in content_type: - content = response.body.replace( - "location.origin", "location.origin +'" + self.replace_path + "' ") + body="location.origin +'" + self.replace_path + "' " + content = response.body.replace(b"location.origin",body.encode()) else: # Probably binary response, send it directly. content = response.body