Skip to content

Commit

Permalink
fix log4j bug
Browse files Browse the repository at this point in the history
  • Loading branch information
yangwenzea committed Oct 11, 2023
1 parent ed26d26 commit 61adf5d
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@

package org.apache.linkis.engineconnplugin.flink.client.factory;

import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.yarn.configuration.YarnConfigOptions;
import org.apache.linkis.engineconnplugin.flink.client.utils.YarnConfLoader;

import org.apache.flink.configuration.ConfigOption;
Expand All @@ -27,6 +25,7 @@
import org.apache.flink.yarn.YarnClientYarnClusterInformationRetriever;
import org.apache.flink.yarn.YarnClusterClientFactory;
import org.apache.flink.yarn.YarnClusterDescriptor;
import org.apache.flink.yarn.configuration.YarnConfigOptions;
import org.apache.flink.yarn.configuration.YarnLogConfigUtil;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.client.api.YarnClient;
Expand All @@ -35,6 +34,8 @@

import java.io.Closeable;
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.Optional;

Expand All @@ -53,12 +54,6 @@ public class LinkisYarnClusterClientFactory extends YarnClusterClientFactory imp
.withDescription(
"**DO NOT USE** The location of the log config file, e.g. the path to your log4j.properties for log4j.");

public static final ConfigOption<String> YARN_SHIP_FILES =
ConfigOptions.key("yarn.ship-files")
.stringType()
.noDefaultValue()
.withDescription("**DO NOT USE** The location of the log config file, e.g. the path to your log4j.properties for log4j.");

private YarnConfiguration yarnConfiguration;
private YarnClient yarnClient;

Expand All @@ -70,14 +65,14 @@ private void initYarnClient(Configuration configuration) {
checkNotNull(configuration);
String configurationDirectory = configuration.get(DeploymentOptionsInternal.CONF_DIR);
List<String> paths = configuration.get(YarnConfigOptions.SHIP_FILES);
Optional<String> firstLog4jPath = paths.stream()
.filter(path -> path.contains("log4j.properties"))
.findFirst();
Optional<String> firstLog4jPath =
paths.stream().filter(path -> path.contains("log4j.properties")).findFirst();
if (firstLog4jPath.isPresent()) {
configurationDirectory = firstLog4jPath.get();
Path parentAbsolutePath = Paths.get(firstLog4jPath.get()).toAbsolutePath().getParent();
configurationDirectory = parentAbsolutePath.toString();
LOG.info("log4j.properties路径:" + configurationDirectory);
} else {
LOG.info("未找到匹配的路径使用系统默认路径:" +configurationDirectory);
LOG.info("未找到匹配的路径使用系统默认路径:" + configurationDirectory);
}
YarnLogConfigUtil.setLogConfigFileInConfig(configuration, configurationDirectory);
String yarnConfDir = configuration.getString(YARN_CONFIG_DIR);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.linkis.engineconnplugin.flink.executor

import org.apache.linkis.common.exception.ErrorException
import org.apache.linkis.common.utils.Utils
import org.apache.linkis.engineconn.acessible.executor.service.ExecutorHeartbeatServiceHolder
import org.apache.linkis.engineconn.executor.service.ManagerService
Expand All @@ -27,19 +28,19 @@ import org.apache.linkis.engineconnplugin.flink.config.FlinkEnvConfiguration._
import org.apache.linkis.engineconnplugin.flink.context.FlinkEngineConnContext
import org.apache.linkis.engineconnplugin.flink.operator.StatusOperator
import org.apache.linkis.engineconnplugin.flink.util.YarnUtil
import org.apache.linkis.governance.common.conf.GovernanceCommonConf
import org.apache.linkis.governance.common.constant.ec.ECConstants
import org.apache.linkis.manager.common.entity.enumeration.NodeStatus

import org.apache.commons.lang3.StringUtils
import org.apache.flink.yarn.configuration.{YarnConfigOptions, YarnConfigOptionsInternal}
import org.apache.linkis.common.exception.ErrorException
import org.yaml.snakeyaml.Yaml

import java.io.{File, FileNotFoundException}
import java.nio.file.{Files, Paths}
import java.util
import java.util.Properties
import java.util.concurrent.{Future, TimeUnit}

import scala.collection.JavaConverters.dictionaryAsScalaMapConverter
import scala.concurrent.duration.Duration
import scala.io.Source

class FlinkJarOnceExecutor(
override val id: Long,
Expand All @@ -59,17 +60,17 @@ class FlinkJarOnceExecutor(
if (StringUtils.isNotEmpty(args)) args.split(" ") else Array.empty[String]
val mainClass = FLINK_APPLICATION_MAIN_CLASS.getValue(options)
logger.info(s"Ready to submit flink application, mainClass: $mainClass, args: $args.")
val internalYarnLogConfigFile = flinkEngineConnContext.getEnvironmentContext.getFlinkConfig.getValue(YarnConfigOptions.SHIP_FILES)
val paths: Array[String] = internalYarnLogConfigFile.split(";")
val internalYarnLogConfigFile = flinkEngineConnContext.getEnvironmentContext.getFlinkConfig
.getValue(YarnConfigOptions.SHIP_FILES)
logger.info(internalYarnLogConfigFile)
val paths = internalYarnLogConfigFile.stripPrefix("[").stripSuffix("]").split(",").toList
val firstLog4jPath: Option[String] = paths.find(path => path.contains("log4j.properties"))
firstLog4jPath match {
case Some(log4jPath) =>
if (new File(log4jPath).exists()) {
val source = Source.fromFile(internalYarnLogConfigFile)
try {
val yamlContent = source.mkString
val yaml = new Yaml()
val configMap = yaml.loadAs(yamlContent, classOf[util.LinkedHashMap[String, Object]])
val configMap = new Properties()
configMap.load(Files.newBufferedReader(Paths.get(log4jPath)))
var appenderName = ""
var appenderType = ""
if (configMap.containsKey("appender.stream.name")) {
Expand All @@ -78,16 +79,21 @@ class FlinkJarOnceExecutor(
if (configMap.containsKey("appender.eventmeshAppender.type")) {
appenderType = configMap.get("appender.eventmeshAppender.type").toString
}
if (appenderName.equals("StreamRpcLog") && appenderType.equals("EventMeshLog4j2Appender")) {
if (
appenderName
.equals("StreamRpcLog") && appenderType.equals("EventMeshLog4j2Appender")
) {
clusterDescriptor.deployCluster(programArguments, mainClass)
} else {
throw new ErrorException(30000, s"log4j.properties 不符合规范,请检测内容")
}
} finally {
source.close()
} catch {
case e: Exception =>
logger.error("读取或解析文件时出现错误: " + e.getMessage)
}
} else {
throw new FileNotFoundException("log4j.properties file not found in both file system .")
logger.info(log4jPath)
throw new FileNotFoundException("log4j.properties file not found in both file system")
}
case None =>
throw new FileNotFoundException("log4j.properties path not found .")
Expand Down

0 comments on commit 61adf5d

Please sign in to comment.