From 788eaadde8f68218a2208eacab9ebb6ada00bc74 Mon Sep 17 00:00:00 2001 From: v_nikeyang <1013195908@qq.com> Date: Sun, 8 Oct 2023 16:24:50 +0800 Subject: [PATCH] fix code review --- .../flink/config/FlinkEnvConfiguration.scala | 2 + .../factory/FlinkEngineConnFactory.scala | 34 +++++++------- .../factory/TestFlinkEngineConnFactory.scala | 47 +++++++++++++++++++ 3 files changed, 66 insertions(+), 17 deletions(-) rename linkis-engineconn-plugins/flink/src/{main => }/test/scala/org.apache.linkis.engineconnplugin.flink/factory/TestFlinkEngineConnFactory.scala (64%) diff --git a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/config/FlinkEnvConfiguration.scala b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/config/FlinkEnvConfiguration.scala index aef2f7ffc8..dd3751e26e 100644 --- a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/config/FlinkEnvConfiguration.scala +++ b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/config/FlinkEnvConfiguration.scala @@ -140,5 +140,7 @@ object FlinkEnvConfiguration { val FLINK_HANDSHAKE_WAIT_TIME_MILLS = CommonVars("linkis.flink.handshake.wait.time.mills", 60 * 1000) + val FLINK_ENV_JAVA_OPTS = + CommonVars("flink.env.java.opts", "env.java.opts") } diff --git a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/factory/FlinkEngineConnFactory.scala b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/factory/FlinkEngineConnFactory.scala index 56f5fb6862..07479ed818 100644 --- a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/factory/FlinkEngineConnFactory.scala +++ b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/factory/FlinkEngineConnFactory.scala @@ -180,7 +180,7 @@ class FlinkEngineConnFactory extends MultiExecutorEngineConnFactory with Logging var flinkConfigValue = value if ( FlinkEnvConfiguration.FLINK_YAML_MERGE_ENABLE.getValue && key - .equals(FLINK_CONFIG_PREFIX + "env.java.opts") + .equals(FLINK_CONFIG_PREFIX + FLINK_ENV_JAVA_OPTS.getValue) ) { flinkConfigValue = getExtractJavaOpts(value) } @@ -242,7 +242,7 @@ class FlinkEngineConnFactory extends MultiExecutorEngineConnFactory with Logging context } - protected def getExtractJavaOpts(envJavaOpts: String): String = { + private def getExtractJavaOpts(envJavaOpts: String): String = { var defaultJavaOpts = "" val yamlFilePath = FLINK_CONF_DIR.getValue val yamlFile = yamlFilePath + "/" + FLINK_CONF_YAML.getHotValue() @@ -252,8 +252,8 @@ class FlinkEngineConnFactory extends MultiExecutorEngineConnFactory with Logging val yamlContent = source.mkString val yaml = new Yaml() val configMap = yaml.loadAs(yamlContent, classOf[util.LinkedHashMap[String, Object]]) - if (configMap.containsKey("env.java.opts")) { - defaultJavaOpts = configMap.get("env.java.opts").toString + if (configMap.containsKey(FLINK_ENV_JAVA_OPTS.getValue)) { + defaultJavaOpts = configMap.get(FLINK_ENV_JAVA_OPTS.getValue).toString } } finally { source.close() @@ -266,8 +266,8 @@ class FlinkEngineConnFactory extends MultiExecutorEngineConnFactory with Logging val yamlContent = source.mkString val yaml = new Yaml() val configMap = yaml.loadAs(yamlContent, classOf[util.LinkedHashMap[String, Object]]) - if (configMap.containsKey("env.java.opts")) { - defaultJavaOpts = configMap.get("env.java.opts").toString + if (configMap.containsKey(FLINK_ENV_JAVA_OPTS.getValue)) { + defaultJavaOpts = configMap.get(FLINK_ENV_JAVA_OPTS.getValue).toString } } finally { source.close() @@ -281,39 +281,39 @@ class FlinkEngineConnFactory extends MultiExecutorEngineConnFactory with Logging } - protected def mergeAndDeduplicate(str1: String, str2: String): String = { + private def mergeAndDeduplicate(defaultJavaOpts: String, envJavaOpts: String): String = { val patternX = """-XX:([^\s]+)=([^\s]+)""".r - val keyValueMapX = patternX.findAllMatchIn(str2).map { matchResult => + val keyValueMapX = patternX.findAllMatchIn(envJavaOpts).map { matchResult => val key = matchResult.group(1) val value = matchResult.group(2) (key, value) }.toMap val patternD = """-D([^\s]+)=([^\s]+)""".r - val keyValueMapD = patternD.findAllMatchIn(str2).map { matchResult => + val keyValueMapD = patternD.findAllMatchIn(envJavaOpts).map { matchResult => val key = matchResult.group(1) val value = matchResult.group(2) (key, value) }.toMap val xloggcPattern = """-Xloggc:[^\s]+""".r - val xloggcValueStr1 = xloggcPattern.findFirstMatchIn(str1).getOrElse("").toString - val xloggcValueStr2 = xloggcPattern.findFirstMatchIn(str2).getOrElse("").toString + val xloggcValueStr1 = xloggcPattern.findFirstMatchIn(defaultJavaOpts).getOrElse("").toString + val xloggcValueStr2 = xloggcPattern.findFirstMatchIn(envJavaOpts).getOrElse("").toString var escapedXloggcValue = "" var replaceStr1 = "" var replaceStr2 = "" if (xloggcValueStr1.nonEmpty && xloggcValueStr2.nonEmpty) { escapedXloggcValue = xloggcValueStr2.replace("<", "\\<").replace(">", "\\>") - replaceStr1 = str1.replace(xloggcValueStr1, escapedXloggcValue) - replaceStr2 = str2.replace(xloggcValueStr2, "") + replaceStr1 = defaultJavaOpts.replace(xloggcValueStr1, escapedXloggcValue) + replaceStr2 = envJavaOpts.replace(xloggcValueStr2, "") } if (xloggcValueStr1.nonEmpty && xloggcValueStr2.isEmpty) { escapedXloggcValue = xloggcValueStr1.replace("<", "\\<").replace(">", "\\>") - replaceStr1 = str1.replace(xloggcValueStr1, escapedXloggcValue) - replaceStr2 = str2 + replaceStr1 = defaultJavaOpts.replace(xloggcValueStr1, escapedXloggcValue) + replaceStr2 = envJavaOpts } if (xloggcValueStr1.isEmpty && xloggcValueStr2.isEmpty) { - replaceStr1 = str1 - replaceStr2 = str2 + replaceStr1 = defaultJavaOpts + replaceStr2 = envJavaOpts } val MergedStringX = keyValueMapX.foldLeft(replaceStr1) { (result, entry) => val (key, value) = entry diff --git a/linkis-engineconn-plugins/flink/src/main/test/scala/org.apache.linkis.engineconnplugin.flink/factory/TestFlinkEngineConnFactory.scala b/linkis-engineconn-plugins/flink/src/test/scala/org.apache.linkis.engineconnplugin.flink/factory/TestFlinkEngineConnFactory.scala similarity index 64% rename from linkis-engineconn-plugins/flink/src/main/test/scala/org.apache.linkis.engineconnplugin.flink/factory/TestFlinkEngineConnFactory.scala rename to linkis-engineconn-plugins/flink/src/test/scala/org.apache.linkis.engineconnplugin.flink/factory/TestFlinkEngineConnFactory.scala index 6feaaafe30..5abfb97e61 100644 --- a/linkis-engineconn-plugins/flink/src/main/test/scala/org.apache.linkis.engineconnplugin.flink/factory/TestFlinkEngineConnFactory.scala +++ b/linkis-engineconn-plugins/flink/src/test/scala/org.apache.linkis.engineconnplugin.flink/factory/TestFlinkEngineConnFactory.scala @@ -17,15 +17,62 @@ package org.apache.linkis.engineconnplugin.flink.factory +import org.apache.linkis.engineconnplugin.flink.config.FlinkEnvConfiguration.{FLINK_CONF_DIR, FLINK_CONF_YAML} +import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Test +import org.yaml.snakeyaml.Yaml + +import java.io.{File, FileNotFoundException} +import java.util +import scala.io.Source class TestFlinkEngineConnFactory { + @Test + private def getExtractJavaOpts(envJavaOpts: String): String = { + var defaultJavaOpts = "" + val yamlFilePath = FLINK_CONF_DIR.getValue + val yamlFile = yamlFilePath + "/" + FLINK_CONF_YAML.getHotValue() + if (new File(yamlFile).exists()) { + val source = Source.fromFile(yamlFile) + try { + val yamlContent = source.mkString + val yaml = new Yaml() + val configMap = yaml.loadAs(yamlContent, classOf[util.LinkedHashMap[String, Object]]) + if (configMap.containsKey("env.java.opts")) { + defaultJavaOpts = configMap.get("env.java.opts").toString + } + } finally { + source.close() + } + } else { + val inputStream = getClass.getResourceAsStream(yamlFile) + if (inputStream != null) { + val source = Source.fromInputStream(inputStream) + try { + val yamlContent = source.mkString + val yaml = new Yaml() + val configMap = yaml.loadAs(yamlContent, classOf[util.LinkedHashMap[String, Object]]) + if (configMap.containsKey("env.java.opts")) { + defaultJavaOpts = configMap.get("env.java.opts").toString + } + } finally { + source.close() + } + } else { + throw new FileNotFoundException("YAML file not found in both file system and classpath.") + } + } + val merged = mergeAndDeduplicate(defaultJavaOpts, envJavaOpts) + merged + } + @Test def testMergeAndDeduplicate: Unit = { var defaultJavaOpts = "-Da=3 -Db=4 -XXc=5 -Dk=a1=b"; var envJavaOpts = "-DjobName=0607_1 -Dlog4j.configuration=./log4j.properties -Da=1 -Dk=a1=c"; val merged = mergeAndDeduplicate(defaultJavaOpts, envJavaOpts) + assertEquals("-Da=1 -Db=4 -XXc=5 -Dk=a1=c -DjobName=0607_1 -Dlog4j.configuration=./log4j.properties", merged) } protected def mergeAndDeduplicate(str1: String, str2: String): String = {