Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/dev-1.1.16-webank' into dev-1.1.…
Browse files Browse the repository at this point in the history
…16-webank
  • Loading branch information
wushengyeyouya committed Nov 1, 2023
2 parents 6bfbc8a + 436b4ac commit 85d8eb0
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.flink.yarn.YarnClientYarnClusterInformationRetriever;
import org.apache.flink.yarn.YarnClusterClientFactory;
import org.apache.flink.yarn.YarnClusterDescriptor;
import org.apache.flink.yarn.configuration.YarnConfigOptions;
import org.apache.flink.yarn.configuration.YarnLogConfigUtil;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.client.api.YarnClient;
Expand All @@ -33,6 +34,10 @@

import java.io.Closeable;
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.Optional;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -59,6 +64,16 @@ public class LinkisYarnClusterClientFactory extends YarnClusterClientFactory imp
private void initYarnClient(Configuration configuration) {
checkNotNull(configuration);
String configurationDirectory = configuration.get(DeploymentOptionsInternal.CONF_DIR);
List<String> paths = configuration.get(YarnConfigOptions.SHIP_FILES);
Optional<String> firstLog4jPath =
paths.stream().filter(path -> path.contains("log4j.properties")).findFirst();
if (firstLog4jPath.isPresent()) {
Path parentAbsolutePath = Paths.get(firstLog4jPath.get()).toAbsolutePath().getParent();
configurationDirectory = parentAbsolutePath.toString();
LOG.info("log4j.properties路径:" + configurationDirectory);
} else {
LOG.info("未找到匹配的路径使用系统默认路径:" + configurationDirectory);
}
YarnLogConfigUtil.setLogConfigFileInConfig(configuration, configurationDirectory);
String yarnConfDir = configuration.getString(YARN_CONFIG_DIR);
this.configuration = configuration;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ object FlinkEnvConfiguration {
val FLINK_SQL_DEV_RESULT_MAX_WAIT_TIME =
CommonVars("flink.dev.sql.result.wait.time.max", new TimeType("1m"))

val LINKIS_FLINK_LOG4J_CHECK_ENABLE = CommonVars("linkis.flink.log4j.check.enable", true)
val LINKIS_FLINK_LOG4J_CHECK_KEYWORDS = CommonVars("linkis.flink.log4j.check.keywords", "")
val FLINK_APPLICATION_ARGS = CommonVars("flink.app.args", "")
val FLINK_APPLICATION_MAIN_CLASS = CommonVars("flink.app.main.class", "")
val FLINK_APPLICATION_MAIN_CLASS_JAR = CommonVars("flink.app.main.class.jar", "")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,7 @@

package org.apache.linkis.engineconnplugin.flink.executor

import java.util.concurrent.{Future, TimeUnit}

import org.apache.commons.lang3.StringUtils
import org.apache.linkis.common.exception.ErrorException
import org.apache.linkis.common.utils.Utils
import org.apache.linkis.engineconn.acessible.executor.service.ExecutorHeartbeatServiceHolder
import org.apache.linkis.engineconn.executor.service.ManagerService
Expand All @@ -32,6 +30,17 @@ import org.apache.linkis.engineconnplugin.flink.operator.StatusOperator
import org.apache.linkis.engineconnplugin.flink.util.YarnUtil
import org.apache.linkis.manager.common.entity.enumeration.NodeStatus

import org.apache.commons.lang3.StringUtils
import org.apache.flink.configuration.ConfigOption
import org.apache.flink.yarn.configuration.{YarnConfigOptions, YarnConfigOptionsInternal}

import java.io.{File, FileNotFoundException}
import java.nio.file.{Files, Paths}
import java.util
import java.util.Properties
import java.util.concurrent.{Future, TimeUnit}

import scala.collection.JavaConverters.dictionaryAsScalaMapConverter
import scala.concurrent.duration.Duration

class FlinkJarOnceExecutor(
Expand All @@ -52,9 +61,50 @@ class FlinkJarOnceExecutor(
if (StringUtils.isNotEmpty(args)) args.split(" ") else Array.empty[String]
val mainClass = FLINK_APPLICATION_MAIN_CLASS.getValue(options)
logger.info(s"Ready to submit flink application, mainClass: $mainClass, args: $args.")
if (LINKIS_FLINK_LOG4J_CHECK_ENABLE.getHotValue()) {
val yarnShipLog4jPath = getLog4jPath(YarnConfigOptions.SHIP_FILES)
var firstLog4jPath: Option[String] = yarnShipLog4jPath
if (null == firstLog4jPath && firstLog4jPath.isEmpty) {
val internalYarnLog4jPath = getLog4jPath(
YarnConfigOptionsInternal.APPLICATION_LOG_CONFIG_FILE
)
firstLog4jPath = internalYarnLog4jPath
}
val configMap = new Properties()
firstLog4jPath match {
case Some(log4jPath) =>
try {
configMap.load(Files.newBufferedReader(Paths.get(log4jPath)))
} catch {
case e: Exception =>
logger.error("读取或解析文件时出现错误: " + e.getMessage)
}
val ecOptions = onceExecutorExecutionContext.getEngineCreationContext.getOptions
LINKIS_FLINK_LOG4J_CHECK_KEYWORDS.getValue(ecOptions).split(",").foreach {
appenderConfig =>
if (null != appenderConfig && appenderConfig.nonEmpty) {
if (!configMap.values().contains(appenderConfig)) {
throw new ErrorException(30000, s"log4j.properties 不符合规范,请检测内容")
}
} else {
logger.info("log4j.properties does not need check")
}
}
case None =>
}
}
clusterDescriptor.deployCluster(programArguments, mainClass)
}

def getLog4jPath(configOption: ConfigOption[_]): Option[String] = {
val internalYarnLogConfigFile =
flinkEngineConnContext.getEnvironmentContext.getFlinkConfig.getValue(configOption)
logger.info(internalYarnLogConfigFile)
val paths = internalYarnLogConfigFile.stripPrefix("[").stripSuffix("]").split(",").toList
val option = paths.find(path => path.contains("log4j.properties"))
option
}

override protected def waitToRunning(): Unit = {
if (YarnUtil.isDetach(flinkEngineConnContext.getEnvironmentContext.getExtraParams())) {
tryToHeartbeat()
Expand Down Expand Up @@ -108,7 +158,9 @@ class FlinkJarOnceExecutor(
if (!StatusOperator.isHandshaked) {
StatusOperator.addHandshake()
} else {
logger.info("submit to yarn, report heartbeat to LinkisManager, and add handshake succeed, now exit this detach ec.")
logger.info(
"submit to yarn, report heartbeat to LinkisManager, and add handshake succeed, now exit this detach ec."
)
trySucceed()
}
}
Expand Down

0 comments on commit 85d8eb0

Please sign in to comment.