Skip to content

Commit

Permalink
[SPARK-26098][WEBUI] Show associated SQL query in Job page
Browse files Browse the repository at this point in the history
## What changes were proposed in this pull request?

For jobs associated with SQL queries, it is easier to understand the context by showing the SQL query in the job detail page.
Before code change, it is hard to tell what the job is about from the job page:

![image](https://user-images.githubusercontent.com/1097932/48659359-96baa180-ea8a-11e8-8419-a0a87c3f30fc.png)

After code change:
![image](https://user-images.githubusercontent.com/1097932/48659390-26f8e680-ea8b-11e8-8fdd-3b58909ea364.png)

After navigating to the associated SQL detail page, we can see the whole context:
![image](https://user-images.githubusercontent.com/1097932/48659463-9fac7280-ea8c-11e8-9dfe-244e849f72a5.png)

**For jobs that don't have an associated SQL query, the text won't be shown.**

## How was this patch tested?

Manual test

Closes apache#23068 from gengliangwang/addSQLID.

Authored-by: Gengliang Wang <[email protected]>
Signed-off-by: gatorsmile <[email protected]>
  • Loading branch information
gengliangwang authored and gatorsmile committed Dec 13, 2018
1 parent 6c1f7ba commit 524d1be
Show file tree
Hide file tree
Showing 5 changed files with 31 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ private[spark] class AppStatusListener(
private val liveTasks = new HashMap[Long, LiveTask]()
private val liveRDDs = new HashMap[Int, LiveRDD]()
private val pools = new HashMap[String, SchedulerPool]()

private val SQL_EXECUTION_ID_KEY = "spark.sql.execution.id"
// Keep the active executor count as a separate variable to avoid having to do synchronization
// around liveExecutors.
@volatile private var activeExecutorCount = 0
Expand Down Expand Up @@ -318,14 +320,17 @@ private[spark] class AppStatusListener(
val lastStageName = lastStageInfo.map(_.name).getOrElse("(Unknown Stage Name)")
val jobGroup = Option(event.properties)
.flatMap { p => Option(p.getProperty(SparkContext.SPARK_JOB_GROUP_ID)) }
val sqlExecutionId = Option(event.properties)
.flatMap(p => Option(p.getProperty(SQL_EXECUTION_ID_KEY)).map(_.toLong))

val job = new LiveJob(
event.jobId,
lastStageName,
if (event.time > 0) Some(new Date(event.time)) else None,
event.stageIds,
jobGroup,
numTasks)
numTasks,
sqlExecutionId)
liveJobs.put(event.jobId, job)
liveUpdate(job, now)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,13 @@ private[spark] class AppStatusStore(
store.read(classOf[JobDataWrapper], jobId).info
}

/**
 * Looks up the job data for the given job ID together with the ID of the SQL
 * execution that triggered it.
 *
 * @param jobId the job to look up
 * @return the job's [[v1.JobData]] paired with the associated SQL execution ID,
 *         or `None` in the second slot when the job was not started by a SQL query
 */
def jobWithAssociatedSql(jobId: Int): (v1.JobData, Option[Long]) = {
  val wrapper = store.read(classOf[JobDataWrapper], jobId)
  (wrapper.info, wrapper.sqlExecutionId)
}

def executorList(activeOnly: Boolean): Seq[v1.ExecutorSummary] = {
val base = store.view(classOf[ExecutorSummaryWrapper])
val filtered = if (activeOnly) {
Expand Down
5 changes: 3 additions & 2 deletions core/src/main/scala/org/apache/spark/status/LiveEntity.scala
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ private class LiveJob(
val submissionTime: Option[Date],
val stageIds: Seq[Int],
jobGroup: Option[String],
numTasks: Int) extends LiveEntity {
numTasks: Int,
sqlExecutionId: Option[Long]) extends LiveEntity {

var activeTasks = 0
var completedTasks = 0
Expand Down Expand Up @@ -108,7 +109,7 @@ private class LiveJob(
skippedStages.size,
failedStages,
killedSummary)
new JobDataWrapper(info, skippedStages)
new JobDataWrapper(info, skippedStages, sqlExecutionId)
}

}
Expand Down
3 changes: 2 additions & 1 deletion core/src/main/scala/org/apache/spark/status/storeTypes.scala
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ private[spark] class ExecutorSummaryWrapper(val info: ExecutorSummary) {
*/
private[spark] class JobDataWrapper(
val info: JobData,
val skippedStages: Set[Int]) {
val skippedStages: Set[Int],
val sqlExecutionId: Option[Long]) {

// KVStore index key: entries of this wrapper are looked up by job ID.
// Excluded from JSON serialization since it merely mirrors info.jobId.
@JsonIgnore @KVIndex
private def id: Int = info.jobId
Expand Down
14 changes: 13 additions & 1 deletion core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala
Original file line number Diff line number Diff line change
Expand Up @@ -189,14 +189,15 @@ private[ui] class JobPage(parent: JobsTab, store: AppStatusStore) extends WebUIP
require(parameterId != null && parameterId.nonEmpty, "Missing id parameter")

val jobId = parameterId.toInt
val jobData = store.asOption(store.job(jobId)).getOrElse {
val (jobData, sqlExecutionId) = store.asOption(store.jobWithAssociatedSql(jobId)).getOrElse {
val content =
<div id="no-info">
<p>No information to display for job {jobId}</p>
</div>
return UIUtils.headerSparkPage(
request, s"Details for Job $jobId", content, parent)
}

val isComplete = jobData.status != JobExecutionStatus.RUNNING
val stages = jobData.stageIds.map { stageId =>
// This could be empty if the listener hasn't received information about the
Expand Down Expand Up @@ -278,6 +279,17 @@ private[ui] class JobPage(parent: JobsTab, store: AppStatusStore) extends WebUIP
<Strong>Status:</Strong>
{jobData.status}
</li>
{
if (sqlExecutionId.isDefined) {
<li>
<strong>Associated SQL Query: </strong>
{<a href={"%s/SQL/execution/?id=%s".format(
UIUtils.prependBaseUri(request, parent.basePath),
sqlExecutionId.get)
}>{sqlExecutionId.get}</a>}
</li>
}
}
{
if (jobData.jobGroup.isDefined) {
<li>
Expand Down

0 comments on commit 524d1be

Please sign in to comment.