diff --git a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/service/TaskExecutionServiceImpl.scala b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/service/TaskExecutionServiceImpl.scala index a60bdce7e7..00ee5e28e5 100644 --- a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/service/TaskExecutionServiceImpl.scala +++ b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/service/TaskExecutionServiceImpl.scala @@ -429,14 +429,36 @@ class TaskExecutionServiceImpl while (null != taskFuture && !taskFuture.isDone) { if (!ExecutionNodeStatus.isCompleted(task.getStatus)) { Utils.tryAndWarn { - val progressResponse = Utils.tryAndWarn(taskProgress(task.getTaskId)) - val resourceResponse = Utils.tryAndWarn(buildResourceMap(task)) - val extraInfoMap = Utils.tryAndWarn(buildExtraInfoMap(task)) + val progressResponse = Utils.tryCatch(taskProgress(task.getTaskId)) { + case e: Exception => + logger.info("Failed to get progress", e) + null + } + val resourceResponse = Utils.tryCatch(buildResourceMap(task)) { case e: Exception => + logger.info("Failed to get resource", e) + null + } + val extraInfoMap = Utils.tryCatch(buildExtraInfoMap(task)) { case e: Exception => + logger.info("Failed to get extra info ", e) + null + } val resourceMap = if (null != resourceResponse) resourceResponse.resourceMap else null + + /** + * It is guaranteed that there must be progress the progress must be greater than or + * equal to 0.1 + */ + val newProgressResponse = if (null == progressResponse) { + ResponseTaskProgress(task.getTaskId, 0.1f, null) + } else if (progressResponse.progress < 0.1f) { + ResponseTaskProgress(task.getTaskId, 0.1f, progressResponse.progressInfo) + } else { + progressResponse + } val respRunningInfo: ResponseTaskRunningInfo = ResponseTaskRunningInfo( - progressResponse.execId, - progressResponse.progress, - progressResponse.progressInfo, + newProgressResponse.execId, + newProgressResponse.progress, + newProgressResponse.progressInfo, resourceMap, extraInfoMap ) @@ -498,7 +520,7 @@ class TaskExecutionServiceImpl } override def taskProgress(taskID: String): ResponseTaskProgress = { - var response = ResponseTaskProgress(taskID, 0, null) + var response = ResponseTaskProgress(taskID, 0.01f, null) if (StringUtils.isBlank(taskID)) return response val executor = taskIdCache.getIfPresent(taskID) if (null != executor) { @@ -513,11 +535,9 @@ class TaskExecutionServiceImpl ResponseTaskProgress(taskID, progress, executor.getProgressInfo(taskID)) ) } - } else { - response = ResponseTaskProgress(taskID, -1, null) } } else { - logger.error(s"Executor of taskId : $taskID is not cached.") + logger.info(s"Executor of taskId : $taskID is not cached.") } response } diff --git a/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConnExecutor.scala b/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConnExecutor.scala index c306d33eca..b403eb74ba 100644 --- a/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConnExecutor.scala +++ b/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConnExecutor.scala @@ -470,7 +470,7 @@ class HiveEngineConnExecutor( val currentBegin = (currentSQL - 1) / totalSQLs.asInstanceOf[Float] val finishedStage = if (null != driver && null != driver.getPlan() && !driver.getPlan().getRootTasks.isEmpty) { - Utils.tryAndWarn( + Utils.tryQuietly( Utilities .getMRTasks(driver.getPlan().getRootTasks) .asScala