Skip to content

Commit

Permalink
resolve The problem when application in yarn is always acceptable, th…
Browse files Browse the repository at this point in the history
…e flink ec will be blocked in detach mode.
  • Loading branch information
wushengyeyouya committed Oct 26, 2023
1 parent a68a12b commit 05ed141
Showing 1 changed file with 37 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@

package org.apache.linkis.engineconnplugin.flink.executor

import java.util.concurrent.{Future, TimeUnit}

import org.apache.commons.lang3.StringUtils
import org.apache.linkis.common.utils.Utils
import org.apache.linkis.engineconn.acessible.executor.service.ExecutorHeartbeatServiceHolder
import org.apache.linkis.engineconn.executor.service.ManagerService
Expand All @@ -27,15 +30,8 @@ import org.apache.linkis.engineconnplugin.flink.config.FlinkEnvConfiguration._
import org.apache.linkis.engineconnplugin.flink.context.FlinkEngineConnContext
import org.apache.linkis.engineconnplugin.flink.operator.StatusOperator
import org.apache.linkis.engineconnplugin.flink.util.YarnUtil
import org.apache.linkis.governance.common.conf.GovernanceCommonConf
import org.apache.linkis.governance.common.constant.ec.ECConstants
import org.apache.linkis.manager.common.entity.enumeration.NodeStatus

import org.apache.commons.lang3.StringUtils

import java.util
import java.util.concurrent.{Future, TimeUnit}

import scala.concurrent.duration.Duration

class FlinkJarOnceExecutor(
Expand All @@ -60,11 +56,12 @@ class FlinkJarOnceExecutor(
}

override protected def waitToRunning(): Unit = {
Utils.waitUntil(() => clusterDescriptor.initJobId(), Duration.Inf)
setJobID(clusterDescriptor.getJobId.toHexString)
super.waitToRunning()
if (YarnUtil.isDetach(flinkEngineConnContext.getEnvironmentContext.getExtraParams())) {
waitToExit()
tryToHeartbeat()
} else {
Utils.waitUntil(() => clusterDescriptor.initJobId(), Duration.Inf)
setJobID(clusterDescriptor.getJobId.toHexString)
super.waitToRunning()
}
}

Expand All @@ -89,47 +86,37 @@ class FlinkJarOnceExecutor(
}
}

private def waitToExit(): Unit = {
private def tryToHeartbeat(): Unit = {
// upload applicationId to manager and then exit
val thisExecutor = this
if (!isCompleted) {
daemonThread = Utils.defaultScheduler.scheduleWithFixedDelay(
new Runnable {
override def run(): Unit = {
if (!isCompleted) {
Utils.waitUntil(() => StringUtils.isNotBlank(getApplicationId), Duration.apply("10s"))
if (StringUtils.isNotBlank(getApplicationId)) {
Utils.tryAndWarn {
val heartbeatService = ExecutorHeartbeatServiceHolder.getDefaultHeartbeatService()
if (null == heartbeatService) {
logger.error("HeartbeatService not inited.")
return null
}
val heartbeatMsg = heartbeatService.generateHeartBeatMsg(thisExecutor)
ManagerService.getManagerService.heartbeatReport(heartbeatMsg)
logger.info(
s"Succeed to report heatbeatMsg : ${heartbeatMsg.getHeartBeatMsg}, will add handshake."
)
if (0L >= firstReportAppIdTimestampMills) {
firstReportAppIdTimestampMills = System.currentTimeMillis()
}
if (!StatusOperator.isHandshaked) {
StatusOperator.addHandshake()
} else {
logger.info("Will exit with handshaked.")
trySucceed()
}
}
}
}
logger.info(s"try to send heartbeat to LinkisManager with applicationId: $getApplicationId.")
daemonThread = Utils.defaultScheduler.scheduleWithFixedDelay(
new Runnable {
override def run(): Unit = Utils.tryAndWarn {
val heartbeatService = ExecutorHeartbeatServiceHolder.getDefaultHeartbeatService()
if (null == heartbeatService) {
logger.warn("HeartbeatService is not inited.")
return
}
},
1000,
FlinkEnvConfiguration.FLINK_ONCE_JAR_APP_REPORT_APPLICATIONID_INTERVAL.getValue.toLong,
TimeUnit.MILLISECONDS
)
logger.info("waitToExit submited.")
}
val heartbeatMsg = heartbeatService.generateHeartBeatMsg(FlinkJarOnceExecutor.this)
ManagerService.getManagerService.heartbeatReport(heartbeatMsg)
logger.info(
s"Succeed to report heartbeatMsg: ${heartbeatMsg.getHeartBeatMsg}, will add handshake."
)
if (0L >= firstReportAppIdTimestampMills) {
firstReportAppIdTimestampMills = System.currentTimeMillis()
}
if (!StatusOperator.isHandshaked) {
StatusOperator.addHandshake()
} else {
logger.info("submit to yarn, report heartbeat to LinkisManager, and add handshake succeed, now exit this detach ec.")
trySucceed()
}
}
},
1000,
FlinkEnvConfiguration.FLINK_ONCE_JAR_APP_REPORT_APPLICATIONID_INTERVAL.getValue.toLong,
TimeUnit.MILLISECONDS
)
}

}

0 comments on commit 05ed141

Please sign in to comment.