Skip to content

Commit

Permalink
Fix progress get 0.1 fake progress
Browse files Browse the repository at this point in the history
  • Loading branch information
peacewong committed Sep 19, 2023
1 parent 1e9fb71 commit 2503979
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -429,14 +429,36 @@ class TaskExecutionServiceImpl
while (null != taskFuture && !taskFuture.isDone) {
if (!ExecutionNodeStatus.isCompleted(task.getStatus)) {
Utils.tryAndWarn {
val progressResponse = Utils.tryAndWarn(taskProgress(task.getTaskId))
val resourceResponse = Utils.tryAndWarn(buildResourceMap(task))
val extraInfoMap = Utils.tryAndWarn(buildExtraInfoMap(task))
val progressResponse = Utils.tryCatch(taskProgress(task.getTaskId)) {
case e: Exception =>
logger.info("Failed to get progress", e)
null
}
val resourceResponse = Utils.tryCatch(buildResourceMap(task)) { case e: Exception =>
logger.info("Failed to get resource", e)
null
}
val extraInfoMap = Utils.tryCatch(buildExtraInfoMap(task)) { case e: Exception =>
logger.info("Failed to get extra info ", e)
null
}
val resourceMap = if (null != resourceResponse) resourceResponse.resourceMap else null

/**
* It is guaranteed that there must be progress the progress must be greater than or
* equal to 0.1
*/
val newProgressResponse = if (null == progressResponse) {
ResponseTaskProgress(task.getTaskId, 0.1f, null)
} else if (progressResponse.progress < 0.1f) {
ResponseTaskProgress(task.getTaskId, 0.1f, progressResponse.progressInfo)
} else {
progressResponse
}
val respRunningInfo: ResponseTaskRunningInfo = ResponseTaskRunningInfo(
progressResponse.execId,
progressResponse.progress,
progressResponse.progressInfo,
newProgressResponse.execId,
newProgressResponse.progress,
newProgressResponse.progressInfo,
resourceMap,
extraInfoMap
)
Expand Down Expand Up @@ -498,7 +520,7 @@ class TaskExecutionServiceImpl
}

override def taskProgress(taskID: String): ResponseTaskProgress = {
var response = ResponseTaskProgress(taskID, 0, null)
var response = ResponseTaskProgress(taskID, 0.01f, null)
if (StringUtils.isBlank(taskID)) return response
val executor = taskIdCache.getIfPresent(taskID)
if (null != executor) {
Expand All @@ -513,11 +535,9 @@ class TaskExecutionServiceImpl
ResponseTaskProgress(taskID, progress, executor.getProgressInfo(taskID))
)
}
} else {
response = ResponseTaskProgress(taskID, -1, null)
}
} else {
logger.error(s"Executor of taskId : $taskID is not cached.")
logger.info(s"Executor of taskId : $taskID is not cached.")
}
response
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -470,7 +470,7 @@ class HiveEngineConnExecutor(
val currentBegin = (currentSQL - 1) / totalSQLs.asInstanceOf[Float]
val finishedStage =
if (null != driver && null != driver.getPlan() && !driver.getPlan().getRootTasks.isEmpty) {
Utils.tryAndWarn(
Utils.tryQuietly(
Utilities
.getMRTasks(driver.getPlan().getRootTasks)
.asScala
Expand Down

0 comments on commit 2503979

Please sign in to comment.