Skip to content

Commit

Permalink
fix log4j bug
Browse files Browse the repository at this point in the history
  • Loading branch information
yangwenzea committed Oct 13, 2023
1 parent 5d63cf5 commit e4780b5
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ object FlinkEnvConfiguration {
FLINK_HOME.getValue + s"/lib/flink-dist_2.11-${FLINK_VERSION.getValue}.jar"
)

val FLINK_CONF_YAML = CommonVars("flink.conf.yaml.dir", "flink-conf.yaml")

val FLINK_PROVIDED_LIB_PATH = CommonVars("flink.lib.path", "")

val FLINK_PROVIDED_USER_LIB_PATH =
Expand All @@ -61,6 +63,7 @@ object FlinkEnvConfiguration {
"The local lib path of each user in Flink EngineConn."
)

val FLINK_YAML_MERGE_ENABLE = CommonVars("flink.yaml.merge.enable", true)
val FLINK_SHIP_DIRECTORIES = CommonVars("flink.yarn.ship-directories", "")
val FLINK_SHIP_REMOTE_DIRECTORIES = CommonVars("flink.yarn.remote.ship-directories", "")

Expand Down Expand Up @@ -88,6 +91,8 @@ object FlinkEnvConfiguration {
val FLINK_SQL_DEV_RESULT_MAX_WAIT_TIME =
CommonVars("flink.dev.sql.result.wait.time.max", new TimeType("1m"))

val LINKIS_FLINK_LOG4J_CHECK_ENABLE = CommonVars("linkis.flink.log4j.check.enable", true)
val LINKIS_FLINK_LOG4J_CHECK_KEYWORDS = CommonVars("linkis.flink.log4j.check.keywords", "")
val FLINK_APPLICATION_ARGS = CommonVars("flink.app.args", "")
val FLINK_APPLICATION_MAIN_CLASS = CommonVars("flink.app.main.class", "")
val FLINK_APPLICATION_MAIN_CLASS_JAR = CommonVars("flink.app.main.class.jar", "")
Expand Down Expand Up @@ -137,4 +142,7 @@ object FlinkEnvConfiguration {
val FLINK_HANDSHAKE_WAIT_TIME_MILLS =
CommonVars("linkis.flink.handshake.wait.time.mills", 60 * 1000)

val FLINK_ENV_JAVA_OPTS =
CommonVars("flink.env.java.opts", "env.java.opts")

}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import org.apache.linkis.engineconnplugin.flink.util.YarnUtil
import org.apache.linkis.manager.common.entity.enumeration.NodeStatus

import org.apache.commons.lang3.StringUtils
import org.apache.flink.configuration.ConfigOption
import org.apache.flink.yarn.configuration.{YarnConfigOptions, YarnConfigOptionsInternal}

import java.io.{File, FileNotFoundException}
Expand Down Expand Up @@ -60,40 +61,50 @@ class FlinkJarOnceExecutor(
if (StringUtils.isNotEmpty(args)) args.split(" ") else Array.empty[String]
val mainClass = FLINK_APPLICATION_MAIN_CLASS.getValue(options)
logger.info(s"Ready to submit flink application, mainClass: $mainClass, args: $args.")
val internalYarnLogConfigFile = flinkEngineConnContext.getEnvironmentContext.getFlinkConfig.getValue(YarnConfigOptions.SHIP_FILES)
logger.info(internalYarnLogConfigFile)
val paths = internalYarnLogConfigFile.stripPrefix("[").stripSuffix("]").split(",").toList
val firstLog4jPath: Option[String] = paths.find(path => path.contains("log4j.properties"))
val configMap = new Properties()
firstLog4jPath match {
case Some(log4jPath) =>
if (new File(log4jPath).exists()) {
if (LINKIS_FLINK_LOG4J_CHECK_ENABLE.getHotValue()) {
val yarnShipLog4jPath = getLog4jPath(YarnConfigOptions.SHIP_FILES)
var firstLog4jPath: Option[String] = yarnShipLog4jPath
if (null == firstLog4jPath && firstLog4jPath.isEmpty) {
val internalYarnLog4jPath = getLog4jPath(
YarnConfigOptionsInternal.APPLICATION_LOG_CONFIG_FILE
)
firstLog4jPath = internalYarnLog4jPath
}
val configMap = new Properties()
firstLog4jPath match {
case Some(log4jPath) =>
try {
configMap.load(Files.newBufferedReader(Paths.get(log4jPath)))
} catch {
case e: Exception =>
logger.error("读取或解析文件时出现错误: " + e.getMessage)
}
} else {
logger.info(log4jPath)
throw new FileNotFoundException("log4j.properties file not found in both file system")
}
var appenderName = ""
var appenderType = ""
if (configMap.containsKey("appender.stream.name")) {
appenderName = configMap.get("appender.stream.name").toString
}
if (configMap.containsKey("appender.eventmeshAppender.type")) {
appenderType = configMap.get("appender.eventmeshAppender.type").toString
}
if (!appenderName.equals("StreamRpcLog") && !appenderType.equals("EventMeshLog4j2Appender")) {
throw new ErrorException(30000, s"log4j.properties 不符合规范,请检测内容")
}
case None =>
val ecOptions = onceExecutorExecutionContext.getEngineCreationContext.getOptions
LINKIS_FLINK_LOG4J_CHECK_KEYWORDS.getValue(ecOptions).split(",").foreach {
appenderConfig =>
if (null != appenderConfig && appenderConfig.nonEmpty) {
if (!configMap.values().contains(appenderConfig)) {
throw new ErrorException(30000, s"log4j.properties 不符合规范,请检测内容")
}
} else {
logger.info("log4j.properties does not need check")
}
}
case None =>
}
}
clusterDescriptor.deployCluster(programArguments, mainClass)
}

def getLog4jPath(configOption: ConfigOption[_]): Option[String] = {
  // Resolve the raw value of the given Flink config option (e.g. yarn ship-files or
  // the internal application log config file) and pick the first entry that refers
  // to a log4j.properties file.
  //
  // @param configOption the Flink ConfigOption to read from the EngineConn's Flink config
  // @return Some(path) of the first entry containing "log4j.properties", or None when
  //         the option is unset/empty or no such entry exists
  val internalYarnLogConfigFile =
    flinkEngineConnContext.getEnvironmentContext.getFlinkConfig.getValue(configOption)
  // Guard against an unset option: the original called stripPrefix directly on the
  // value, which throws NPE when getValue returns null.
  Option(internalYarnLogConfigFile).flatMap { rawValue =>
    logger.info(rawValue)
    // The value may be rendered as a bracketed, comma-separated list ("[a,b,c]"):
    // strip the brackets, split on commas, and search the resulting entries.
    rawValue
      .stripPrefix("[")
      .stripSuffix("]")
      .split(",")
      .toList
      .find(path => path.contains("log4j.properties"))
  }
}

override protected def waitToRunning(): Unit = {
Utils.waitUntil(() => clusterDescriptor.initJobId(), Duration.Inf)
setJobID(clusterDescriptor.getJobId.toHexString)
Expand Down

0 comments on commit e4780b5

Please sign in to comment.