Skip to content

Commit

Permalink
[bugfix] when task status is complete, ignore engine quited unexpecte…
Browse files Browse the repository at this point in the history
…dly (apache#4758)

This closes apache#4757
  • Loading branch information
guoshupei authored Jul 10, 2023
1 parent 10aad5f commit 71e6994
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import org.apache.linkis.manager.common.protocol.node.{RequestNodeStatus, Respon
import org.apache.linkis.orchestrator.computation.conf.ComputationOrchestratorConf
import org.apache.linkis.orchestrator.computation.execute.{CodeExecTaskExecutor, EngineConnTaskInfo}
import org.apache.linkis.orchestrator.listener.task.{
EngineQuitedUnexpectedlyEvent,
TaskErrorResponseEvent,
TaskLogEvent,
TaskStatusEvent
Expand Down Expand Up @@ -202,20 +203,11 @@ object EngineConnMonitor extends Logging {
logger.warn(
s"Will kill task ${execTask.getIDInfo()} because the engine ${executor.getEngineConnExecutor.getServiceInstance.toString} quited unexpectedly."
)
val errLog = LogUtils.generateERROR(
s"Your job : ${execTask.getIDInfo()} was failed because the engine quitted unexpectedly(任务${execTask
.getIDInfo()}失败," +
s"原因是引擎意外退出,可能是复杂任务导致引擎退出,如OOM)."
)
val logEvent = TaskLogEvent(execTask, errLog)
execTask.getPhysicalContext.pushLog(logEvent)
val errorResponseEvent = TaskErrorResponseEvent(
val event = EngineQuitedUnexpectedlyEvent(
execTask,
"task failed,Engine quitted unexpectedly(任务运行失败原因是引擎意外退出,可能是复杂任务导致引擎退出,如OOM)."
executor.getEngineConnExecutor.getServiceInstance.toString
)
execTask.getPhysicalContext.broadcastSyncEvent(errorResponseEvent)
val statusEvent = TaskStatusEvent(execTask, ExecutionNodeStatus.Failed)
execTask.getPhysicalContext.broadcastSyncEvent(statusEvent)
execTask.getPhysicalContext.broadcastSyncEvent(event)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,3 +112,13 @@ case class TaskYarnResourceEvent(
resourceMap: util.HashMap[String, ResourceWithStatus]
) extends TaskInfoEvent
with OrchestratorAsyncEvent

/**
 * Synchronous orchestrator event that pairs a task with the service instance of an engine.
 * Judging by its name, it signals that the engine running the task quit unexpectedly.
 *
 * @param execTask        the task associated with the engine
 * @param serviceInstance string form of the engine's service instance
 */
case class EngineQuitedUnexpectedlyEvent(execTask: ExecTask, serviceInstance: String)
    extends TaskInfoEvent
    with OrchestratorSyncEvent {

  /** Renders the task's ID info followed by the engine's service instance. */
  override def toString: String = {
    val taskIdInfo = execTask.getIDInfo()
    s"task $taskIdInfo, serviceInstance $serviceInstance"
  }

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.linkis.orchestrator.strategy.async

import org.apache.linkis.common.log.LogUtils
import org.apache.linkis.governance.common.entity.ExecutionNodeStatus
import org.apache.linkis.orchestrator.execution.ExecTaskRunner
import org.apache.linkis.orchestrator.execution.impl.DefaultTaskManager
Expand Down Expand Up @@ -46,6 +47,8 @@ class AsyncTaskManager
onTaskErrorResponseEvent(taskErrorResponseEvent)
case ExecTaskRunnerCompletedEvent(execTaskRunner) =>
addCompletedTask(execTaskRunner)
case event: EngineQuitedUnexpectedlyEvent =>
onEngineQuitedUnexpectedly(event)
case _ =>
}
}
Expand Down Expand Up @@ -109,4 +112,28 @@ class AsyncTaskManager
new AsyncExecTaskRunnerImpl(execTask)
}

/**
 * Handles an [[EngineQuitedUnexpectedlyEvent]].
 *
 * Looks up the runner responsible for the event's task. If that runner is an
 * [[AsyncExecTaskRunner]] that has already completed, the event is ignored with a warning.
 * Otherwise an ERROR log line is pushed to the task's physical context and the runner is
 * marked failed and moved to [[ExecutionNodeStatus.Failed]]. Events that resolve to no
 * runner, or to a runner of another type, are ignored.
 *
 * @param event the event carrying the affected task and the engine's service instance
 */
def onEngineQuitedUnexpectedly(event: EngineQuitedUnexpectedlyEvent): Unit = {
  // Log the real event class name so that log searches by class name find this line.
  logger.info(s"received EngineQuitedUnexpectedlyEvent $event")
  findDealEventTaskRunner(event).foreach {
    case asyncExecTaskRunner: AsyncExecTaskRunner =>
      if (asyncExecTaskRunner.isCompleted) {
        // The task already reached a completed state; do not turn it into a failure.
        logger.warn(
          s"task ${event.execTask.getIDInfo()} already complete, ignore engine ${event.serviceInstance} quited unexpectedly"
        )
      } else {
        val execTask = event.execTask
        // Push a user-visible error line to the task log before failing the runner.
        val errLog = LogUtils.generateERROR(
          s"Your job : ${execTask.getIDInfo()} was failed because the engine quited unexpectedly(任务${execTask.getIDInfo()}失败,原因是引擎意外退出,可能是复杂任务导致引擎退出,如OOM)."
        )
        val logEvent = TaskLogEvent(execTask, errLog)
        execTask.getPhysicalContext.pushLog(logEvent)
        val errorMsg =
          s"task ${execTask.getIDInfo()} failed,Engine ${event.serviceInstance} quited unexpectedly(任务运行失败原因是引擎意外退出,可能是复杂任务导致引擎退出,如OOM)"
        // NOTE(review): the second argument is presumably the failure cause; no exception
        // exists here, so null is passed as in the original code - confirm against markFailed.
        asyncExecTaskRunner.markFailed(errorMsg, null)
        asyncExecTaskRunner.transientStatus(ExecutionNodeStatus.Failed)
      }
    case _ =>
  }
}

}

0 comments on commit 71e6994

Please sign in to comment.