Skip to content

Commit

Permalink
Merge branch 'dev-1.1.16-webank' into dev-1.1.17-webank
Browse files Browse the repository at this point in the history
  • Loading branch information
Alexkun committed Oct 26, 2023
2 parents 27b1707 + 05ed141 commit c9cb5c6
Showing 1 changed file with 37 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@

package org.apache.linkis.engineconnplugin.flink.executor

import java.util.concurrent.{Future, TimeUnit}

import org.apache.commons.lang3.StringUtils
import org.apache.linkis.common.utils.Utils
import org.apache.linkis.engineconn.acessible.executor.service.ExecutorHeartbeatServiceHolder
import org.apache.linkis.engineconn.executor.service.ManagerService
Expand All @@ -27,15 +30,8 @@ import org.apache.linkis.engineconnplugin.flink.config.FlinkEnvConfiguration._
import org.apache.linkis.engineconnplugin.flink.context.FlinkEngineConnContext
import org.apache.linkis.engineconnplugin.flink.operator.StatusOperator
import org.apache.linkis.engineconnplugin.flink.util.YarnUtil
import org.apache.linkis.governance.common.conf.GovernanceCommonConf
import org.apache.linkis.governance.common.constant.ec.ECConstants
import org.apache.linkis.manager.common.entity.enumeration.NodeStatus

import org.apache.commons.lang3.StringUtils

import java.util
import java.util.concurrent.{Future, TimeUnit}

import scala.concurrent.duration.Duration

class FlinkJarOnceExecutor(
Expand All @@ -60,11 +56,12 @@ class FlinkJarOnceExecutor(
}

override protected def waitToRunning(): Unit = {
Utils.waitUntil(() => clusterDescriptor.initJobId(), Duration.Inf)
setJobID(clusterDescriptor.getJobId.toHexString)
super.waitToRunning()
if (YarnUtil.isDetach(flinkEngineConnContext.getEnvironmentContext.getExtraParams())) {
waitToExit()
tryToHeartbeat()
} else {
Utils.waitUntil(() => clusterDescriptor.initJobId(), Duration.Inf)
setJobID(clusterDescriptor.getJobId.toHexString)
super.waitToRunning()
}
}

Expand All @@ -89,47 +86,37 @@ class FlinkJarOnceExecutor(
}
}

private def waitToExit(): Unit = {
private def tryToHeartbeat(): Unit = {
// upload applicationId to manager and then exit
val thisExecutor = this
if (!isCompleted) {
daemonThread = Utils.defaultScheduler.scheduleWithFixedDelay(
new Runnable {
override def run(): Unit = {
if (!isCompleted) {
Utils.waitUntil(() => StringUtils.isNotBlank(getApplicationId), Duration.apply("10s"))
if (StringUtils.isNotBlank(getApplicationId)) {
Utils.tryAndWarn {
val heartbeatService = ExecutorHeartbeatServiceHolder.getDefaultHeartbeatService()
if (null == heartbeatService) {
logger.error("HeartbeatService not inited.")
return null
}
val heartbeatMsg = heartbeatService.generateHeartBeatMsg(thisExecutor)
ManagerService.getManagerService.heartbeatReport(heartbeatMsg)
logger.info(
s"Succeed to report heatbeatMsg : ${heartbeatMsg.getHeartBeatMsg}, will add handshake."
)
if (0L >= firstReportAppIdTimestampMills) {
firstReportAppIdTimestampMills = System.currentTimeMillis()
}
if (!StatusOperator.isHandshaked) {
StatusOperator.addHandshake()
} else {
logger.info("Will exit with handshaked.")
trySucceed()
}
}
}
}
logger.info(s"try to send heartbeat to LinkisManager with applicationId: $getApplicationId.")
daemonThread = Utils.defaultScheduler.scheduleWithFixedDelay(
new Runnable {
override def run(): Unit = Utils.tryAndWarn {
val heartbeatService = ExecutorHeartbeatServiceHolder.getDefaultHeartbeatService()
if (null == heartbeatService) {
logger.warn("HeartbeatService is not inited.")
return
}
},
1000,
FlinkEnvConfiguration.FLINK_ONCE_JAR_APP_REPORT_APPLICATIONID_INTERVAL.getValue.toLong,
TimeUnit.MILLISECONDS
)
logger.info("waitToExit submited.")
}
val heartbeatMsg = heartbeatService.generateHeartBeatMsg(FlinkJarOnceExecutor.this)
ManagerService.getManagerService.heartbeatReport(heartbeatMsg)
logger.info(
s"Succeed to report heartbeatMsg: ${heartbeatMsg.getHeartBeatMsg}, will add handshake."
)
if (0L >= firstReportAppIdTimestampMills) {
firstReportAppIdTimestampMills = System.currentTimeMillis()
}
if (!StatusOperator.isHandshaked) {
StatusOperator.addHandshake()
} else {
logger.info("submit to yarn, report heartbeat to LinkisManager, and add handshake succeed, now exit this detach ec.")
trySucceed()
}
}
},
1000,
FlinkEnvConfiguration.FLINK_ONCE_JAR_APP_REPORT_APPLICATIONID_INTERVAL.getValue.toLong,
TimeUnit.MILLISECONDS
)
}

}

0 comments on commit c9cb5c6

Please sign in to comment.