Skip to content

Commit

Permalink
fix log4j bug
Browse files Browse the repository at this point in the history
  • Loading branch information
yangwenzea committed Oct 11, 2023
1 parent f5c21bb commit ed26d26
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.linkis.engineconnplugin.flink.client.factory;

import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.yarn.configuration.YarnConfigOptions;
import org.apache.linkis.engineconnplugin.flink.client.utils.YarnConfLoader;

import org.apache.flink.configuration.ConfigOption;
Expand All @@ -33,8 +35,9 @@

import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.Optional;
import java.util.Arrays;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -51,10 +54,11 @@ public class LinkisYarnClusterClientFactory extends YarnClusterClientFactory imp
"**DO NOT USE** The location of the log config file, e.g. the path to your log4j.properties for log4j.");

public static final ConfigOption<String> YARN_SHIP_FILES =
key("yarn.ship-files")
.stringType()
.noDefaultValue()
.withDescription("**DO NOT USE** The location of the log config file, e.g. the path to your log4j.properties for log4j.");
ConfigOptions.key("yarn.ship-files")
.stringType()
.noDefaultValue()
.withDescription("**DO NOT USE** The location of the log config file, e.g. the path to your log4j.properties for log4j.");

private YarnConfiguration yarnConfiguration;
private YarnClient yarnClient;

Expand All @@ -65,8 +69,8 @@ public class LinkisYarnClusterClientFactory extends YarnClusterClientFactory imp
private void initYarnClient(Configuration configuration) {
checkNotNull(configuration);
String configurationDirectory = configuration.get(DeploymentOptionsInternal.CONF_DIR);
String [] paths = configuration.get(YARN_SHIP_FILES).split(";");
Optional<String> firstLog4jPath = Arrays.stream(paths)
List<String> paths = configuration.get(YarnConfigOptions.SHIP_FILES);
Optional<String> firstLog4jPath = paths.stream()
.filter(path -> path.contains("log4j.properties"))
.findFirst();
if (firstLog4jPath.isPresent()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import org.apache.linkis.governance.common.conf.GovernanceCommonConf
import org.apache.linkis.governance.common.constant.ec.ECConstants
import org.apache.linkis.manager.common.entity.enumeration.NodeStatus
import org.apache.commons.lang3.StringUtils
import org.apache.flink.yarn.configuration.YarnConfigOptionsInternal
import org.apache.flink.yarn.configuration.{YarnConfigOptions, YarnConfigOptionsInternal}
import org.apache.linkis.common.exception.ErrorException
import org.yaml.snakeyaml.Yaml

Expand Down Expand Up @@ -59,31 +59,38 @@ class FlinkJarOnceExecutor(
if (StringUtils.isNotEmpty(args)) args.split(" ") else Array.empty[String]
val mainClass = FLINK_APPLICATION_MAIN_CLASS.getValue(options)
logger.info(s"Ready to submit flink application, mainClass: $mainClass, args: $args.")
val internalYarnLogConfigFile = flinkEngineConnContext.getEnvironmentContext.getFlinkConfig.getValue(YarnConfigOptionsInternal.APPLICATION_LOG_CONFIG_FILE)
if (new File(internalYarnLogConfigFile).exists()) {
val source = Source.fromFile(internalYarnLogConfigFile)
try {
val yamlContent = source.mkString
val yaml = new Yaml()
val configMap = yaml.loadAs(yamlContent, classOf[util.LinkedHashMap[String, Object]])
var appenderName = ""
var appenderType = ""
if (configMap.containsKey("appender.stream.name")) {
appenderName = configMap.get("appender.stream.name").toString
}
if (configMap.containsKey("appender.eventmeshAppender.type")) {
appenderType = configMap.get("appender.eventmeshAppender.type").toString
}
if (appenderName.equals("StreamRpcLog") && appenderType.equals("EventMeshLog4j2Appender")) {
clusterDescriptor.deployCluster(programArguments, mainClass)
val internalYarnLogConfigFile = flinkEngineConnContext.getEnvironmentContext.getFlinkConfig.getValue(YarnConfigOptions.SHIP_FILES)
val paths: Array[String] = internalYarnLogConfigFile.split(";")
val firstLog4jPath: Option[String] = paths.find(path => path.contains("log4j.properties"))
firstLog4jPath match {
case Some(log4jPath) =>
if (new File(log4jPath).exists()) {
val source = Source.fromFile(internalYarnLogConfigFile)
try {
val yamlContent = source.mkString
val yaml = new Yaml()
val configMap = yaml.loadAs(yamlContent, classOf[util.LinkedHashMap[String, Object]])
var appenderName = ""
var appenderType = ""
if (configMap.containsKey("appender.stream.name")) {
appenderName = configMap.get("appender.stream.name").toString
}
if (configMap.containsKey("appender.eventmeshAppender.type")) {
appenderType = configMap.get("appender.eventmeshAppender.type").toString
}
if (appenderName.equals("StreamRpcLog") && appenderType.equals("EventMeshLog4j2Appender")) {
clusterDescriptor.deployCluster(programArguments, mainClass)
} else {
throw new ErrorException(30000, s"log4j.properties 不符合规范,请检测内容")
}
} finally {
source.close()
}
} else {
throw new ErrorException(30000, s"log4j.properties 不符合规范,请检测内容")
throw new FileNotFoundException("log4j.properties file not found in both file system .")
}
} finally {
source.close()
}
} else {
throw new FileNotFoundException("log4j.properties file not found in both file system .")
case None =>
throw new FileNotFoundException("log4j.properties path not found .")
}
}

Expand Down

0 comments on commit ed26d26

Please sign in to comment.