diff --git a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/executor/FlinkJarOnceExecutor.scala b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/executor/FlinkJarOnceExecutor.scala index 28ea52656d..d925a80259 100644 --- a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/executor/FlinkJarOnceExecutor.scala +++ b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/executor/FlinkJarOnceExecutor.scala @@ -17,6 +17,9 @@ package org.apache.linkis.engineconnplugin.flink.executor +import java.util.concurrent.{Future, TimeUnit} + +import org.apache.commons.lang3.StringUtils import org.apache.linkis.common.utils.Utils import org.apache.linkis.engineconn.acessible.executor.service.ExecutorHeartbeatServiceHolder import org.apache.linkis.engineconn.executor.service.ManagerService @@ -27,15 +30,8 @@ import org.apache.linkis.engineconnplugin.flink.config.FlinkEnvConfiguration._ import org.apache.linkis.engineconnplugin.flink.context.FlinkEngineConnContext import org.apache.linkis.engineconnplugin.flink.operator.StatusOperator import org.apache.linkis.engineconnplugin.flink.util.YarnUtil -import org.apache.linkis.governance.common.conf.GovernanceCommonConf -import org.apache.linkis.governance.common.constant.ec.ECConstants import org.apache.linkis.manager.common.entity.enumeration.NodeStatus -import org.apache.commons.lang3.StringUtils - -import java.util -import java.util.concurrent.{Future, TimeUnit} - import scala.concurrent.duration.Duration class FlinkJarOnceExecutor( @@ -60,11 +56,12 @@ class FlinkJarOnceExecutor( } override protected def waitToRunning(): Unit = { - Utils.waitUntil(() => clusterDescriptor.initJobId(), Duration.Inf) - setJobID(clusterDescriptor.getJobId.toHexString) - super.waitToRunning() if (YarnUtil.isDetach(flinkEngineConnContext.getEnvironmentContext.getExtraParams())) { - waitToExit() + tryToHeartbeat() + } else { + Utils.waitUntil(() => clusterDescriptor.initJobId(), Duration.Inf) + setJobID(clusterDescriptor.getJobId.toHexString) + super.waitToRunning() } } @@ -89,47 +86,37 @@ class FlinkJarOnceExecutor( } } - private def waitToExit(): Unit = { + private def tryToHeartbeat(): Unit = { // upload applicationId to manager and then exit - val thisExecutor = this - if (!isCompleted) { - daemonThread = Utils.defaultScheduler.scheduleWithFixedDelay( - new Runnable { - override def run(): Unit = { - if (!isCompleted) { - Utils.waitUntil(() => StringUtils.isNotBlank(getApplicationId), Duration.apply("10s")) - if (StringUtils.isNotBlank(getApplicationId)) { - Utils.tryAndWarn { - val heartbeatService = ExecutorHeartbeatServiceHolder.getDefaultHeartbeatService() - if (null == heartbeatService) { - logger.error("HeartbeatService not inited.") - return null - } - val heartbeatMsg = heartbeatService.generateHeartBeatMsg(thisExecutor) - ManagerService.getManagerService.heartbeatReport(heartbeatMsg) - logger.info( - s"Succeed to report heatbeatMsg : ${heartbeatMsg.getHeartBeatMsg}, will add handshake." - ) - if (0L >= firstReportAppIdTimestampMills) { - firstReportAppIdTimestampMills = System.currentTimeMillis() - } - if (!StatusOperator.isHandshaked) { - StatusOperator.addHandshake() - } else { - logger.info("Will exit with handshaked.") - trySucceed() - } - } - } - } + logger.info(s"try to send heartbeat to LinkisManager with applicationId: $getApplicationId.") + daemonThread = Utils.defaultScheduler.scheduleWithFixedDelay( + new Runnable { + override def run(): Unit = Utils.tryAndWarn { + val heartbeatService = ExecutorHeartbeatServiceHolder.getDefaultHeartbeatService() + if (null == heartbeatService) { + logger.warn("HeartbeatService is not inited.") + return } - }, - 1000, - FlinkEnvConfiguration.FLINK_ONCE_JAR_APP_REPORT_APPLICATIONID_INTERVAL.getValue.toLong, - TimeUnit.MILLISECONDS - ) - logger.info("waitToExit submited.") - } + val heartbeatMsg = heartbeatService.generateHeartBeatMsg(FlinkJarOnceExecutor.this) + ManagerService.getManagerService.heartbeatReport(heartbeatMsg) + logger.info( + s"Succeed to report heartbeatMsg: ${heartbeatMsg.getHeartBeatMsg}, will add handshake." + ) + if (0L >= firstReportAppIdTimestampMills) { + firstReportAppIdTimestampMills = System.currentTimeMillis() + } + if (!StatusOperator.isHandshaked) { + StatusOperator.addHandshake() + } else { + logger.info("submit to yarn, report heartbeat to LinkisManager, and add handshake succeed, now exit this detach ec.") + trySucceed() + } + } + }, + 1000, + FlinkEnvConfiguration.FLINK_ONCE_JAR_APP_REPORT_APPLICATIONID_INTERVAL.getValue.toLong, + TimeUnit.MILLISECONDS + ) } }