From 524d1be6d2920674eb871b5f0f25e7496a374090 Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Thu, 13 Dec 2018 09:07:33 -0800 Subject: [PATCH] [SPARK-26098][WEBUI] Show associated SQL query in Job page ## What changes were proposed in this pull request? For jobs associated to SQL queries, it would be easier to understand the context to showing the SQL query in Job detail page. Before code change, it is hard to tell what the job is about from the job page: ![image](https://user-images.githubusercontent.com/1097932/48659359-96baa180-ea8a-11e8-8419-a0a87c3f30fc.png) After code change: ![image](https://user-images.githubusercontent.com/1097932/48659390-26f8e680-ea8b-11e8-8fdd-3b58909ea364.png) After navigating to the associated SQL detail page, We can see the whole context : ![image](https://user-images.githubusercontent.com/1097932/48659463-9fac7280-ea8c-11e8-9dfe-244e849f72a5.png) **For Jobs don't have associated SQL query, the text won't be shown.** ## How was this patch tested? Manual test Closes #23068 from gengliangwang/addSQLID. Authored-by: Gengliang Wang Signed-off-by: gatorsmile --- .../apache/spark/status/AppStatusListener.scala | 7 ++++++- .../org/apache/spark/status/AppStatusStore.scala | 7 +++++++ .../scala/org/apache/spark/status/LiveEntity.scala | 5 +++-- .../scala/org/apache/spark/status/storeTypes.scala | 3 ++- .../scala/org/apache/spark/ui/jobs/JobPage.scala | 14 +++++++++++++- 5 files changed, 31 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala index bd3f58b6182c0..262ff6547faa5 100644 --- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala +++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala @@ -70,6 +70,8 @@ private[spark] class AppStatusListener( private val liveTasks = new HashMap[Long, LiveTask]() private val liveRDDs = new HashMap[Int, LiveRDD]() private val pools = new HashMap[String, SchedulerPool]() + + private val SQL_EXECUTION_ID_KEY = "spark.sql.execution.id" // Keep the active executor count as a separate variable to avoid having to do synchronization // around liveExecutors. @volatile private var activeExecutorCount = 0 @@ -318,6 +320,8 @@ private[spark] class AppStatusListener( val lastStageName = lastStageInfo.map(_.name).getOrElse("(Unknown Stage Name)") val jobGroup = Option(event.properties) .flatMap { p => Option(p.getProperty(SparkContext.SPARK_JOB_GROUP_ID)) } + val sqlExecutionId = Option(event.properties) + .flatMap(p => Option(p.getProperty(SQL_EXECUTION_ID_KEY)).map(_.toLong)) val job = new LiveJob( event.jobId, @@ -325,7 +329,8 @@ private[spark] class AppStatusListener( if (event.time > 0) Some(new Date(event.time)) else None, event.stageIds, jobGroup, - numTasks) + numTasks, + sqlExecutionId) liveJobs.put(event.jobId, job) liveUpdate(job, now) diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala index b35781cb36e81..312bcccb1cca1 100644 --- a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala +++ b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala @@ -56,6 +56,13 @@ private[spark] class AppStatusStore( store.read(classOf[JobDataWrapper], jobId).info } + // Returns job data and associated SQL execution ID of certain Job ID. + // If there is no related SQL execution, the SQL execution ID part will be None. + def jobWithAssociatedSql(jobId: Int): (v1.JobData, Option[Long]) = { + val data = store.read(classOf[JobDataWrapper], jobId) + (data.info, data.sqlExecutionId) + } + def executorList(activeOnly: Boolean): Seq[v1.ExecutorSummary] = { val base = store.view(classOf[ExecutorSummaryWrapper]) val filtered = if (activeOnly) { diff --git a/core/src/main/scala/org/apache/spark/status/LiveEntity.scala b/core/src/main/scala/org/apache/spark/status/LiveEntity.scala index 47e45a66ecccb..7f7b83a54d794 100644 --- a/core/src/main/scala/org/apache/spark/status/LiveEntity.scala +++ b/core/src/main/scala/org/apache/spark/status/LiveEntity.scala @@ -64,7 +64,8 @@ private class LiveJob( val submissionTime: Option[Date], val stageIds: Seq[Int], jobGroup: Option[String], - numTasks: Int) extends LiveEntity { + numTasks: Int, + sqlExecutionId: Option[Long]) extends LiveEntity { var activeTasks = 0 var completedTasks = 0 @@ -108,7 +109,7 @@ private class LiveJob( skippedStages.size, failedStages, killedSummary) - new JobDataWrapper(info, skippedStages) + new JobDataWrapper(info, skippedStages, sqlExecutionId) } } diff --git a/core/src/main/scala/org/apache/spark/status/storeTypes.scala b/core/src/main/scala/org/apache/spark/status/storeTypes.scala index ef19e86f3135f..eea47b3b17098 100644 --- a/core/src/main/scala/org/apache/spark/status/storeTypes.scala +++ b/core/src/main/scala/org/apache/spark/status/storeTypes.scala @@ -68,7 +68,8 @@ private[spark] class ExecutorSummaryWrapper(val info: ExecutorSummary) { */ private[spark] class JobDataWrapper( val info: JobData, - val skippedStages: Set[Int]) { + val skippedStages: Set[Int], + val sqlExecutionId: Option[Long]) { @JsonIgnore @KVIndex private def id: Int = info.jobId diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala index 55444a2c0c9ab..b58a6ca447edf 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala @@ -189,7 +189,7 @@ private[ui] class JobPage(parent: JobsTab, store: AppStatusStore) extends WebUIP require(parameterId != null && parameterId.nonEmpty, "Missing id parameter") val jobId = parameterId.toInt - val jobData = store.asOption(store.job(jobId)).getOrElse { + val (jobData, sqlExecutionId) = store.asOption(store.jobWithAssociatedSql(jobId)).getOrElse { val content =

No information to display for job {jobId}

@@ -197,6 +197,7 @@ private[ui] class JobPage(parent: JobsTab, store: AppStatusStore) extends WebUIP return UIUtils.headerSparkPage( request, s"Details for Job $jobId", content, parent) } + val isComplete = jobData.status != JobExecutionStatus.RUNNING val stages = jobData.stageIds.map { stageId => // This could be empty if the listener hasn't received information about the @@ -278,6 +279,17 @@ private[ui] class JobPage(parent: JobsTab, store: AppStatusStore) extends WebUIP Status: {jobData.status} + { + if (sqlExecutionId.isDefined) { +
  • + Associated SQL Query: + {{sqlExecutionId.get}} +
  • + } + } { if (jobData.jobGroup.isDefined) {