From 426fbceb49d44a102e552165fd17d4807e5242d9 Mon Sep 17 00:00:00 2001 From: v_nikeyang <1013195908@qq.com> Date: Fri, 18 Aug 2023 17:21:57 +0800 Subject: [PATCH 01/21] flink load default configuration --- .../flink/config/FlinkEnvConfiguration.scala | 6 ++++++ .../flink/launch/FlinkEngineConnLaunchBuilder.scala | 7 +++++-- 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/config/FlinkEnvConfiguration.scala b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/config/FlinkEnvConfiguration.scala index 857faeed8d..d01db5a414 100644 --- a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/config/FlinkEnvConfiguration.scala +++ b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/config/FlinkEnvConfiguration.scala @@ -137,4 +137,10 @@ object FlinkEnvConfiguration { val FLINK_HANDSHAKE_WAIT_TIME_MILLS = CommonVars("linkis.flink.handshake.wait.time.mills", 60 * 1000) + val FLINK_ENGINE_CONN_DEFAULT_JAVA_OPTS = CommonVars[String]( + "wds.linkis.engineConn.javaOpts.default", + s"-Xloggc:%s -XX:+PrintGCApplicationStoppedTime -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=20 -XX:GCLogFileSize=512M -XX:+PrintPromotionFailure -XX:+PrintGCCause" + ) + + } diff --git a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/launch/FlinkEngineConnLaunchBuilder.scala b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/launch/FlinkEngineConnLaunchBuilder.scala index 70b3ad1b20..9b51ebfa1c 100644 --- a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/launch/FlinkEngineConnLaunchBuilder.scala +++ b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/launch/FlinkEngineConnLaunchBuilder.scala @@ -130,8 +130,11 @@ class FlinkEngineConnLaunchBuilder extends JavaProcessEngineConnLaunchBuilder { Array(FLINK_HOME_ENV, FLINK_CONF_DIR_ENV) ++: super.getNecessaryEnvironment override protected def getExtractJavaOpts: String = { - if (!HadoopConf.KEYTAB_PROXYUSER_ENABLED.getValue) super.getExtractJavaOpts - else super.getExtractJavaOpts + s" -DHADOOP_PROXY_USER=${variable(USER)}".trim + val javaOpts = super.getExtractJavaOpts + val defaultJavaOpts = FLINK_ENGINE_CONN_DEFAULT_JAVA_OPTS.getValue + val mergedString = (defaultJavaOpts.format(getGcLogDir(engineConnBuildRequest)).split("\\s+") ++ javaOpts.split("\\s+")).distinct.mkString(" ") + if (!HadoopConf.KEYTAB_PROXYUSER_ENABLED.getValue) mergedString + else mergedString + s" -DHADOOP_PROXY_USER=${variable(USER)}".trim } override protected def ifAddHiveConfigPath: Boolean = true From fb017b9c332cf22ab7b697bc622e1e885b076774 Mon Sep 17 00:00:00 2001 From: v_nikeyang <1013195908@qq.com> Date: Tue, 22 Aug 2023 14:51:26 +0800 Subject: [PATCH 02/21] flink load default configuration --- .../flink/config/FlinkEnvConfiguration.scala | 2 +- .../launch/FlinkEngineConnLaunchBuilder.scala | 24 ++++++------------- 2 files changed, 8 insertions(+), 18 deletions(-) diff --git a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/config/FlinkEnvConfiguration.scala b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/config/FlinkEnvConfiguration.scala index d01db5a414..c979ed0be4 100644 --- 
a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/config/FlinkEnvConfiguration.scala +++ b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/config/FlinkEnvConfiguration.scala @@ -139,7 +139,7 @@ object FlinkEnvConfiguration { val FLINK_ENGINE_CONN_DEFAULT_JAVA_OPTS = CommonVars[String]( "wds.linkis.engineConn.javaOpts.default", - s"-Xloggc:%s -XX:+PrintGCApplicationStoppedTime -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=20 -XX:GCLogFileSize=512M -XX:+PrintPromotionFailure -XX:+PrintGCCause" + s"-Xloggc:\\/gc -XX:+PrintGCApplicationStoppedTime -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=20 -XX:GCLogFileSize=512M -XX:+PrintPromotionFailure -XX:+PrintGCCause" ) diff --git a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/launch/FlinkEngineConnLaunchBuilder.scala b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/launch/FlinkEngineConnLaunchBuilder.scala index 9b51ebfa1c..aa4979cf0f 100644 --- a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/launch/FlinkEngineConnLaunchBuilder.scala +++ b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/launch/FlinkEngineConnLaunchBuilder.scala @@ -24,24 +24,14 @@ import org.apache.linkis.hadoop.common.conf.HadoopConf import org.apache.linkis.manager.common.protocol.bml.BmlResource import org.apache.linkis.manager.engineplugin.common.conf.EnvConfiguration import org.apache.linkis.manager.engineplugin.common.launch.entity.EngineConnBuildRequest -import org.apache.linkis.manager.engineplugin.common.launch.process.{ - Environment, - JavaProcessEngineConnLaunchBuilder -} -import org.apache.linkis.manager.engineplugin.common.launch.process.Environment.{ - variable, - PWD, - USER -} -import org.apache.linkis.manager.engineplugin.common.launch.process.LaunchConstants.{ - addPathToClassPath, - CLASS_PATH_SEPARATOR -} +import org.apache.linkis.manager.engineplugin.common.launch.process.{Environment, JavaProcessEngineConnLaunchBuilder} +import org.apache.linkis.manager.engineplugin.common.launch.process.Environment.{PWD, USER, variable} +import org.apache.linkis.manager.engineplugin.common.launch.process.LaunchConstants.{CLASS_PATH_SEPARATOR, addPathToClassPath} import org.apache.linkis.manager.label.entity.engine.UserCreatorLabel import java.util - import scala.collection.JavaConverters._ +import scala.collection.mutable.ArrayBuffer class FlinkEngineConnLaunchBuilder extends JavaProcessEngineConnLaunchBuilder { @@ -132,9 +122,9 @@ class FlinkEngineConnLaunchBuilder extends JavaProcessEngineConnLaunchBuilder { override protected def getExtractJavaOpts: String = { val javaOpts = super.getExtractJavaOpts val defaultJavaOpts = FLINK_ENGINE_CONN_DEFAULT_JAVA_OPTS.getValue - val mergedString = (defaultJavaOpts.format(getGcLogDir(engineConnBuildRequest)).split("\\s+") ++ javaOpts.split("\\s+")).distinct.mkString(" ") - if (!HadoopConf.KEYTAB_PROXYUSER_ENABLED.getValue) mergedString - else mergedString + s" -DHADOOP_PROXY_USER=${variable(USER)}".trim + val mergedJavaOpts = (defaultJavaOpts.split("[ =]+") ++ javaOpts.split("[ =]+")).distinct.mkString(" ") + if (!HadoopConf.KEYTAB_PROXYUSER_ENABLED.getValue) mergedJavaOpts + else mergedJavaOpts + s" -DHADOOP_PROXY_USER=${variable(USER)}".trim } override 
protected def ifAddHiveConfigPath: Boolean = true From 6e492ca0185830df23a906e7e18391d8f4bf4095 Mon Sep 17 00:00:00 2001 From: v_nikeyang <1013195908@qq.com> Date: Tue, 22 Aug 2023 14:57:51 +0800 Subject: [PATCH 03/21] flink load default configuration --- .../engineconnplugin/flink/config/FlinkEnvConfiguration.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/config/FlinkEnvConfiguration.scala b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/config/FlinkEnvConfiguration.scala index c979ed0be4..ab1a16df61 100644 --- a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/config/FlinkEnvConfiguration.scala +++ b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/config/FlinkEnvConfiguration.scala @@ -139,7 +139,7 @@ object FlinkEnvConfiguration { val FLINK_ENGINE_CONN_DEFAULT_JAVA_OPTS = CommonVars[String]( "wds.linkis.engineConn.javaOpts.default", - s"-Xloggc:\\/gc -XX:+PrintGCApplicationStoppedTime -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=20 -XX:GCLogFileSize=512M -XX:+PrintPromotionFailure -XX:+PrintGCCause" + s"-Xloggc:\\/gc.log -XX:+PrintGCApplicationStoppedTime -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=20 -XX:GCLogFileSize=512M -XX:+PrintPromotionFailure -XX:+PrintGCCause" ) From 1f52e7e59370050dccd18c0d13aa4a56d464a2c8 Mon Sep 17 00:00:00 2001 From: v_nikeyang <1013195908@qq.com> Date: Tue, 29 Aug 2023 15:01:00 +0800 Subject: [PATCH 04/21] flink load default configuration --- .../flink/config/FlinkEnvConfiguration.scala | 5 ----- .../launch/FlinkEngineConnLaunchBuilder.scala | 15 ++++++++++----- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/config/FlinkEnvConfiguration.scala b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/config/FlinkEnvConfiguration.scala index ab1a16df61..77db14ab99 100644 --- a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/config/FlinkEnvConfiguration.scala +++ b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/config/FlinkEnvConfiguration.scala @@ -137,10 +137,5 @@ object FlinkEnvConfiguration { val FLINK_HANDSHAKE_WAIT_TIME_MILLS = CommonVars("linkis.flink.handshake.wait.time.mills", 60 * 1000) - val FLINK_ENGINE_CONN_DEFAULT_JAVA_OPTS = CommonVars[String]( - "wds.linkis.engineConn.javaOpts.default", - s"-Xloggc:\\/gc.log -XX:+PrintGCApplicationStoppedTime -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=20 -XX:GCLogFileSize=512M -XX:+PrintPromotionFailure -XX:+PrintGCCause" - ) - } diff --git a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/launch/FlinkEngineConnLaunchBuilder.scala b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/launch/FlinkEngineConnLaunchBuilder.scala index aa4979cf0f..39806c0df1 100644 --- a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/launch/FlinkEngineConnLaunchBuilder.scala +++ 
b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/launch/FlinkEngineConnLaunchBuilder.scala @@ -28,7 +28,9 @@ import org.apache.linkis.manager.engineplugin.common.launch.process.{Environment import org.apache.linkis.manager.engineplugin.common.launch.process.Environment.{PWD, USER, variable} import org.apache.linkis.manager.engineplugin.common.launch.process.LaunchConstants.{CLASS_PATH_SEPARATOR, addPathToClassPath} import org.apache.linkis.manager.label.entity.engine.UserCreatorLabel +import org.yaml.snakeyaml.Yaml +import java.io.{File, FileInputStream} import java.util import scala.collection.JavaConverters._ import scala.collection.mutable.ArrayBuffer @@ -120,11 +122,14 @@ class FlinkEngineConnLaunchBuilder extends JavaProcessEngineConnLaunchBuilder { Array(FLINK_HOME_ENV, FLINK_CONF_DIR_ENV) ++: super.getNecessaryEnvironment override protected def getExtractJavaOpts: String = { - val javaOpts = super.getExtractJavaOpts - val defaultJavaOpts = FLINK_ENGINE_CONN_DEFAULT_JAVA_OPTS.getValue - val mergedJavaOpts = (defaultJavaOpts.split("[ =]+") ++ javaOpts.split("[ =]+")).distinct.mkString(" ") - if (!HadoopConf.KEYTAB_PROXYUSER_ENABLED.getValue) mergedJavaOpts - else mergedJavaOpts + s" -DHADOOP_PROXY_USER=${variable(USER)}".trim + var javaOpts = super.getExtractJavaOpts + val configFile = new File(FLINK_CONF_DIR.getValue + "flink-config.yaml") + val yaml = new Yaml() + val config = yaml.load(new FileInputStream(configFile)) + val defaultJavaOpts = config.asInstanceOf[java.util.LinkedHashMap[String, Any]].get("env.java.opts") + if (defaultJavaOpts.toString.nonEmpty) javaOpts = (defaultJavaOpts.toString.split("[ =]+") ++ javaOpts.split("[ =]+")).distinct.mkString(" ") + if (!HadoopConf.KEYTAB_PROXYUSER_ENABLED.getValue) javaOpts + else javaOpts + s" -DHADOOP_PROXY_USER=${variable(USER)}".trim } override protected def ifAddHiveConfigPath: Boolean = true From f7b4239f8dbca246255a508a98389cffeca879f8 Mon Sep 17 00:00:00 2001 From: v_nikeyang <1013195908@qq.com> Date: Tue, 29 Aug 2023 21:50:26 +0800 Subject: [PATCH 05/21] flink load default configuration --- .../launch/FlinkEngineConnLaunchBuilder.scala | 42 +++++++++++++++---- 1 file changed, 33 insertions(+), 9 deletions(-) diff --git a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/launch/FlinkEngineConnLaunchBuilder.scala b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/launch/FlinkEngineConnLaunchBuilder.scala index 39806c0df1..83907865ef 100644 --- a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/launch/FlinkEngineConnLaunchBuilder.scala +++ b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/launch/FlinkEngineConnLaunchBuilder.scala @@ -24,17 +24,29 @@ import org.apache.linkis.hadoop.common.conf.HadoopConf import org.apache.linkis.manager.common.protocol.bml.BmlResource import org.apache.linkis.manager.engineplugin.common.conf.EnvConfiguration import org.apache.linkis.manager.engineplugin.common.launch.entity.EngineConnBuildRequest -import org.apache.linkis.manager.engineplugin.common.launch.process.{Environment, JavaProcessEngineConnLaunchBuilder} -import org.apache.linkis.manager.engineplugin.common.launch.process.Environment.{PWD, USER, variable} -import org.apache.linkis.manager.engineplugin.common.launch.process.LaunchConstants.{CLASS_PATH_SEPARATOR, addPathToClassPath} +import 
org.apache.linkis.manager.engineplugin.common.launch.process.{ + Environment, + JavaProcessEngineConnLaunchBuilder +} +import org.apache.linkis.manager.engineplugin.common.launch.process.Environment.{ + variable, + PWD, + USER +} +import org.apache.linkis.manager.engineplugin.common.launch.process.LaunchConstants.{ + addPathToClassPath, + CLASS_PATH_SEPARATOR +} import org.apache.linkis.manager.label.entity.engine.UserCreatorLabel -import org.yaml.snakeyaml.Yaml import java.io.{File, FileInputStream} import java.util + import scala.collection.JavaConverters._ import scala.collection.mutable.ArrayBuffer +import org.yaml.snakeyaml.Yaml + class FlinkEngineConnLaunchBuilder extends JavaProcessEngineConnLaunchBuilder { override protected def getCommands(implicit @@ -122,12 +134,24 @@ class FlinkEngineConnLaunchBuilder extends JavaProcessEngineConnLaunchBuilder { Array(FLINK_HOME_ENV, FLINK_CONF_DIR_ENV) ++: super.getNecessaryEnvironment override protected def getExtractJavaOpts: String = { + logger.info("FlinkEngineConnLaunchBuilder.getExtractJavaOpts+++++++++++++++++") var javaOpts = super.getExtractJavaOpts - val configFile = new File(FLINK_CONF_DIR.getValue + "flink-config.yaml") - val yaml = new Yaml() - val config = yaml.load(new FileInputStream(configFile)) - val defaultJavaOpts = config.asInstanceOf[java.util.LinkedHashMap[String, Any]].get("env.java.opts") - if (defaultJavaOpts.toString.nonEmpty) javaOpts = (defaultJavaOpts.toString.split("[ =]+") ++ javaOpts.split("[ =]+")).distinct.mkString(" ") + var defaultJavaOpts = "" + val configFile = new File(FLINK_CONF_DIR.getValue + "/flink-conf.yaml") + if (configFile.exists()) { + val yaml = new Yaml() + val inputStream = new FileInputStream(configFile) + val config = yaml.load(inputStream) + logger.info(config) + if (config != null) { + val configMap = config.asInstanceOf[util.Map[String, Any]] + if (configMap.containsKey("env.java.opts")) { + defaultJavaOpts = configMap.get("env.java.opts").toString + } + javaOpts = + (defaultJavaOpts.split("[ =]+") ++ javaOpts.split("[ =]+")).distinct.mkString(" ") + } + } if (!HadoopConf.KEYTAB_PROXYUSER_ENABLED.getValue) javaOpts else javaOpts + s" -DHADOOP_PROXY_USER=${variable(USER)}".trim } From 34cea9202872dac33d4ba1b5542f78c42e2db77e Mon Sep 17 00:00:00 2001 From: v_nikeyang <1013195908@qq.com> Date: Wed, 30 Aug 2023 18:20:38 +0800 Subject: [PATCH 06/21] flink load default configuration --- .../flink/config/FlinkEnvConfiguration.scala | 4 ++ .../launch/FlinkEngineConnLaunchBuilder.scala | 47 ++++++++++++------- 2 files changed, 34 insertions(+), 17 deletions(-) diff --git a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/config/FlinkEnvConfiguration.scala b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/config/FlinkEnvConfiguration.scala index 77db14ab99..b3742384a1 100644 --- a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/config/FlinkEnvConfiguration.scala +++ b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/config/FlinkEnvConfiguration.scala @@ -137,5 +137,9 @@ object FlinkEnvConfiguration { val FLINK_HANDSHAKE_WAIT_TIME_MILLS = CommonVars("linkis.flink.handshake.wait.time.mills", 60 * 1000) + val FLINK_ENGINE_CONN_DEFAULT_JAVA_OPTS = CommonVars[String]( + "wds.linkis.engineConn.javaOpts.default", + s"-Xloggc:\\/gc.log -XX:+PrintGCApplicationStoppedTime -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps 
-XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=20 -XX:GCLogFileSize=512M -XX:+PrintPromotionFailure -XX:+PrintGCCause" + ) } diff --git a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/launch/FlinkEngineConnLaunchBuilder.scala b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/launch/FlinkEngineConnLaunchBuilder.scala index 83907865ef..b15b53ed68 100644 --- a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/launch/FlinkEngineConnLaunchBuilder.scala +++ b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/launch/FlinkEngineConnLaunchBuilder.scala @@ -134,26 +134,39 @@ class FlinkEngineConnLaunchBuilder extends JavaProcessEngineConnLaunchBuilder { Array(FLINK_HOME_ENV, FLINK_CONF_DIR_ENV) ++: super.getNecessaryEnvironment override protected def getExtractJavaOpts: String = { - logger.info("FlinkEngineConnLaunchBuilder.getExtractJavaOpts+++++++++++++++++") - var javaOpts = super.getExtractJavaOpts - var defaultJavaOpts = "" - val configFile = new File(FLINK_CONF_DIR.getValue + "/flink-conf.yaml") - if (configFile.exists()) { - val yaml = new Yaml() - val inputStream = new FileInputStream(configFile) - val config = yaml.load(inputStream) - logger.info(config) - if (config != null) { - val configMap = config.asInstanceOf[util.Map[String, Any]] - if (configMap.containsKey("env.java.opts")) { - defaultJavaOpts = configMap.get("env.java.opts").toString + // logger.info("FlinkEngineConnLaunchBuilder.getExtractJavaOpts+++++++++++++++++") + // var javaOpts = super.getExtractJavaOpts + // var defaultJavaOpts = "" + // val configFile = new File(FLINK_CONF_DIR.getValue + "/flink-conf.yaml") + // if (configFile.exists()) { + // val yaml = new Yaml() + // val inputStream = new FileInputStream(configFile) + // val config = yaml.load(inputStream) + // logger.info(config) + // if (config != null) { + // val configMap = config.asInstanceOf[util.Map[String, Any]] + // if (configMap.containsKey("env.java.opts")) { + // defaultJavaOpts = configMap.get("env.java.opts").toString + // } + // javaOpts = + // (defaultJavaOpts.split("[ =]+") ++ javaOpts.split("[ =]+")).distinct.mkString(" ") + // } + // } + val commandLine: ArrayBuffer[String] = ArrayBuffer[String]() + var javaOpts = super.getExtractJavaOpts.split("\\s+").toSet + var defaultJavaOpts = FLINK_ENGINE_CONN_DEFAULT_JAVA_OPTS.getValue.split("\\s+").toSet + javaOpts.toStream.foreach(commandLine += _) + defaultJavaOpts.toStream.foreach(commandLine += _) + for (x <- javaOpts) { + for (y <- defaultJavaOpts) { + if (x.substring(0, 7).equals(y.substring(0, 7))) { + commandLine -= x } - javaOpts = - (defaultJavaOpts.split("[ =]+") ++ javaOpts.split("[ =]+")).distinct.mkString(" ") } } - if (!HadoopConf.KEYTAB_PROXYUSER_ENABLED.getValue) javaOpts - else javaOpts + s" -DHADOOP_PROXY_USER=${variable(USER)}".trim + val newJavaOpts = commandLine.mkString(" ") + if (!HadoopConf.KEYTAB_PROXYUSER_ENABLED.getValue) newJavaOpts + else newJavaOpts + s" -DHADOOP_PROXY_USER=${variable(USER)}".trim } override protected def ifAddHiveConfigPath: Boolean = true From b576adbc1b0f41005bdd29f7bba189d012567a86 Mon Sep 17 00:00:00 2001 From: v_nikeyang <1013195908@qq.com> Date: Wed, 30 Aug 2023 21:56:28 +0800 Subject: [PATCH 07/21] flink load default configuration --- .../flink/config/FlinkEnvConfiguration.scala | 6 +-- .../factory/FlinkEngineConnFactory.scala | 54 ++++++++++++++++++- .../launch/FlinkEngineConnLaunchBuilder.scala 
| 35 +----------- 3 files changed, 57 insertions(+), 38 deletions(-) diff --git a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/config/FlinkEnvConfiguration.scala b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/config/FlinkEnvConfiguration.scala index b3742384a1..9f5d847ca9 100644 --- a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/config/FlinkEnvConfiguration.scala +++ b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/config/FlinkEnvConfiguration.scala @@ -60,7 +60,7 @@ object FlinkEnvConfiguration { "/appcom/Install/flink/lib", "The local lib path of each user in Flink EngineConn." ) - + val FLINK_YAML_MERGE_ENABLE = CommonVars("flink.yaml.merge.enable", true) val FLINK_SHIP_DIRECTORIES = CommonVars("flink.yarn.ship-directories", "") val FLINK_SHIP_REMOTE_DIRECTORIES = CommonVars("flink.yarn.remote.ship-directories", "") @@ -137,9 +137,9 @@ object FlinkEnvConfiguration { val FLINK_HANDSHAKE_WAIT_TIME_MILLS = CommonVars("linkis.flink.handshake.wait.time.mills", 60 * 1000) - val FLINK_ENGINE_CONN_DEFAULT_JAVA_OPTS = CommonVars[String]( + val FLINK_ENGINE_CONN_DEFAULT_JAVA_OPTS = CommonVars( "wds.linkis.engineConn.javaOpts.default", - s"-Xloggc:\\/gc.log -XX:+PrintGCApplicationStoppedTime -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=20 -XX:GCLogFileSize=512M -XX:+PrintPromotionFailure -XX:+PrintGCCause" + "-Xloggc:\\/gc.log -XX:+PrintGCApplicationStoppedTime -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=20 -XX:GCLogFileSize=512M -XX:+PrintPromotionFailure -XX:+PrintGCCause" ) } diff --git a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/factory/FlinkEngineConnFactory.scala b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/factory/FlinkEngineConnFactory.scala index 35961a652f..db19e70c55 100644 --- a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/factory/FlinkEngineConnFactory.scala +++ b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/factory/FlinkEngineConnFactory.scala @@ -51,7 +51,7 @@ import org.apache.flink.streaming.api.CheckpointingMode import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup import org.apache.flink.yarn.configuration.{YarnConfigOptions, YarnDeploymentTarget} -import java.io.File +import java.io.{File, FileInputStream} import java.net.URL import java.text.MessageFormat import java.time.Duration @@ -59,8 +59,10 @@ import java.util import java.util.{Collections, Locale} import scala.collection.JavaConverters._ +import scala.collection.mutable.ArrayBuffer import com.google.common.collect.{Lists, Sets} +import org.yaml.snakeyaml.Yaml class FlinkEngineConnFactory extends MultiExecutorEngineConnFactory with Logging { @@ -174,7 +176,12 @@ class FlinkEngineConnFactory extends MultiExecutorEngineConnFactory with Logging flinkConfig.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, numberOfTaskSlots) // set extra configs options.asScala.filter { case (key, _) => key.startsWith(FLINK_CONFIG_PREFIX) }.foreach { - case (key, value) => flinkConfig.setString(key.substring(FLINK_CONFIG_PREFIX.length), value) + case (key, value) => + var flinkConfigValue = value + if 
(FlinkEnvConfiguration.FLINK_YAML_MERGE_ENABLE.getValue && key.equals(FLINK_CONFIG_PREFIX + "env.java.opts")) { + flinkConfigValue = getExtractJavaOpts(value) + } + flinkConfig.setString(key.substring(FLINK_CONFIG_PREFIX.length), flinkConfigValue) } // set kerberos config if (FLINK_KERBEROS_ENABLE.getValue(options)) { @@ -232,6 +239,49 @@ class FlinkEngineConnFactory extends MultiExecutorEngineConnFactory with Logging context } + protected def getExtractJavaOpts(envJavaOpts: String): String = { + val commandLine: ArrayBuffer[String] = ArrayBuffer[String]() + val javaOpts = envJavaOpts.split("\\s+") + val defaultJavaOpts = FlinkEnvConfiguration.FLINK_ENGINE_CONN_DEFAULT_JAVA_OPTS.getValue.split("\\s+") + javaOpts.toStream.foreach(commandLine += _) + defaultJavaOpts.toStream.foreach(commandLine += _) +// val configFile = new File("/appcom/config/flink-config/flink-conf.yaml") +// if (configFile.exists()) { +// val yaml = new Yaml() +// val inputStream = new FileInputStream(configFile) +// val configMap = yaml.loadAs(inputStream, classOf[util.Map[String, Any]]) +// if (configMap != null) { +// if (configMap.containsKey("env.java.opts")) { +// val defaultJavaOpts = configMap.get("env.java.opts").toString.split("\\s+") +// defaultJavaOpts.toStream.foreach(commandLine += _) +// } +// } +// inputStream.close() +// } +// if (configFile.exists()) { +// val yaml = new Yaml() +// val inputStream = new FileInputStream(configFile) +// val configMap = yaml.loadAs(inputStream, classOf[util.Map[String, Any]]) +// if (configMap != null) { +// if (configMap.containsKey("env.java.opts")) { +// val defaultJavaOpts = configMap.get("env.java.opts").toString.split("\\s+") +// defaultJavaOpts.toStream.foreach(commandLine += _) +// if (javaOpts.nonEmpty && defaultJavaOpts.nonEmpty) { +// for (x <- defaultJavaOpts) { +// for (y <- javaOpts) { +// if (x.substring(0, 7).equals(y.substring(0, 7))) { +// commandLine -= x +// } +// } +// } +// } +// } +// } +// inputStream.close() +// } + commandLine.mkString(" ") + } + protected def isOnceEngineConn(labels: util.List[Label[_]]): Boolean = { val engineConnModeLabel = getEngineConnModeLabel(labels) engineConnModeLabel != null && (EngineConnMode.toEngineConnMode( diff --git a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/launch/FlinkEngineConnLaunchBuilder.scala b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/launch/FlinkEngineConnLaunchBuilder.scala index b15b53ed68..9f686b929d 100644 --- a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/launch/FlinkEngineConnLaunchBuilder.scala +++ b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/launch/FlinkEngineConnLaunchBuilder.scala @@ -134,39 +134,8 @@ class FlinkEngineConnLaunchBuilder extends JavaProcessEngineConnLaunchBuilder { Array(FLINK_HOME_ENV, FLINK_CONF_DIR_ENV) ++: super.getNecessaryEnvironment override protected def getExtractJavaOpts: String = { - // logger.info("FlinkEngineConnLaunchBuilder.getExtractJavaOpts+++++++++++++++++") - // var javaOpts = super.getExtractJavaOpts - // var defaultJavaOpts = "" - // val configFile = new File(FLINK_CONF_DIR.getValue + "/flink-conf.yaml") - // if (configFile.exists()) { - // val yaml = new Yaml() - // val inputStream = new FileInputStream(configFile) - // val config = yaml.load(inputStream) - // logger.info(config) - // if (config != null) { - // val configMap = config.asInstanceOf[util.Map[String, Any]] - // if 
(configMap.containsKey("env.java.opts")) { - // defaultJavaOpts = configMap.get("env.java.opts").toString - // } - // javaOpts = - // (defaultJavaOpts.split("[ =]+") ++ javaOpts.split("[ =]+")).distinct.mkString(" ") - // } - // } - val commandLine: ArrayBuffer[String] = ArrayBuffer[String]() - var javaOpts = super.getExtractJavaOpts.split("\\s+").toSet - var defaultJavaOpts = FLINK_ENGINE_CONN_DEFAULT_JAVA_OPTS.getValue.split("\\s+").toSet - javaOpts.toStream.foreach(commandLine += _) - defaultJavaOpts.toStream.foreach(commandLine += _) - for (x <- javaOpts) { - for (y <- defaultJavaOpts) { - if (x.substring(0, 7).equals(y.substring(0, 7))) { - commandLine -= x - } - } - } - val newJavaOpts = commandLine.mkString(" ") - if (!HadoopConf.KEYTAB_PROXYUSER_ENABLED.getValue) newJavaOpts - else newJavaOpts + s" -DHADOOP_PROXY_USER=${variable(USER)}".trim + if (!HadoopConf.KEYTAB_PROXYUSER_ENABLED.getValue) super.getExtractJavaOpts + else super.getExtractJavaOpts + s" -DHADOOP_PROXY_USER=${variable(USER)}".trim } override protected def ifAddHiveConfigPath: Boolean = true From 1559dd232f1eaea2eda1271b25892efaa9e723b3 Mon Sep 17 00:00:00 2001 From: v_nikeyang <1013195908@qq.com> Date: Thu, 31 Aug 2023 00:43:14 +0800 Subject: [PATCH 08/21] flink load default configuration --- .../factory/FlinkEngineConnFactory.scala | 87 +++++++++---------- 1 file changed, 39 insertions(+), 48 deletions(-) diff --git a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/factory/FlinkEngineConnFactory.scala b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/factory/FlinkEngineConnFactory.scala index db19e70c55..61b4ab8435 100644 --- a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/factory/FlinkEngineConnFactory.scala +++ b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/factory/FlinkEngineConnFactory.scala @@ -17,7 +17,6 @@ package org.apache.linkis.engineconnplugin.flink.factory -import org.apache.linkis.common.conf.CommonVars import org.apache.linkis.common.utils.{ClassUtils, Logging} import org.apache.linkis.engineconn.acessible.executor.conf.AccessibleExecutorConfiguration import org.apache.linkis.engineconn.common.creation.EngineCreationContext @@ -42,27 +41,22 @@ import org.apache.linkis.manager.engineplugin.common.creation.{ import org.apache.linkis.manager.label.entity.Label import org.apache.linkis.manager.label.entity.engine._ import org.apache.linkis.manager.label.entity.engine.EngineType.EngineType -import org.apache.linkis.protocol.utils.TaskUtils - import org.apache.commons.lang3.StringUtils import org.apache.flink.configuration._ import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings import org.apache.flink.streaming.api.CheckpointingMode import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup import org.apache.flink.yarn.configuration.{YarnConfigOptions, YarnDeploymentTarget} - -import java.io.{File, FileInputStream} +import java.io.{File} import java.net.URL import java.text.MessageFormat import java.time.Duration import java.util import java.util.{Collections, Locale} - import scala.collection.JavaConverters._ -import scala.collection.mutable.ArrayBuffer - import com.google.common.collect.{Lists, Sets} import org.yaml.snakeyaml.Yaml +import scala.io.Source class FlinkEngineConnFactory extends MultiExecutorEngineConnFactory with Logging { @@ -240,46 +234,43 @@ class FlinkEngineConnFactory 
extends MultiExecutorEngineConnFactory with Logging } protected def getExtractJavaOpts(envJavaOpts: String): String = { - val commandLine: ArrayBuffer[String] = ArrayBuffer[String]() - val javaOpts = envJavaOpts.split("\\s+") - val defaultJavaOpts = FlinkEnvConfiguration.FLINK_ENGINE_CONN_DEFAULT_JAVA_OPTS.getValue.split("\\s+") - javaOpts.toStream.foreach(commandLine += _) - defaultJavaOpts.toStream.foreach(commandLine += _) -// val configFile = new File("/appcom/config/flink-config/flink-conf.yaml") -// if (configFile.exists()) { -// val yaml = new Yaml() -// val inputStream = new FileInputStream(configFile) -// val configMap = yaml.loadAs(inputStream, classOf[util.Map[String, Any]]) -// if (configMap != null) { -// if (configMap.containsKey("env.java.opts")) { -// val defaultJavaOpts = configMap.get("env.java.opts").toString.split("\\s+") -// defaultJavaOpts.toStream.foreach(commandLine += _) -// } -// } -// inputStream.close() -// } -// if (configFile.exists()) { -// val yaml = new Yaml() -// val inputStream = new FileInputStream(configFile) -// val configMap = yaml.loadAs(inputStream, classOf[util.Map[String, Any]]) -// if (configMap != null) { -// if (configMap.containsKey("env.java.opts")) { -// val defaultJavaOpts = configMap.get("env.java.opts").toString.split("\\s+") -// defaultJavaOpts.toStream.foreach(commandLine += _) -// if (javaOpts.nonEmpty && defaultJavaOpts.nonEmpty) { -// for (x <- defaultJavaOpts) { -// for (y <- javaOpts) { -// if (x.substring(0, 7).equals(y.substring(0, 7))) { -// commandLine -= x -// } -// } -// } -// } -// } -// } -// inputStream.close() -// } - commandLine.mkString(" ") + var defaultJavaOpts = "" + val yamlFilePath = "/appcom/config/flink-config/flink-conf.yaml" + val source = Source.fromFile(yamlFilePath) + try { + val yamlContent = source.mkString + val yaml = new Yaml() + val configMap = yaml.loadAs(yamlContent, classOf[util.LinkedHashMap[String, Object]]) + if (configMap.containsKey("env.java.opts")) { + defaultJavaOpts = configMap.get("env.java.opts").toString + } + } finally { + source.close() + } + val merged = mergeAndDeduplicate(defaultJavaOpts, envJavaOpts) + merged + } + protected def mergeAndDeduplicate(str1: String, str2: String): String = { + // Extract values from str2 + val pattern = """-XX:([^\s]+)=([^\s]+)""".r + val keyValueMap = pattern.findAllMatchIn(str2).map { matchResult => + val key = matchResult.group(1) + val value = matchResult.group(2) + (key, value) + }.toMap + + val xloggcPattern = """-Xloggc:[^\s]+""".r + val xloggcValue = xloggcPattern.findFirstMatchIn(str2).getOrElse("").toString + val escapedXloggcValue = xloggcValue.replace("<", "\\<").replace(">", "\\>") + val mergedString = str1.replace("-Xloggc:%s", escapedXloggcValue) + + val finalMergedString = keyValueMap.foldLeft(mergedString) { (result, entry) => + val (key, value) = entry + val oldValue = s"$key=[^\\s]+" + val newValue = key + "=" + value + result.replaceAll(oldValue, newValue) + } + finalMergedString } protected def isOnceEngineConn(labels: util.List[Label[_]]): Boolean = { From 29cdbd8fd852772a01b788c7f09b6ecb04cfdfe0 Mon Sep 17 00:00:00 2001 From: v_nikeyang <1013195908@qq.com> Date: Thu, 31 Aug 2023 10:49:08 +0800 Subject: [PATCH 09/21] flink load default configuration --- .../factory/FlinkEngineConnFactory.scala | 28 +++++++++++-------- 1 file changed, 16 insertions(+), 12 deletions(-) diff --git a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/factory/FlinkEngineConnFactory.scala 
b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/factory/FlinkEngineConnFactory.scala index 61b4ab8435..56d9e3e751 100644 --- a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/factory/FlinkEngineConnFactory.scala +++ b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/factory/FlinkEngineConnFactory.scala @@ -34,10 +34,7 @@ import org.apache.linkis.engineconnplugin.flink.setting.Settings import org.apache.linkis.engineconnplugin.flink.util.{ClassUtil, ManagerUtil} import org.apache.linkis.governance.common.conf.GovernanceCommonConf import org.apache.linkis.manager.engineplugin.common.conf.EnvConfiguration -import org.apache.linkis.manager.engineplugin.common.creation.{ - ExecutorFactory, - MultiExecutorEngineConnFactory -} +import org.apache.linkis.manager.engineplugin.common.creation.{ExecutorFactory, MultiExecutorEngineConnFactory} import org.apache.linkis.manager.label.entity.Label import org.apache.linkis.manager.label.entity.engine._ import org.apache.linkis.manager.label.entity.engine.EngineType.EngineType @@ -47,7 +44,7 @@ import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings import org.apache.flink.streaming.api.CheckpointingMode import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup import org.apache.flink.yarn.configuration.{YarnConfigOptions, YarnDeploymentTarget} -import java.io.{File} +import java.io.File import java.net.URL import java.text.MessageFormat import java.time.Duration @@ -251,26 +248,33 @@ class FlinkEngineConnFactory extends MultiExecutorEngineConnFactory with Logging merged } protected def mergeAndDeduplicate(str1: String, str2: String): String = { - // Extract values from str2 val pattern = """-XX:([^\s]+)=([^\s]+)""".r val keyValueMap = pattern.findAllMatchIn(str2).map { matchResult => val key = matchResult.group(1) val value = matchResult.group(2) (key, value) }.toMap - val xloggcPattern = """-Xloggc:[^\s]+""".r - val xloggcValue = xloggcPattern.findFirstMatchIn(str2).getOrElse("").toString - val escapedXloggcValue = xloggcValue.replace("<", "\\<").replace(">", "\\>") - val mergedString = str1.replace("-Xloggc:%s", escapedXloggcValue) - + val xloggcValueStr1 = xloggcPattern.findFirstMatchIn(str1).getOrElse("").toString + val xloggcValueStr2 = xloggcPattern.findFirstMatchIn(str2).getOrElse("").toString + var escapedXloggcValue = "" + var mergedString = "" + if (xloggcValueStr1.nonEmpty && xloggcValueStr2.nonEmpty) { + escapedXloggcValue = xloggcValueStr2.replace("<", "\\<").replace(">", "\\>") + mergedString = str1.replace(xloggcValueStr1, escapedXloggcValue) + } + if (xloggcValueStr1.nonEmpty && xloggcValueStr2.isEmpty) { + escapedXloggcValue = xloggcValueStr1.replace("<", "\\<").replace(">", "\\>") + mergedString = str1.replace(xloggcValueStr1, escapedXloggcValue) + } val finalMergedString = keyValueMap.foldLeft(mergedString) { (result, entry) => val (key, value) = entry val oldValue = s"$key=[^\\s]+" val newValue = key + "=" + value result.replaceAll(oldValue, newValue) } - finalMergedString + val javaOpts = (finalMergedString.split("\\s+") ++ str2.split("\\s+")).distinct.mkString(" ") + javaOpts } protected def isOnceEngineConn(labels: util.List[Label[_]]): Boolean = { From 70d940ce52ffee8d1f2073355be2744a5971e1d7 Mon Sep 17 00:00:00 2001 From: v_nikeyang <1013195908@qq.com> Date: Thu, 31 Aug 2023 11:05:32 +0800 Subject: [PATCH 10/21] flink load default configuration --- 
.../factory/FlinkEngineConnFactory.scala | 22 +++++++++++++++---- 1 file changed, 18 insertions(+), 4 deletions(-) diff --git a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/factory/FlinkEngineConnFactory.scala b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/factory/FlinkEngineConnFactory.scala index 56d9e3e751..d6812577e6 100644 --- a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/factory/FlinkEngineConnFactory.scala +++ b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/factory/FlinkEngineConnFactory.scala @@ -248,8 +248,15 @@ class FlinkEngineConnFactory extends MultiExecutorEngineConnFactory with Logging merged } protected def mergeAndDeduplicate(str1: String, str2: String): String = { - val pattern = """-XX:([^\s]+)=([^\s]+)""".r - val keyValueMap = pattern.findAllMatchIn(str2).map { matchResult => + val patternX = """-XX:([^\s]+)=([^\s]+)""".r + val keyValueMapX = patternX.findAllMatchIn(str2).map { matchResult => + val key = matchResult.group(1) + val value = matchResult.group(2) + (key, value) + }.toMap + + val patternD = """-D([^\s]+)=([^\s]+)""".r + val keyValueMapD = patternD.findAllMatchIn(str2).map { matchResult => val key = matchResult.group(1) val value = matchResult.group(2) (key, value) @@ -267,13 +274,20 @@ class FlinkEngineConnFactory extends MultiExecutorEngineConnFactory with Logging escapedXloggcValue = xloggcValueStr1.replace("<", "\\<").replace(">", "\\>") mergedString = str1.replace(xloggcValueStr1, escapedXloggcValue) } - val finalMergedString = keyValueMap.foldLeft(mergedString) { (result, entry) => + val MergedStringX = keyValueMapX.foldLeft(mergedString) { (result, entry) => + val (key, value) = entry + val oldValue = s"$key=[^\\s]+" + val newValue = key + "=" + value + result.replaceAll(oldValue, newValue) + } + + val MergedStringD = keyValueMapD.foldLeft(MergedStringX) { (result, entry) => val (key, value) = entry val oldValue = s"$key=[^\\s]+" val newValue = key + "=" + value result.replaceAll(oldValue, newValue) } - val javaOpts = (finalMergedString.split("\\s+") ++ str2.split("\\s+")).distinct.mkString(" ") + val javaOpts = (MergedStringD.split("\\s+") ++ str2.split("\\s+")).distinct.mkString(" ") javaOpts } From f31faa779d109eb8fe7e961fcb377a19812489bb Mon Sep 17 00:00:00 2001 From: v_nikeyang <1013195908@qq.com> Date: Thu, 31 Aug 2023 11:23:49 +0800 Subject: [PATCH 11/21] flink load default configuration --- .../flink/config/FlinkEnvConfiguration.scala | 2 +- .../flink/factory/FlinkEngineConnFactory.scala | 9 +++++++-- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/config/FlinkEnvConfiguration.scala b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/config/FlinkEnvConfiguration.scala index 9f5d847ca9..78db5c6ce2 100644 --- a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/config/FlinkEnvConfiguration.scala +++ b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/config/FlinkEnvConfiguration.scala @@ -139,7 +139,7 @@ object FlinkEnvConfiguration { val FLINK_ENGINE_CONN_DEFAULT_JAVA_OPTS = CommonVars( "wds.linkis.engineConn.javaOpts.default", - "-Xloggc:\\/gc.log -XX:+PrintGCApplicationStoppedTime -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps 
-XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=20 -XX:GCLogFileSize=512M -XX:+PrintPromotionFailure -XX:+PrintGCCause" + "-Xloggc:/gc.log -XX:+PrintGCApplicationStoppedTime -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=20 -XX:GCLogFileSize=512M -XX:+PrintPromotionFailure -XX:+PrintGCCause" ) } diff --git a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/factory/FlinkEngineConnFactory.scala b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/factory/FlinkEngineConnFactory.scala index d6812577e6..b80a0afb55 100644 --- a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/factory/FlinkEngineConnFactory.scala +++ b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/factory/FlinkEngineConnFactory.scala @@ -169,7 +169,10 @@ class FlinkEngineConnFactory extends MultiExecutorEngineConnFactory with Logging options.asScala.filter { case (key, _) => key.startsWith(FLINK_CONFIG_PREFIX) }.foreach { case (key, value) => var flinkConfigValue = value - if (FlinkEnvConfiguration.FLINK_YAML_MERGE_ENABLE.getValue && key.equals(FLINK_CONFIG_PREFIX + "env.java.opts")) { + if ( + FlinkEnvConfiguration.FLINK_YAML_MERGE_ENABLE.getValue && key + .equals(FLINK_CONFIG_PREFIX + "env.java.opts") + ) { flinkConfigValue = getExtractJavaOpts(value) } flinkConfig.setString(key.substring(FLINK_CONFIG_PREFIX.length), flinkConfigValue) @@ -231,7 +234,8 @@ class FlinkEngineConnFactory extends MultiExecutorEngineConnFactory with Logging } protected def getExtractJavaOpts(envJavaOpts: String): String = { - var defaultJavaOpts = "" + var defaultJavaOpts = FlinkEnvConfiguration.FLINK_ENGINE_CONN_DEFAULT_JAVA_OPTS.getValue +// var defaultJavaOpts = "" val yamlFilePath = "/appcom/config/flink-config/flink-conf.yaml" val source = Source.fromFile(yamlFilePath) try { @@ -247,6 +251,7 @@ class FlinkEngineConnFactory extends MultiExecutorEngineConnFactory with Logging val merged = mergeAndDeduplicate(defaultJavaOpts, envJavaOpts) merged } + protected def mergeAndDeduplicate(str1: String, str2: String): String = { val patternX = """-XX:([^\s]+)=([^\s]+)""".r val keyValueMapX = patternX.findAllMatchIn(str2).map { matchResult => From c1dcfcb3e40fcd4e64e285805387eb669cd24159 Mon Sep 17 00:00:00 2001 From: v_nikeyang <1013195908@qq.com> Date: Thu, 31 Aug 2023 12:05:33 +0800 Subject: [PATCH 12/21] add exception --- .../flink/factory/FlinkEngineConnFactory.scala | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/factory/FlinkEngineConnFactory.scala b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/factory/FlinkEngineConnFactory.scala index b80a0afb55..8951b3bcb5 100644 --- a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/factory/FlinkEngineConnFactory.scala +++ b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/factory/FlinkEngineConnFactory.scala @@ -17,7 +17,7 @@ package org.apache.linkis.engineconnplugin.flink.factory -import org.apache.linkis.common.utils.{ClassUtils, Logging} +import org.apache.linkis.common.utils.{ClassUtils, Logging, Utils} import org.apache.linkis.engineconn.acessible.executor.conf.AccessibleExecutorConfiguration import 
org.apache.linkis.engineconn.common.creation.EngineCreationContext import org.apache.linkis.engineconn.launch.EngineConnServer @@ -44,7 +44,8 @@ import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings import org.apache.flink.streaming.api.CheckpointingMode import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup import org.apache.flink.yarn.configuration.{YarnConfigOptions, YarnDeploymentTarget} -import java.io.File + +import java.io.{File, FileNotFoundException} import java.net.URL import java.text.MessageFormat import java.time.Duration @@ -52,7 +53,10 @@ import java.util import java.util.{Collections, Locale} import scala.collection.JavaConverters._ import com.google.common.collect.{Lists, Sets} +import org.apache.flink.table.api.Expressions.e +import org.apache.flink.table.api.e import org.yaml.snakeyaml.Yaml + import scala.io.Source class FlinkEngineConnFactory extends MultiExecutorEngineConnFactory with Logging { @@ -234,8 +238,8 @@ class FlinkEngineConnFactory extends MultiExecutorEngineConnFactory with Logging } protected def getExtractJavaOpts(envJavaOpts: String): String = { - var defaultJavaOpts = FlinkEnvConfiguration.FLINK_ENGINE_CONN_DEFAULT_JAVA_OPTS.getValue -// var defaultJavaOpts = "" + // var defaultJavaOpts = FlinkEnvConfiguration.FLINK_ENGINE_CONN_DEFAULT_JAVA_OPTS.getValue + var defaultJavaOpts = "" val yamlFilePath = "/appcom/config/flink-config/flink-conf.yaml" val source = Source.fromFile(yamlFilePath) try { @@ -253,6 +257,7 @@ class FlinkEngineConnFactory extends MultiExecutorEngineConnFactory with Logging } protected def mergeAndDeduplicate(str1: String, str2: String): String = { + if (str1.isEmpty) throw new FileNotFoundException("env.java.opts is empty") val patternX = """-XX:([^\s]+)=([^\s]+)""".r val keyValueMapX = patternX.findAllMatchIn(str2).map { matchResult => val key = matchResult.group(1) From 91ed48d46eeca5d5ba58cc73df68b34bba9897dc Mon Sep 17 00:00:00 2001 From: v_nikeyang <1013195908@qq.com> Date: Thu, 31 Aug 2023 12:48:32 +0800 Subject: [PATCH 13/21] exception Change to logger warn --- .../engineconnplugin/flink/factory/FlinkEngineConnFactory.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/factory/FlinkEngineConnFactory.scala b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/factory/FlinkEngineConnFactory.scala index 8951b3bcb5..ac65dba901 100644 --- a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/factory/FlinkEngineConnFactory.scala +++ b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/factory/FlinkEngineConnFactory.scala @@ -257,7 +257,7 @@ class FlinkEngineConnFactory extends MultiExecutorEngineConnFactory with Logging } protected def mergeAndDeduplicate(str1: String, str2: String): String = { - if (str1.isEmpty) throw new FileNotFoundException("env.java.opts is empty") + if (str1.isEmpty) logger.warn("env.java.opts is empty") val patternX = """-XX:([^\s]+)=([^\s]+)""".r val keyValueMapX = patternX.findAllMatchIn(str2).map { matchResult => val key = matchResult.group(1) From bc71f914041b191203b6727006eb704b863f5994 Mon Sep 17 00:00:00 2001 From: v_nikeyang <1013195908@qq.com> Date: Thu, 31 Aug 2023 14:14:22 +0800 Subject: [PATCH 14/21] fix bug --- .../flink/factory/FlinkEngineConnFactory.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git 
a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/factory/FlinkEngineConnFactory.scala b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/factory/FlinkEngineConnFactory.scala index ac65dba901..2ceb0dc113 100644 --- a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/factory/FlinkEngineConnFactory.scala +++ b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/factory/FlinkEngineConnFactory.scala @@ -276,9 +276,11 @@ class FlinkEngineConnFactory extends MultiExecutorEngineConnFactory with Logging val xloggcValueStr2 = xloggcPattern.findFirstMatchIn(str2).getOrElse("").toString var escapedXloggcValue = "" var mergedString = "" + var replaceString = "" if (xloggcValueStr1.nonEmpty && xloggcValueStr2.nonEmpty) { escapedXloggcValue = xloggcValueStr2.replace("<", "\\<").replace(">", "\\>") mergedString = str1.replace(xloggcValueStr1, escapedXloggcValue) + replaceString = str2.replace(xloggcValueStr2, "") } if (xloggcValueStr1.nonEmpty && xloggcValueStr2.isEmpty) { escapedXloggcValue = xloggcValueStr1.replace("<", "\\<").replace(">", "\\>") @@ -297,7 +299,7 @@ class FlinkEngineConnFactory extends MultiExecutorEngineConnFactory with Logging val newValue = key + "=" + value result.replaceAll(oldValue, newValue) } - val javaOpts = (MergedStringD.split("\\s+") ++ str2.split("\\s+")).distinct.mkString(" ") + val javaOpts = (MergedStringD.split("\\s+") ++ replaceString.split("\\s+")).distinct.mkString(" ") javaOpts } From 10eea188ff830759c2241cf7d271ffcf77db0de7 Mon Sep 17 00:00:00 2001 From: v_nikeyang <1013195908@qq.com> Date: Mon, 4 Sep 2023 22:22:52 +0800 Subject: [PATCH 15/21] fix bug --- .../flink/factory/FlinkEngineConnFactory.scala | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/factory/FlinkEngineConnFactory.scala b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/factory/FlinkEngineConnFactory.scala index 2ceb0dc113..8982838476 100644 --- a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/factory/FlinkEngineConnFactory.scala +++ b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/factory/FlinkEngineConnFactory.scala @@ -286,6 +286,11 @@ class FlinkEngineConnFactory extends MultiExecutorEngineConnFactory with Logging escapedXloggcValue = xloggcValueStr1.replace("<", "\\<").replace(">", "\\>") mergedString = str1.replace(xloggcValueStr1, escapedXloggcValue) } + + if (xloggcValueStr1.isEmpty && xloggcValueStr2.isEmpty) { + mergedString = str1 + } + val MergedStringX = keyValueMapX.foldLeft(mergedString) { (result, entry) => val (key, value) = entry val oldValue = s"$key=[^\\s]+" From 7fac30d7617f601f764c0b445994a9136df83b55 Mon Sep 17 00:00:00 2001 From: v_nikeyang <1013195908@qq.com> Date: Tue, 5 Sep 2023 11:10:16 +0800 Subject: [PATCH 16/21] change variable name --- .../factory/FlinkEngineConnFactory.scala | 36 ++++++++++--------- 1 file changed, 20 insertions(+), 16 deletions(-) diff --git a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/factory/FlinkEngineConnFactory.scala b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/factory/FlinkEngineConnFactory.scala index 8982838476..d38f69f7a9 100644 --- 
a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/factory/FlinkEngineConnFactory.scala +++ b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/factory/FlinkEngineConnFactory.scala @@ -34,15 +34,21 @@ import org.apache.linkis.engineconnplugin.flink.setting.Settings import org.apache.linkis.engineconnplugin.flink.util.{ClassUtil, ManagerUtil} import org.apache.linkis.governance.common.conf.GovernanceCommonConf import org.apache.linkis.manager.engineplugin.common.conf.EnvConfiguration -import org.apache.linkis.manager.engineplugin.common.creation.{ExecutorFactory, MultiExecutorEngineConnFactory} +import org.apache.linkis.manager.engineplugin.common.creation.{ + ExecutorFactory, + MultiExecutorEngineConnFactory +} import org.apache.linkis.manager.label.entity.Label import org.apache.linkis.manager.label.entity.engine._ import org.apache.linkis.manager.label.entity.engine.EngineType.EngineType + import org.apache.commons.lang3.StringUtils import org.apache.flink.configuration._ import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings import org.apache.flink.streaming.api.CheckpointingMode import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup +import org.apache.flink.table.api.Expressions.e +import org.apache.flink.table.api.e import org.apache.flink.yarn.configuration.{YarnConfigOptions, YarnDeploymentTarget} import java.io.{File, FileNotFoundException} @@ -51,14 +57,13 @@ import java.text.MessageFormat import java.time.Duration import java.util import java.util.{Collections, Locale} + import scala.collection.JavaConverters._ +import scala.io.Source + import com.google.common.collect.{Lists, Sets} -import org.apache.flink.table.api.Expressions.e -import org.apache.flink.table.api.e import org.yaml.snakeyaml.Yaml -import scala.io.Source - class FlinkEngineConnFactory extends MultiExecutorEngineConnFactory with Logging { override protected def createEngineConnSession( @@ -257,7 +262,6 @@ class FlinkEngineConnFactory extends MultiExecutorEngineConnFactory with Logging } protected def mergeAndDeduplicate(str1: String, str2: String): String = { - if (str1.isEmpty) logger.warn("env.java.opts is empty") val patternX = """-XX:([^\s]+)=([^\s]+)""".r val keyValueMapX = patternX.findAllMatchIn(str2).map { matchResult => val key = matchResult.group(1) @@ -275,23 +279,23 @@ class FlinkEngineConnFactory extends MultiExecutorEngineConnFactory with Logging val xloggcValueStr1 = xloggcPattern.findFirstMatchIn(str1).getOrElse("").toString val xloggcValueStr2 = xloggcPattern.findFirstMatchIn(str2).getOrElse("").toString var escapedXloggcValue = "" - var mergedString = "" - var replaceString = "" + var replaceStr1 = "" + var replaceStr2 = "" if (xloggcValueStr1.nonEmpty && xloggcValueStr2.nonEmpty) { escapedXloggcValue = xloggcValueStr2.replace("<", "\\<").replace(">", "\\>") - mergedString = str1.replace(xloggcValueStr1, escapedXloggcValue) - replaceString = str2.replace(xloggcValueStr2, "") + replaceStr1 = str1.replace(xloggcValueStr1, escapedXloggcValue) + replaceStr2 = str2.replace(xloggcValueStr2, "") } if (xloggcValueStr1.nonEmpty && xloggcValueStr2.isEmpty) { escapedXloggcValue = xloggcValueStr1.replace("<", "\\<").replace(">", "\\>") - mergedString = str1.replace(xloggcValueStr1, escapedXloggcValue) + replaceStr1 = str1.replace(xloggcValueStr1, escapedXloggcValue) + replaceStr2 = str2 } - if (xloggcValueStr1.isEmpty && xloggcValueStr2.isEmpty) { - mergedString = str1 + 
replaceStr1 = str1 + replaceStr2 = str2 } - - val MergedStringX = keyValueMapX.foldLeft(mergedString) { (result, entry) => + val MergedStringX = keyValueMapX.foldLeft(replaceStr1) { (result, entry) => val (key, value) = entry val oldValue = s"$key=[^\\s]+" val newValue = key + "=" + value @@ -304,7 +308,7 @@ class FlinkEngineConnFactory extends MultiExecutorEngineConnFactory with Logging val newValue = key + "=" + value result.replaceAll(oldValue, newValue) } - val javaOpts = (MergedStringD.split("\\s+") ++ replaceString.split("\\s+")).distinct.mkString(" ") + val javaOpts = (MergedStringD.split("\\s+") ++ replaceStr2.split("\\s+")).distinct.mkString(" ") javaOpts } From 67756f49dd019b7be2d0ba494209ad0484db0dce Mon Sep 17 00:00:00 2001 From: v_nikeyang <1013195908@qq.com> Date: Thu, 14 Sep 2023 17:02:29 +0800 Subject: [PATCH 17/21] fix code review --- .../flink/config/FlinkEnvConfiguration.scala | 7 +- .../factory/FlinkEngineConnFactory.scala | 42 +++++++--- .../factory/TestFlinkEngineConnFactory.scala | 82 +++++++++++++++++++ 3 files changed, 116 insertions(+), 15 deletions(-) create mode 100644 linkis-engineconn-plugins/flink/src/main/test/scala/org.apache.linkis.engineconnplugin.flink/factory/TestFlinkEngineConnFactory.scala diff --git a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/config/FlinkEnvConfiguration.scala b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/config/FlinkEnvConfiguration.scala index 78db5c6ce2..aef2f7ffc8 100644 --- a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/config/FlinkEnvConfiguration.scala +++ b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/config/FlinkEnvConfiguration.scala @@ -44,6 +44,8 @@ object FlinkEnvConfiguration { FLINK_HOME.getValue + s"/lib/flink-dist_2.11-${FLINK_VERSION.getValue}.jar" ) + val FLINK_CONF_YAML = CommonVars("flink.conf.yaml.dir", "flink-conf.yaml") + val FLINK_PROVIDED_LIB_PATH = CommonVars("flink.lib.path", "") val FLINK_PROVIDED_USER_LIB_PATH = @@ -60,6 +62,7 @@ object FlinkEnvConfiguration { "/appcom/Install/flink/lib", "The local lib path of each user in Flink EngineConn." 
) + val FLINK_YAML_MERGE_ENABLE = CommonVars("flink.yaml.merge.enable", true) val FLINK_SHIP_DIRECTORIES = CommonVars("flink.yarn.ship-directories", "") val FLINK_SHIP_REMOTE_DIRECTORIES = CommonVars("flink.yarn.remote.ship-directories", "") @@ -137,9 +140,5 @@ object FlinkEnvConfiguration { val FLINK_HANDSHAKE_WAIT_TIME_MILLS = CommonVars("linkis.flink.handshake.wait.time.mills", 60 * 1000) - val FLINK_ENGINE_CONN_DEFAULT_JAVA_OPTS = CommonVars( - "wds.linkis.engineConn.javaOpts.default", - "-Xloggc:/gc.log -XX:+PrintGCApplicationStoppedTime -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=20 -XX:GCLogFileSize=512M -XX:+PrintPromotionFailure -XX:+PrintGCCause" - ) } diff --git a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/factory/FlinkEngineConnFactory.scala b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/factory/FlinkEngineConnFactory.scala index d38f69f7a9..3cf1ec6708 100644 --- a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/factory/FlinkEngineConnFactory.scala +++ b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/factory/FlinkEngineConnFactory.scala @@ -243,24 +243,44 @@ class FlinkEngineConnFactory extends MultiExecutorEngineConnFactory with Logging } protected def getExtractJavaOpts(envJavaOpts: String): String = { - // var defaultJavaOpts = FlinkEnvConfiguration.FLINK_ENGINE_CONN_DEFAULT_JAVA_OPTS.getValue var defaultJavaOpts = "" - val yamlFilePath = "/appcom/config/flink-config/flink-conf.yaml" - val source = Source.fromFile(yamlFilePath) - try { - val yamlContent = source.mkString - val yaml = new Yaml() - val configMap = yaml.loadAs(yamlContent, classOf[util.LinkedHashMap[String, Object]]) - if (configMap.containsKey("env.java.opts")) { - defaultJavaOpts = configMap.get("env.java.opts").toString + val yamlFilePath = FLINK_CONF_DIR.getValue + val yamlFile = yamlFilePath + FLINK_CONF_YAML.getHotValue() + if (new File(yamlFile).exists()) { + val source = Source.fromFile(yamlFile) + try { + val yamlContent = source.mkString + val yaml = new Yaml() + val configMap = yaml.loadAs(yamlContent, classOf[util.LinkedHashMap[String, Object]]) + if (configMap.containsKey("env.java.opts")) { + defaultJavaOpts = configMap.get("env.java.opts").toString + } + } finally { + source.close() + } + } else { + val inputStream = getClass.getResourceAsStream(yamlFile) + if (inputStream != null) { + val source = Source.fromInputStream(inputStream) + try { + val yamlContent = source.mkString + val yaml = new Yaml() + val configMap = yaml.loadAs(yamlContent, classOf[util.LinkedHashMap[String, Object]]) + if (configMap.containsKey("env.java.opts")) { + defaultJavaOpts = configMap.get("env.java.opts").toString + } + } finally { + source.close() + } + } else { + throw new FileNotFoundException("YAML file not found in both file system and classpath.") } - } finally { - source.close() } val merged = mergeAndDeduplicate(defaultJavaOpts, envJavaOpts) merged } + protected def mergeAndDeduplicate(str1: String, str2: String): String = { val patternX = """-XX:([^\s]+)=([^\s]+)""".r val keyValueMapX = patternX.findAllMatchIn(str2).map { matchResult => diff --git a/linkis-engineconn-plugins/flink/src/main/test/scala/org.apache.linkis.engineconnplugin.flink/factory/TestFlinkEngineConnFactory.scala 
b/linkis-engineconn-plugins/flink/src/main/test/scala/org.apache.linkis.engineconnplugin.flink/factory/TestFlinkEngineConnFactory.scala
new file mode 100644
index 0000000000..6feaaafe30
--- /dev/null
+++ b/linkis-engineconn-plugins/flink/src/main/test/scala/org.apache.linkis.engineconnplugin.flink/factory/TestFlinkEngineConnFactory.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineconnplugin.flink.factory
+
+import org.junit.jupiter.api.Test
+
+class TestFlinkEngineConnFactory {
+
+  @Test
+  def testMergeAndDeduplicate: Unit = {
+    var defaultJavaOpts = "-Da=3 -Db=4 -XXc=5 -Dk=a1=b";
+    var envJavaOpts = "-DjobName=0607_1 -Dlog4j.configuration=./log4j.properties -Da=1 -Dk=a1=c";
+    val merged = mergeAndDeduplicate(defaultJavaOpts, envJavaOpts)
+  }
+
+  protected def mergeAndDeduplicate(str1: String, str2: String): String = {
+    val patternX = """-XX:([^\s]+)=([^\s]+)""".r
+    val keyValueMapX = patternX.findAllMatchIn(str2).map { matchResult =>
+      val key = matchResult.group(1)
+      val value = matchResult.group(2)
+      (key, value)
+    }.toMap
+
+    val patternD = """-D([^\s]+)=([^\s]+)""".r
+    val keyValueMapD = patternD.findAllMatchIn(str2).map { matchResult =>
+      val key = matchResult.group(1)
+      val value = matchResult.group(2)
+      (key, value)
+    }.toMap
+    val xloggcPattern = """-Xloggc:[^\s]+""".r
+    val xloggcValueStr1 = xloggcPattern.findFirstMatchIn(str1).getOrElse("").toString
+    val xloggcValueStr2 = xloggcPattern.findFirstMatchIn(str2).getOrElse("").toString
+    var escapedXloggcValue = ""
+    var replaceStr1 = ""
+    var replaceStr2 = ""
+    if (xloggcValueStr1.nonEmpty && xloggcValueStr2.nonEmpty) {
+      escapedXloggcValue = xloggcValueStr2.replace("<", "\\<").replace(">", "\\>")
+      replaceStr1 = str1.replace(xloggcValueStr1, escapedXloggcValue)
+      replaceStr2 = str2.replace(xloggcValueStr2, "")
+    }
+    if (xloggcValueStr1.nonEmpty && xloggcValueStr2.isEmpty) {
+      escapedXloggcValue = xloggcValueStr1.replace("<", "\\<").replace(">", "\\>")
+      replaceStr1 = str1.replace(xloggcValueStr1, escapedXloggcValue)
+      replaceStr2 = str2
+    }
+    if (xloggcValueStr1.isEmpty && xloggcValueStr2.isEmpty) {
+      replaceStr1 = str1
+      replaceStr2 = str2
+    }
+    val MergedStringX = keyValueMapX.foldLeft(replaceStr1) { (result, entry) =>
+      val (key, value) = entry
+      val oldValue = s"$key=[^\\s]+"
+      val newValue = key + "=" + value
+      result.replaceAll(oldValue, newValue)
+    }
+
+    val MergedStringD = keyValueMapD.foldLeft(MergedStringX) { (result, entry) =>
+      val (key, value) = entry
+      val oldValue = s"$key=[^\\s]+"
+      val newValue = key + "=" + value
+      result.replaceAll(oldValue, newValue)
+    }
+    val javaOpts = (MergedStringD.split("\\s+") ++ replaceStr2.split("\\s+")).distinct.mkString(" ")
+    javaOpts
+  }
+
+}
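Note on PATCH 17/18: getExtractJavaOpts now resolves flink-conf.yaml from FLINK_CONF_DIR instead of the old hard-coded /appcom/config/flink-config path, trying the file system first and falling back to a classpath resource. A minimal standalone sketch of that lookup, assuming snakeyaml is on the classpath; the object and method names below are illustrative, not part of the patch:

import java.io.File
import java.util

import scala.io.Source

import org.yaml.snakeyaml.Yaml

object FlinkConfJavaOptsSketch {

  // Read env.java.opts from <confDir>/flink-conf.yaml; empty string when the file or key is absent.
  def readEnvJavaOpts(confDir: String): String = {
    val yamlFile = new File(confDir, "flink-conf.yaml") // File joins the two segments, so no "/" can go missing
    if (!yamlFile.exists()) ""
    else {
      val source = Source.fromFile(yamlFile)
      try {
        val yaml = new Yaml()
        val configMap = yaml.loadAs(source.mkString, classOf[util.LinkedHashMap[String, Object]])
        Option(configMap.get("env.java.opts")).map(_.toString).getOrElse("")
      } finally {
        source.close()
      }
    }
  }

}

Using java.io.File to join the directory and file name sidesteps the missing path separator that PATCH 18 adds by hand. Also worth noting: the classpath fallback in the patch passes the same file-system path to getResourceAsStream, so that branch only finds a resource if the YAML happens to be packaged under that exact path.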
From 3df68f3e74dfb9e5098509a0ffd926c7e9bcf2dc Mon Sep 17 00:00:00 2001
From: v_nikeyang <1013195908@qq.com>
Date: Tue, 19 Sep 2023 16:54:13 +0800
Subject: [PATCH 18/21] fix code review

---
 .../engineconnplugin/flink/factory/FlinkEngineConnFactory.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/factory/FlinkEngineConnFactory.scala b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/factory/FlinkEngineConnFactory.scala
index 3cf1ec6708..56f5fb6862 100644
--- a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/factory/FlinkEngineConnFactory.scala
+++ b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/factory/FlinkEngineConnFactory.scala
@@ -245,7 +245,7 @@ class FlinkEngineConnFactory extends MultiExecutorEngineConnFactory with Logging
   protected def getExtractJavaOpts(envJavaOpts: String): String = {
     var defaultJavaOpts = ""
     val yamlFilePath = FLINK_CONF_DIR.getValue
-    val yamlFile = yamlFilePath + FLINK_CONF_YAML.getHotValue()
+    val yamlFile = yamlFilePath + "/" + FLINK_CONF_YAML.getHotValue()
     if (new File(yamlFile).exists()) {
       val source = Source.fromFile(yamlFile)
       try {

From 788eaadde8f68218a2208eacab9ebb6ada00bc74 Mon Sep 17 00:00:00 2001
From: v_nikeyang <1013195908@qq.com>
Date: Sun, 8 Oct 2023 16:24:50 +0800
Subject: [PATCH 19/21] fix code review

---
 .../flink/config/FlinkEnvConfiguration.scala  |  2 +
 .../factory/FlinkEngineConnFactory.scala      | 34 +++++++-------
 .../factory/TestFlinkEngineConnFactory.scala  | 47 +++++++++++++++++++
 3 files changed, 66 insertions(+), 17 deletions(-)
 rename linkis-engineconn-plugins/flink/src/{main => }/test/scala/org.apache.linkis.engineconnplugin.flink/factory/TestFlinkEngineConnFactory.scala (64%)

diff --git a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/config/FlinkEnvConfiguration.scala b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/config/FlinkEnvConfiguration.scala
index aef2f7ffc8..dd3751e26e 100644
--- a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/config/FlinkEnvConfiguration.scala
+++ b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/config/FlinkEnvConfiguration.scala
@@ -140,5 +140,7 @@ object FlinkEnvConfiguration {
   val FLINK_HANDSHAKE_WAIT_TIME_MILLS =
     CommonVars("linkis.flink.handshake.wait.time.mills", 60 * 1000)

+  val FLINK_ENV_JAVA_OPTS =
+    CommonVars("flink.env.java.opts", "env.java.opts")

 }

diff --git a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/factory/FlinkEngineConnFactory.scala b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/factory/FlinkEngineConnFactory.scala
index 56f5fb6862..07479ed818 100644
--- a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/factory/FlinkEngineConnFactory.scala
+++ b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/factory/FlinkEngineConnFactory.scala
@@ -180,7 +180,7 @@ class FlinkEngineConnFactory extends MultiExecutorEngineConnFactory with Logging
         var flinkConfigValue = value
         if (
           FlinkEnvConfiguration.FLINK_YAML_MERGE_ENABLE.getValue && key
-            .equals(FLINK_CONFIG_PREFIX + "env.java.opts")
+            .equals(FLINK_CONFIG_PREFIX + FLINK_ENV_JAVA_OPTS.getValue)
         ) {
flinkConfigValue = getExtractJavaOpts(value) } @@ -242,7 +242,7 @@ class FlinkEngineConnFactory extends MultiExecutorEngineConnFactory with Logging context } - protected def getExtractJavaOpts(envJavaOpts: String): String = { + private def getExtractJavaOpts(envJavaOpts: String): String = { var defaultJavaOpts = "" val yamlFilePath = FLINK_CONF_DIR.getValue val yamlFile = yamlFilePath + "/" + FLINK_CONF_YAML.getHotValue() @@ -252,8 +252,8 @@ class FlinkEngineConnFactory extends MultiExecutorEngineConnFactory with Logging val yamlContent = source.mkString val yaml = new Yaml() val configMap = yaml.loadAs(yamlContent, classOf[util.LinkedHashMap[String, Object]]) - if (configMap.containsKey("env.java.opts")) { - defaultJavaOpts = configMap.get("env.java.opts").toString + if (configMap.containsKey(FLINK_ENV_JAVA_OPTS.getValue)) { + defaultJavaOpts = configMap.get(FLINK_ENV_JAVA_OPTS.getValue).toString } } finally { source.close() @@ -266,8 +266,8 @@ class FlinkEngineConnFactory extends MultiExecutorEngineConnFactory with Logging val yamlContent = source.mkString val yaml = new Yaml() val configMap = yaml.loadAs(yamlContent, classOf[util.LinkedHashMap[String, Object]]) - if (configMap.containsKey("env.java.opts")) { - defaultJavaOpts = configMap.get("env.java.opts").toString + if (configMap.containsKey(FLINK_ENV_JAVA_OPTS.getValue)) { + defaultJavaOpts = configMap.get(FLINK_ENV_JAVA_OPTS.getValue).toString } } finally { source.close() @@ -281,39 +281,39 @@ class FlinkEngineConnFactory extends MultiExecutorEngineConnFactory with Logging } - protected def mergeAndDeduplicate(str1: String, str2: String): String = { + private def mergeAndDeduplicate(defaultJavaOpts: String, envJavaOpts: String): String = { val patternX = """-XX:([^\s]+)=([^\s]+)""".r - val keyValueMapX = patternX.findAllMatchIn(str2).map { matchResult => + val keyValueMapX = patternX.findAllMatchIn(envJavaOpts).map { matchResult => val key = matchResult.group(1) val value = matchResult.group(2) (key, value) }.toMap val patternD = """-D([^\s]+)=([^\s]+)""".r - val keyValueMapD = patternD.findAllMatchIn(str2).map { matchResult => + val keyValueMapD = patternD.findAllMatchIn(envJavaOpts).map { matchResult => val key = matchResult.group(1) val value = matchResult.group(2) (key, value) }.toMap val xloggcPattern = """-Xloggc:[^\s]+""".r - val xloggcValueStr1 = xloggcPattern.findFirstMatchIn(str1).getOrElse("").toString - val xloggcValueStr2 = xloggcPattern.findFirstMatchIn(str2).getOrElse("").toString + val xloggcValueStr1 = xloggcPattern.findFirstMatchIn(defaultJavaOpts).getOrElse("").toString + val xloggcValueStr2 = xloggcPattern.findFirstMatchIn(envJavaOpts).getOrElse("").toString var escapedXloggcValue = "" var replaceStr1 = "" var replaceStr2 = "" if (xloggcValueStr1.nonEmpty && xloggcValueStr2.nonEmpty) { escapedXloggcValue = xloggcValueStr2.replace("<", "\\<").replace(">", "\\>") - replaceStr1 = str1.replace(xloggcValueStr1, escapedXloggcValue) - replaceStr2 = str2.replace(xloggcValueStr2, "") + replaceStr1 = defaultJavaOpts.replace(xloggcValueStr1, escapedXloggcValue) + replaceStr2 = envJavaOpts.replace(xloggcValueStr2, "") } if (xloggcValueStr1.nonEmpty && xloggcValueStr2.isEmpty) { escapedXloggcValue = xloggcValueStr1.replace("<", "\\<").replace(">", "\\>") - replaceStr1 = str1.replace(xloggcValueStr1, escapedXloggcValue) - replaceStr2 = str2 + replaceStr1 = defaultJavaOpts.replace(xloggcValueStr1, escapedXloggcValue) + replaceStr2 = envJavaOpts } if (xloggcValueStr1.isEmpty && xloggcValueStr2.isEmpty) { - replaceStr1 = str1 - 
replaceStr2 = str2
+      replaceStr1 = defaultJavaOpts
+      replaceStr2 = envJavaOpts
     }
     val MergedStringX = keyValueMapX.foldLeft(replaceStr1) { (result, entry) =>
       val (key, value) = entry

diff --git a/linkis-engineconn-plugins/flink/src/main/test/scala/org.apache.linkis.engineconnplugin.flink/factory/TestFlinkEngineConnFactory.scala b/linkis-engineconn-plugins/flink/src/test/scala/org.apache.linkis.engineconnplugin.flink/factory/TestFlinkEngineConnFactory.scala
similarity index 64%
rename from linkis-engineconn-plugins/flink/src/main/test/scala/org.apache.linkis.engineconnplugin.flink/factory/TestFlinkEngineConnFactory.scala
rename to linkis-engineconn-plugins/flink/src/test/scala/org.apache.linkis.engineconnplugin.flink/factory/TestFlinkEngineConnFactory.scala
index 6feaaafe30..5abfb97e61 100644
--- a/linkis-engineconn-plugins/flink/src/main/test/scala/org.apache.linkis.engineconnplugin.flink/factory/TestFlinkEngineConnFactory.scala
+++ b/linkis-engineconn-plugins/flink/src/test/scala/org.apache.linkis.engineconnplugin.flink/factory/TestFlinkEngineConnFactory.scala
@@ -17,15 +17,62 @@

 package org.apache.linkis.engineconnplugin.flink.factory

+import org.apache.linkis.engineconnplugin.flink.config.FlinkEnvConfiguration.{FLINK_CONF_DIR, FLINK_CONF_YAML}
+import org.junit.jupiter.api.Assertions.assertEquals
 import org.junit.jupiter.api.Test
+import org.yaml.snakeyaml.Yaml
+
+import java.io.{File, FileNotFoundException}
+import java.util
+import scala.io.Source

 class TestFlinkEngineConnFactory {

+  // note: private helper mirroring FlinkEngineConnFactory#getExtractJavaOpts; not annotated with @Test, since JUnit 5 does not run private methods that take parameters
+  private def getExtractJavaOpts(envJavaOpts: String): String = {
+    var defaultJavaOpts = ""
+    val yamlFilePath = FLINK_CONF_DIR.getValue
+    val yamlFile = yamlFilePath + "/" + FLINK_CONF_YAML.getHotValue()
+    if (new File(yamlFile).exists()) {
+      val source = Source.fromFile(yamlFile)
+      try {
+        val yamlContent = source.mkString
+        val yaml = new Yaml()
+        val configMap = yaml.loadAs(yamlContent, classOf[util.LinkedHashMap[String, Object]])
+        if (configMap.containsKey("env.java.opts")) {
+          defaultJavaOpts = configMap.get("env.java.opts").toString
+        }
+      } finally {
+        source.close()
+      }
+    } else {
+      val inputStream = getClass.getResourceAsStream(yamlFile)
+      if (inputStream != null) {
+        val source = Source.fromInputStream(inputStream)
+        try {
+          val yamlContent = source.mkString
+          val yaml = new Yaml()
+          val configMap = yaml.loadAs(yamlContent, classOf[util.LinkedHashMap[String, Object]])
+          if (configMap.containsKey("env.java.opts")) {
+            defaultJavaOpts = configMap.get("env.java.opts").toString
+          }
+        } finally {
+          source.close()
+        }
+      } else {
+        throw new FileNotFoundException("YAML file not found in both file system and classpath.")
+      }
+    }
+    val merged = mergeAndDeduplicate(defaultJavaOpts, envJavaOpts)
+    merged
+  }
+
   @Test
   def testMergeAndDeduplicate: Unit = {
     var defaultJavaOpts = "-Da=3 -Db=4 -XXc=5 -Dk=a1=b";
     var envJavaOpts = "-DjobName=0607_1 -Dlog4j.configuration=./log4j.properties -Da=1 -Dk=a1=c";
     val merged = mergeAndDeduplicate(defaultJavaOpts, envJavaOpts)
+    assertEquals("-Da=1 -Db=4 -XXc=5 -Dk=a1=c -DjobName=0607_1 -Dlog4j.configuration=./log4j.properties", merged)
   }

   protected def mergeAndDeduplicate(str1: String, str2: String): String = {

From c9e2b33d924dedba013f29e992de2dcae75df6c4 Mon Sep 17 00:00:00 2001
From: v_nikeyang <1013195908@qq.com>
Date: Mon, 9 Oct 2023 15:23:04 +0800
Subject: [PATCH 20/21] fix code review

---
 .../factory/FlinkEngineConnFactory.scala      | 62 +------------------
 .../flink/util/FlinkValueFormatUtil.scala     | 51 +++++++++++++++
 .../factory/TestFlinkEngineConnFactory.scala
| 61 ++---------------- 3 files changed, 59 insertions(+), 115 deletions(-) diff --git a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/factory/FlinkEngineConnFactory.scala b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/factory/FlinkEngineConnFactory.scala index 07479ed818..5d55b5d87f 100644 --- a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/factory/FlinkEngineConnFactory.scala +++ b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/factory/FlinkEngineConnFactory.scala @@ -31,17 +31,13 @@ import org.apache.linkis.engineconnplugin.flink.context.{EnvironmentContext, Fli import org.apache.linkis.engineconnplugin.flink.errorcode.FlinkErrorCodeSummary._ import org.apache.linkis.engineconnplugin.flink.exception.FlinkInitFailedException import org.apache.linkis.engineconnplugin.flink.setting.Settings -import org.apache.linkis.engineconnplugin.flink.util.{ClassUtil, ManagerUtil} +import org.apache.linkis.engineconnplugin.flink.util.{ClassUtil, FlinkValueFormatUtil, ManagerUtil} import org.apache.linkis.governance.common.conf.GovernanceCommonConf import org.apache.linkis.manager.engineplugin.common.conf.EnvConfiguration -import org.apache.linkis.manager.engineplugin.common.creation.{ - ExecutorFactory, - MultiExecutorEngineConnFactory -} +import org.apache.linkis.manager.engineplugin.common.creation.{ExecutorFactory, MultiExecutorEngineConnFactory} import org.apache.linkis.manager.label.entity.Label import org.apache.linkis.manager.label.entity.engine._ import org.apache.linkis.manager.label.entity.engine.EngineType.EngineType - import org.apache.commons.lang3.StringUtils import org.apache.flink.configuration._ import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings @@ -57,10 +53,8 @@ import java.text.MessageFormat import java.time.Duration import java.util import java.util.{Collections, Locale} - import scala.collection.JavaConverters._ import scala.io.Source - import com.google.common.collect.{Lists, Sets} import org.yaml.snakeyaml.Yaml @@ -276,61 +270,11 @@ class FlinkEngineConnFactory extends MultiExecutorEngineConnFactory with Logging throw new FileNotFoundException("YAML file not found in both file system and classpath.") } } - val merged = mergeAndDeduplicate(defaultJavaOpts, envJavaOpts) + val merged = FlinkValueFormatUtil.mergeAndDeduplicate(defaultJavaOpts, envJavaOpts) merged } - private def mergeAndDeduplicate(defaultJavaOpts: String, envJavaOpts: String): String = { - val patternX = """-XX:([^\s]+)=([^\s]+)""".r - val keyValueMapX = patternX.findAllMatchIn(envJavaOpts).map { matchResult => - val key = matchResult.group(1) - val value = matchResult.group(2) - (key, value) - }.toMap - - val patternD = """-D([^\s]+)=([^\s]+)""".r - val keyValueMapD = patternD.findAllMatchIn(envJavaOpts).map { matchResult => - val key = matchResult.group(1) - val value = matchResult.group(2) - (key, value) - }.toMap - val xloggcPattern = """-Xloggc:[^\s]+""".r - val xloggcValueStr1 = xloggcPattern.findFirstMatchIn(defaultJavaOpts).getOrElse("").toString - val xloggcValueStr2 = xloggcPattern.findFirstMatchIn(envJavaOpts).getOrElse("").toString - var escapedXloggcValue = "" - var replaceStr1 = "" - var replaceStr2 = "" - if (xloggcValueStr1.nonEmpty && xloggcValueStr2.nonEmpty) { - escapedXloggcValue = xloggcValueStr2.replace("<", "\\<").replace(">", "\\>") - replaceStr1 = defaultJavaOpts.replace(xloggcValueStr1, escapedXloggcValue) - 
replaceStr2 = envJavaOpts.replace(xloggcValueStr2, "") - } - if (xloggcValueStr1.nonEmpty && xloggcValueStr2.isEmpty) { - escapedXloggcValue = xloggcValueStr1.replace("<", "\\<").replace(">", "\\>") - replaceStr1 = defaultJavaOpts.replace(xloggcValueStr1, escapedXloggcValue) - replaceStr2 = envJavaOpts - } - if (xloggcValueStr1.isEmpty && xloggcValueStr2.isEmpty) { - replaceStr1 = defaultJavaOpts - replaceStr2 = envJavaOpts - } - val MergedStringX = keyValueMapX.foldLeft(replaceStr1) { (result, entry) => - val (key, value) = entry - val oldValue = s"$key=[^\\s]+" - val newValue = key + "=" + value - result.replaceAll(oldValue, newValue) - } - - val MergedStringD = keyValueMapD.foldLeft(MergedStringX) { (result, entry) => - val (key, value) = entry - val oldValue = s"$key=[^\\s]+" - val newValue = key + "=" + value - result.replaceAll(oldValue, newValue) - } - val javaOpts = (MergedStringD.split("\\s+") ++ replaceStr2.split("\\s+")).distinct.mkString(" ") - javaOpts - } protected def isOnceEngineConn(labels: util.List[Label[_]]): Boolean = { val engineConnModeLabel = getEngineConnModeLabel(labels) diff --git a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/util/FlinkValueFormatUtil.scala b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/util/FlinkValueFormatUtil.scala index 62782507eb..8335eaeb85 100644 --- a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/util/FlinkValueFormatUtil.scala +++ b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/util/FlinkValueFormatUtil.scala @@ -36,4 +36,55 @@ object FlinkValueFormatUtil { case _ => null } + def mergeAndDeduplicate(defaultJavaOpts: String, envJavaOpts: String): String = { + val patternX = """-XX:([^\s]+)=([^\s]+)""".r + val keyValueMapX = patternX.findAllMatchIn(envJavaOpts).map { matchResult => + val key = matchResult.group(1) + val value = matchResult.group(2) + (key, value) + }.toMap + + val patternD = """-D([^\s]+)=([^\s]+)""".r + val keyValueMapD = patternD.findAllMatchIn(envJavaOpts).map { matchResult => + val key = matchResult.group(1) + val value = matchResult.group(2) + (key, value) + }.toMap + val xloggcPattern = """-Xloggc:[^\s]+""".r + val xloggcValueStr1 = xloggcPattern.findFirstMatchIn(defaultJavaOpts).getOrElse("").toString + val xloggcValueStr2 = xloggcPattern.findFirstMatchIn(envJavaOpts).getOrElse("").toString + var escapedXloggcValue = "" + var replaceStr1 = "" + var replaceStr2 = "" + if (xloggcValueStr1.nonEmpty && xloggcValueStr2.nonEmpty) { + escapedXloggcValue = xloggcValueStr2.replace("<", "\\<").replace(">", "\\>") + replaceStr1 = defaultJavaOpts.replace(xloggcValueStr1, escapedXloggcValue) + replaceStr2 = envJavaOpts.replace(xloggcValueStr2, "") + } + if (xloggcValueStr1.nonEmpty && xloggcValueStr2.isEmpty) { + escapedXloggcValue = xloggcValueStr1.replace("<", "\\<").replace(">", "\\>") + replaceStr1 = defaultJavaOpts.replace(xloggcValueStr1, escapedXloggcValue) + replaceStr2 = envJavaOpts + } + if (xloggcValueStr1.isEmpty && xloggcValueStr2.isEmpty) { + replaceStr1 = defaultJavaOpts + replaceStr2 = envJavaOpts + } + val MergedStringX = keyValueMapX.foldLeft(replaceStr1) { (result, entry) => + val (key, value) = entry + val oldValue = s"$key=[^\\s]+" + val newValue = key + "=" + value + result.replaceAll(oldValue, newValue) + } + + val MergedStringD = keyValueMapD.foldLeft(MergedStringX) { (result, entry) => + val (key, value) = entry + val oldValue = 
s"$key=[^\\s]+" + val newValue = key + "=" + value + result.replaceAll(oldValue, newValue) + } + val javaOpts = (MergedStringD.split("\\s+") ++ replaceStr2.split("\\s+")).distinct.mkString(" ") + javaOpts + } + } diff --git a/linkis-engineconn-plugins/flink/src/test/scala/org.apache.linkis.engineconnplugin.flink/factory/TestFlinkEngineConnFactory.scala b/linkis-engineconn-plugins/flink/src/test/scala/org.apache.linkis.engineconnplugin.flink/factory/TestFlinkEngineConnFactory.scala index 5abfb97e61..43faa71d16 100644 --- a/linkis-engineconn-plugins/flink/src/test/scala/org.apache.linkis.engineconnplugin.flink/factory/TestFlinkEngineConnFactory.scala +++ b/linkis-engineconn-plugins/flink/src/test/scala/org.apache.linkis.engineconnplugin.flink/factory/TestFlinkEngineConnFactory.scala @@ -18,6 +18,7 @@ package org.apache.linkis.engineconnplugin.flink.factory import org.apache.linkis.engineconnplugin.flink.config.FlinkEnvConfiguration.{FLINK_CONF_DIR, FLINK_CONF_YAML} +import org.apache.linkis.engineconnplugin.flink.util.FlinkValueFormatUtil import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Test import org.yaml.snakeyaml.Yaml @@ -63,67 +64,15 @@ class TestFlinkEngineConnFactory { throw new FileNotFoundException("YAML file not found in both file system and classpath.") } } - val merged = mergeAndDeduplicate(defaultJavaOpts, envJavaOpts) + val merged = FlinkValueFormatUtil.mergeAndDeduplicate(defaultJavaOpts, envJavaOpts) merged } @Test def testMergeAndDeduplicate: Unit = { - var defaultJavaOpts = "-Da=3 -Db=4 -XXc=5 -Dk=a1=b"; - var envJavaOpts = "-DjobName=0607_1 -Dlog4j.configuration=./log4j.properties -Da=1 -Dk=a1=c"; - val merged = mergeAndDeduplicate(defaultJavaOpts, envJavaOpts) + val defaultJavaOpts = "-Da=3 -Db=4 -XXc=5 -Dk=a1=b"; + val envJavaOpts = "-DjobName=0607_1 -Dlog4j.configuration=./log4j.properties -Da=1 -Dk=a1=c"; + val merged = FlinkValueFormatUtil.mergeAndDeduplicate(defaultJavaOpts, envJavaOpts) assertEquals("-Da=1 -Db=4 -XXc=5 -Dk=a1=c -DjobName=0607_1 -Dlog4j.configuration=./log4j.properties", merged) } - - protected def mergeAndDeduplicate(str1: String, str2: String): String = { - val patternX = """-XX:([^\s]+)=([^\s]+)""".r - val keyValueMapX = patternX.findAllMatchIn(str2).map { matchResult => - val key = matchResult.group(1) - val value = matchResult.group(2) - (key, value) - }.toMap - - val patternD = """-D([^\s]+)=([^\s]+)""".r - val keyValueMapD = patternD.findAllMatchIn(str2).map { matchResult => - val key = matchResult.group(1) - val value = matchResult.group(2) - (key, value) - }.toMap - val xloggcPattern = """-Xloggc:[^\s]+""".r - val xloggcValueStr1 = xloggcPattern.findFirstMatchIn(str1).getOrElse("").toString - val xloggcValueStr2 = xloggcPattern.findFirstMatchIn(str2).getOrElse("").toString - var escapedXloggcValue = "" - var replaceStr1 = "" - var replaceStr2 = "" - if (xloggcValueStr1.nonEmpty && xloggcValueStr2.nonEmpty) { - escapedXloggcValue = xloggcValueStr2.replace("<", "\\<").replace(">", "\\>") - replaceStr1 = str1.replace(xloggcValueStr1, escapedXloggcValue) - replaceStr2 = str2.replace(xloggcValueStr2, "") - } - if (xloggcValueStr1.nonEmpty && xloggcValueStr2.isEmpty) { - escapedXloggcValue = xloggcValueStr1.replace("<", "\\<").replace(">", "\\>") - replaceStr1 = str1.replace(xloggcValueStr1, escapedXloggcValue) - replaceStr2 = str2 - } - if (xloggcValueStr1.isEmpty && xloggcValueStr2.isEmpty) { - replaceStr1 = str1 - replaceStr2 = str2 - } - val MergedStringX = keyValueMapX.foldLeft(replaceStr1) { (result, entry) 
=> - val (key, value) = entry - val oldValue = s"$key=[^\\s]+" - val newValue = key + "=" + value - result.replaceAll(oldValue, newValue) - } - - val MergedStringD = keyValueMapD.foldLeft(MergedStringX) { (result, entry) => - val (key, value) = entry - val oldValue = s"$key=[^\\s]+" - val newValue = key + "=" + value - result.replaceAll(oldValue, newValue) - } - val javaOpts = (MergedStringD.split("\\s+") ++ replaceStr2.split("\\s+")).distinct.mkString(" ") - javaOpts - } - } From f6c23ee7ff64c8055e7b37377effb00faedaa5f0 Mon Sep 17 00:00:00 2001 From: v_nikeyang <1013195908@qq.com> Date: Wed, 11 Oct 2023 14:19:19 +0800 Subject: [PATCH 21/21] fix code review --- .../factory/TestFlinkEngineConnFactory.scala | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/linkis-engineconn-plugins/flink/src/test/scala/org.apache.linkis.engineconnplugin.flink/factory/TestFlinkEngineConnFactory.scala b/linkis-engineconn-plugins/flink/src/test/scala/org.apache.linkis.engineconnplugin.flink/factory/TestFlinkEngineConnFactory.scala index 43faa71d16..ef8dabf14e 100644 --- a/linkis-engineconn-plugins/flink/src/test/scala/org.apache.linkis.engineconnplugin.flink/factory/TestFlinkEngineConnFactory.scala +++ b/linkis-engineconn-plugins/flink/src/test/scala/org.apache.linkis.engineconnplugin.flink/factory/TestFlinkEngineConnFactory.scala @@ -17,7 +17,7 @@ package org.apache.linkis.engineconnplugin.flink.factory -import org.apache.linkis.engineconnplugin.flink.config.FlinkEnvConfiguration.{FLINK_CONF_DIR, FLINK_CONF_YAML} +import org.apache.linkis.engineconnplugin.flink.config.FlinkEnvConfiguration.{FLINK_CONF_DIR, FLINK_CONF_YAML, FLINK_ENV_JAVA_OPTS} import org.apache.linkis.engineconnplugin.flink.util.FlinkValueFormatUtil import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Test @@ -40,8 +40,8 @@ class TestFlinkEngineConnFactory { val yamlContent = source.mkString val yaml = new Yaml() val configMap = yaml.loadAs(yamlContent, classOf[util.LinkedHashMap[String, Object]]) - if (configMap.containsKey("env.java.opts")) { - defaultJavaOpts = configMap.get("env.java.opts").toString + if (configMap.containsKey(FLINK_ENV_JAVA_OPTS.getValue)) { + defaultJavaOpts = configMap.get(FLINK_ENV_JAVA_OPTS.getValue).toString } } finally { source.close() @@ -54,8 +54,8 @@ class TestFlinkEngineConnFactory { val yamlContent = source.mkString val yaml = new Yaml() val configMap = yaml.loadAs(yamlContent, classOf[util.LinkedHashMap[String, Object]]) - if (configMap.containsKey("env.java.opts")) { - defaultJavaOpts = configMap.get("env.java.opts").toString + if (configMap.containsKey(FLINK_ENV_JAVA_OPTS.getValue)) { + defaultJavaOpts = configMap.get(FLINK_ENV_JAVA_OPTS.getValue).toString } } finally { source.close()
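With the series complete, the flow is: FLINK_YAML_MERGE_ENABLE gates the merge, FLINK_ENV_JAVA_OPTS names the env.java.opts key, and FlinkValueFormatUtil.mergeAndDeduplicate folds the user-supplied opts over the flink-conf.yaml defaults. A worked example of the merge semantics; the option strings are illustrative inputs, not values from the patches, and the expected output is traced by hand through the PATCH 20 version of the method:

import org.apache.linkis.engineconnplugin.flink.util.FlinkValueFormatUtil

object MergeAndDeduplicateExample extends App {
  // Defaults as they might come from flink-conf.yaml's env.java.opts (illustrative).
  val defaultJavaOpts = "-Xloggc:/data/gc.log -XX:NumberOfGCLogFiles=20 -Da=3 -Db=4"
  // User-submitted opts (illustrative).
  val envJavaOpts = "-XX:NumberOfGCLogFiles=10 -Da=1 -DjobName=demo"

  // -XX:key=value and -Dkey=value pairs from envJavaOpts overwrite the same key in the
  // defaults; a -Xloggc: value is kept once, with '<' and '>' shell-escaped when present;
  // everything else is concatenated and de-duplicated token by token.
  val merged = FlinkValueFormatUtil.mergeAndDeduplicate(defaultJavaOpts, envJavaOpts)
  println(merged)
  // -Xloggc:/data/gc.log -XX:NumberOfGCLogFiles=10 -Da=1 -Db=4 -DjobName=demo
}

One subtlety that the unit test pins down: because the -D pattern is greedy, a value that itself contains '=' (e.g. -Dk=a1=c) is parsed with key k=a1, so the override still lands on the full -Dk=a1=... token.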