diff --git a/linkis-engineconn-plugins/flink/src/main/java/org/apache/linkis/engineconnplugin/flink/client/factory/LinkisYarnClusterClientFactory.java b/linkis-engineconn-plugins/flink/src/main/java/org/apache/linkis/engineconnplugin/flink/client/factory/LinkisYarnClusterClientFactory.java index 8a70c2b9a3..384bab8cc9 100644 --- a/linkis-engineconn-plugins/flink/src/main/java/org/apache/linkis/engineconnplugin/flink/client/factory/LinkisYarnClusterClientFactory.java +++ b/linkis-engineconn-plugins/flink/src/main/java/org/apache/linkis/engineconnplugin/flink/client/factory/LinkisYarnClusterClientFactory.java @@ -17,8 +17,6 @@ package org.apache.linkis.engineconnplugin.flink.client.factory; -import org.apache.flink.configuration.ConfigOptions; -import org.apache.flink.yarn.configuration.YarnConfigOptions; import org.apache.linkis.engineconnplugin.flink.client.utils.YarnConfLoader; import org.apache.flink.configuration.ConfigOption; @@ -27,6 +25,7 @@ import org.apache.flink.yarn.YarnClientYarnClusterInformationRetriever; import org.apache.flink.yarn.YarnClusterClientFactory; import org.apache.flink.yarn.YarnClusterDescriptor; +import org.apache.flink.yarn.configuration.YarnConfigOptions; import org.apache.flink.yarn.configuration.YarnLogConfigUtil; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.client.api.YarnClient; @@ -35,6 +34,8 @@ import java.io.Closeable; import java.io.IOException; +import java.nio.file.Path; +import java.nio.file.Paths; import java.util.List; import java.util.Optional; @@ -53,12 +54,6 @@ public class LinkisYarnClusterClientFactory extends YarnClusterClientFactory imp .withDescription( "**DO NOT USE** The location of the log config file, e.g. the path to your log4j.properties for log4j."); - public static final ConfigOption YARN_SHIP_FILES = - ConfigOptions.key("yarn.ship-files") - .stringType() - .noDefaultValue() - .withDescription("**DO NOT USE** The location of the log config file, e.g. the path to your log4j.properties for log4j."); - private YarnConfiguration yarnConfiguration; private YarnClient yarnClient; @@ -70,14 +65,14 @@ private void initYarnClient(Configuration configuration) { checkNotNull(configuration); String configurationDirectory = configuration.get(DeploymentOptionsInternal.CONF_DIR); List paths = configuration.get(YarnConfigOptions.SHIP_FILES); - Optional firstLog4jPath = paths.stream() - .filter(path -> path.contains("log4j.properties")) - .findFirst(); + Optional firstLog4jPath = + paths.stream().filter(path -> path.contains("log4j.properties")).findFirst(); if (firstLog4jPath.isPresent()) { - configurationDirectory = firstLog4jPath.get(); + Path parentAbsolutePath = Paths.get(firstLog4jPath.get()).toAbsolutePath().getParent(); + configurationDirectory = parentAbsolutePath.toString(); LOG.info("log4j.properties路径:" + configurationDirectory); } else { - LOG.info("未找到匹配的路径使用系统默认路径:" +configurationDirectory); + LOG.info("未找到匹配的路径使用系统默认路径:" + configurationDirectory); } YarnLogConfigUtil.setLogConfigFileInConfig(configuration, configurationDirectory); String yarnConfDir = configuration.getString(YARN_CONFIG_DIR); diff --git a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/executor/FlinkJarOnceExecutor.scala b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/executor/FlinkJarOnceExecutor.scala index 36c45f7b9b..6c808dbea2 100644 --- a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/executor/FlinkJarOnceExecutor.scala +++ b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/executor/FlinkJarOnceExecutor.scala @@ -17,6 +17,7 @@ package org.apache.linkis.engineconnplugin.flink.executor +import org.apache.linkis.common.exception.ErrorException import org.apache.linkis.common.utils.Utils import org.apache.linkis.engineconn.acessible.executor.service.ExecutorHeartbeatServiceHolder import org.apache.linkis.engineconn.executor.service.ManagerService @@ -27,19 +28,19 @@ import org.apache.linkis.engineconnplugin.flink.config.FlinkEnvConfiguration._ import org.apache.linkis.engineconnplugin.flink.context.FlinkEngineConnContext import org.apache.linkis.engineconnplugin.flink.operator.StatusOperator import org.apache.linkis.engineconnplugin.flink.util.YarnUtil -import org.apache.linkis.governance.common.conf.GovernanceCommonConf -import org.apache.linkis.governance.common.constant.ec.ECConstants import org.apache.linkis.manager.common.entity.enumeration.NodeStatus + import org.apache.commons.lang3.StringUtils import org.apache.flink.yarn.configuration.{YarnConfigOptions, YarnConfigOptionsInternal} -import org.apache.linkis.common.exception.ErrorException -import org.yaml.snakeyaml.Yaml import java.io.{File, FileNotFoundException} +import java.nio.file.{Files, Paths} import java.util +import java.util.Properties import java.util.concurrent.{Future, TimeUnit} + +import scala.collection.JavaConverters.dictionaryAsScalaMapConverter import scala.concurrent.duration.Duration -import scala.io.Source class FlinkJarOnceExecutor( override val id: Long, @@ -59,17 +60,17 @@ class FlinkJarOnceExecutor( if (StringUtils.isNotEmpty(args)) args.split(" ") else Array.empty[String] val mainClass = FLINK_APPLICATION_MAIN_CLASS.getValue(options) logger.info(s"Ready to submit flink application, mainClass: $mainClass, args: $args.") - val internalYarnLogConfigFile = flinkEngineConnContext.getEnvironmentContext.getFlinkConfig.getValue(YarnConfigOptions.SHIP_FILES) - val paths: Array[String] = internalYarnLogConfigFile.split(";") + val internalYarnLogConfigFile = flinkEngineConnContext.getEnvironmentContext.getFlinkConfig + .getValue(YarnConfigOptions.SHIP_FILES) + logger.info(internalYarnLogConfigFile) + val paths = internalYarnLogConfigFile.stripPrefix("[").stripSuffix("]").split(",").toList val firstLog4jPath: Option[String] = paths.find(path => path.contains("log4j.properties")) firstLog4jPath match { case Some(log4jPath) => if (new File(log4jPath).exists()) { - val source = Source.fromFile(internalYarnLogConfigFile) try { - val yamlContent = source.mkString - val yaml = new Yaml() - val configMap = yaml.loadAs(yamlContent, classOf[util.LinkedHashMap[String, Object]]) + val configMap = new Properties() + configMap.load(Files.newBufferedReader(Paths.get(log4jPath))) var appenderName = "" var appenderType = "" if (configMap.containsKey("appender.stream.name")) { @@ -78,16 +79,21 @@ class FlinkJarOnceExecutor( if (configMap.containsKey("appender.eventmeshAppender.type")) { appenderType = configMap.get("appender.eventmeshAppender.type").toString } - if (appenderName.equals("StreamRpcLog") && appenderType.equals("EventMeshLog4j2Appender")) { + if ( + appenderName + .equals("StreamRpcLog") && appenderType.equals("EventMeshLog4j2Appender") + ) { clusterDescriptor.deployCluster(programArguments, mainClass) } else { throw new ErrorException(30000, s"log4j.properties 不符合规范,请检测内容") } - } finally { - source.close() + } catch { + case e: Exception => + logger.error("读取或解析文件时出现错误: " + e.getMessage) } } else { - throw new FileNotFoundException("log4j.properties file not found in both file system .") + logger.info(log4jPath) + throw new FileNotFoundException("log4j.properties file not found in both file system") } case None => throw new FileNotFoundException("log4j.properties path not found .")