Skip to content

Commit

Permalink
fix code review
Browse files Browse the repository at this point in the history
  • Loading branch information
yangwenzea committed Oct 8, 2023
1 parent 3df68f3 commit 788eaad
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -140,5 +140,7 @@ object FlinkEnvConfiguration {
val FLINK_HANDSHAKE_WAIT_TIME_MILLS =
CommonVars("linkis.flink.handshake.wait.time.mills", 60 * 1000)

val FLINK_ENV_JAVA_OPTS =
CommonVars("flink.env.java.opts", "env.java.opts")

}
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ class FlinkEngineConnFactory extends MultiExecutorEngineConnFactory with Logging
var flinkConfigValue = value
if (
FlinkEnvConfiguration.FLINK_YAML_MERGE_ENABLE.getValue && key
.equals(FLINK_CONFIG_PREFIX + "env.java.opts")
.equals(FLINK_CONFIG_PREFIX + FLINK_ENV_JAVA_OPTS.getValue)
) {
flinkConfigValue = getExtractJavaOpts(value)
}
Expand Down Expand Up @@ -242,7 +242,7 @@ class FlinkEngineConnFactory extends MultiExecutorEngineConnFactory with Logging
context
}

protected def getExtractJavaOpts(envJavaOpts: String): String = {
private def getExtractJavaOpts(envJavaOpts: String): String = {
var defaultJavaOpts = ""
val yamlFilePath = FLINK_CONF_DIR.getValue
val yamlFile = yamlFilePath + "/" + FLINK_CONF_YAML.getHotValue()
Expand All @@ -252,8 +252,8 @@ class FlinkEngineConnFactory extends MultiExecutorEngineConnFactory with Logging
val yamlContent = source.mkString
val yaml = new Yaml()
val configMap = yaml.loadAs(yamlContent, classOf[util.LinkedHashMap[String, Object]])
if (configMap.containsKey("env.java.opts")) {
defaultJavaOpts = configMap.get("env.java.opts").toString
if (configMap.containsKey(FLINK_ENV_JAVA_OPTS.getValue)) {
defaultJavaOpts = configMap.get(FLINK_ENV_JAVA_OPTS.getValue).toString
}
} finally {
source.close()
Expand All @@ -266,8 +266,8 @@ class FlinkEngineConnFactory extends MultiExecutorEngineConnFactory with Logging
val yamlContent = source.mkString
val yaml = new Yaml()
val configMap = yaml.loadAs(yamlContent, classOf[util.LinkedHashMap[String, Object]])
if (configMap.containsKey("env.java.opts")) {
defaultJavaOpts = configMap.get("env.java.opts").toString
if (configMap.containsKey(FLINK_ENV_JAVA_OPTS.getValue)) {
defaultJavaOpts = configMap.get(FLINK_ENV_JAVA_OPTS.getValue).toString
}
} finally {
source.close()
Expand All @@ -281,39 +281,39 @@ class FlinkEngineConnFactory extends MultiExecutorEngineConnFactory with Logging
}


protected def mergeAndDeduplicate(str1: String, str2: String): String = {
private def mergeAndDeduplicate(defaultJavaOpts: String, envJavaOpts: String): String = {
val patternX = """-XX:([^\s]+)=([^\s]+)""".r
val keyValueMapX = patternX.findAllMatchIn(str2).map { matchResult =>
val keyValueMapX = patternX.findAllMatchIn(envJavaOpts).map { matchResult =>
val key = matchResult.group(1)
val value = matchResult.group(2)
(key, value)
}.toMap

val patternD = """-D([^\s]+)=([^\s]+)""".r
val keyValueMapD = patternD.findAllMatchIn(str2).map { matchResult =>
val keyValueMapD = patternD.findAllMatchIn(envJavaOpts).map { matchResult =>
val key = matchResult.group(1)
val value = matchResult.group(2)
(key, value)
}.toMap
val xloggcPattern = """-Xloggc:[^\s]+""".r
val xloggcValueStr1 = xloggcPattern.findFirstMatchIn(str1).getOrElse("").toString
val xloggcValueStr2 = xloggcPattern.findFirstMatchIn(str2).getOrElse("").toString
val xloggcValueStr1 = xloggcPattern.findFirstMatchIn(defaultJavaOpts).getOrElse("").toString
val xloggcValueStr2 = xloggcPattern.findFirstMatchIn(envJavaOpts).getOrElse("").toString
var escapedXloggcValue = ""
var replaceStr1 = ""
var replaceStr2 = ""
if (xloggcValueStr1.nonEmpty && xloggcValueStr2.nonEmpty) {
escapedXloggcValue = xloggcValueStr2.replace("<", "\\<").replace(">", "\\>")
replaceStr1 = str1.replace(xloggcValueStr1, escapedXloggcValue)
replaceStr2 = str2.replace(xloggcValueStr2, "")
replaceStr1 = defaultJavaOpts.replace(xloggcValueStr1, escapedXloggcValue)
replaceStr2 = envJavaOpts.replace(xloggcValueStr2, "")
}
if (xloggcValueStr1.nonEmpty && xloggcValueStr2.isEmpty) {
escapedXloggcValue = xloggcValueStr1.replace("<", "\\<").replace(">", "\\>")
replaceStr1 = str1.replace(xloggcValueStr1, escapedXloggcValue)
replaceStr2 = str2
replaceStr1 = defaultJavaOpts.replace(xloggcValueStr1, escapedXloggcValue)
replaceStr2 = envJavaOpts
}
if (xloggcValueStr1.isEmpty && xloggcValueStr2.isEmpty) {
replaceStr1 = str1
replaceStr2 = str2
replaceStr1 = defaultJavaOpts
replaceStr2 = envJavaOpts
}
val MergedStringX = keyValueMapX.foldLeft(replaceStr1) { (result, entry) =>
val (key, value) = entry
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,62 @@

package org.apache.linkis.engineconnplugin.flink.factory

import org.apache.linkis.engineconnplugin.flink.config.FlinkEnvConfiguration.{FLINK_CONF_DIR, FLINK_CONF_YAML}
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Test
import org.yaml.snakeyaml.Yaml

import java.io.{File, FileNotFoundException}
import java.util
import scala.io.Source

class TestFlinkEngineConnFactory {

@Test
private def getExtractJavaOpts(envJavaOpts: String): String = {
var defaultJavaOpts = ""
val yamlFilePath = FLINK_CONF_DIR.getValue
val yamlFile = yamlFilePath + "/" + FLINK_CONF_YAML.getHotValue()
if (new File(yamlFile).exists()) {
val source = Source.fromFile(yamlFile)
try {
val yamlContent = source.mkString
val yaml = new Yaml()
val configMap = yaml.loadAs(yamlContent, classOf[util.LinkedHashMap[String, Object]])
if (configMap.containsKey("env.java.opts")) {
defaultJavaOpts = configMap.get("env.java.opts").toString
}
} finally {
source.close()
}
} else {
val inputStream = getClass.getResourceAsStream(yamlFile)
if (inputStream != null) {
val source = Source.fromInputStream(inputStream)
try {
val yamlContent = source.mkString
val yaml = new Yaml()
val configMap = yaml.loadAs(yamlContent, classOf[util.LinkedHashMap[String, Object]])
if (configMap.containsKey("env.java.opts")) {
defaultJavaOpts = configMap.get("env.java.opts").toString
}
} finally {
source.close()
}
} else {
throw new FileNotFoundException("YAML file not found in both file system and classpath.")
}
}
val merged = mergeAndDeduplicate(defaultJavaOpts, envJavaOpts)
merged
}

@Test
def testMergeAndDeduplicate: Unit = {
var defaultJavaOpts = "-Da=3 -Db=4 -XXc=5 -Dk=a1=b";
var envJavaOpts = "-DjobName=0607_1 -Dlog4j.configuration=./log4j.properties -Da=1 -Dk=a1=c";
val merged = mergeAndDeduplicate(defaultJavaOpts, envJavaOpts)
assertEquals("-Da=1 -Db=4 -XXc=5 -Dk=a1=c -DjobName=0607_1 -Dlog4j.configuration=./log4j.properties", merged)
}

protected def mergeAndDeduplicate(str1: String, str2: String): String = {
Expand Down

0 comments on commit 788eaad

Please sign in to comment.