Skip to content

Commit

Permalink
fix code review
Browse files Browse the repository at this point in the history
  • Loading branch information
yangwenzea committed Sep 14, 2023
1 parent 7fac30d commit 67756f4
Show file tree
Hide file tree
Showing 3 changed files with 116 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ object FlinkEnvConfiguration {
FLINK_HOME.getValue + s"/lib/flink-dist_2.11-${FLINK_VERSION.getValue}.jar"
)

val FLINK_CONF_YAML = CommonVars("flink.conf.yaml.dir", "flink-conf.yaml")

val FLINK_PROVIDED_LIB_PATH = CommonVars("flink.lib.path", "")

val FLINK_PROVIDED_USER_LIB_PATH =
Expand All @@ -60,6 +62,7 @@ object FlinkEnvConfiguration {
"/appcom/Install/flink/lib",
"The local lib path of each user in Flink EngineConn."
)

val FLINK_YAML_MERGE_ENABLE = CommonVars("flink.yaml.merge.enable", true)
val FLINK_SHIP_DIRECTORIES = CommonVars("flink.yarn.ship-directories", "")
val FLINK_SHIP_REMOTE_DIRECTORIES = CommonVars("flink.yarn.remote.ship-directories", "")
Expand Down Expand Up @@ -137,9 +140,5 @@ object FlinkEnvConfiguration {
val FLINK_HANDSHAKE_WAIT_TIME_MILLS =
CommonVars("linkis.flink.handshake.wait.time.mills", 60 * 1000)

val FLINK_ENGINE_CONN_DEFAULT_JAVA_OPTS = CommonVars(
"wds.linkis.engineConn.javaOpts.default",
"-Xloggc:<LOG_DIR>/gc.log -XX:+PrintGCApplicationStoppedTime -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=20 -XX:GCLogFileSize=512M -XX:+PrintPromotionFailure -XX:+PrintGCCause"
)

}
Original file line number Diff line number Diff line change
Expand Up @@ -243,24 +243,44 @@ class FlinkEngineConnFactory extends MultiExecutorEngineConnFactory with Logging
}

protected def getExtractJavaOpts(envJavaOpts: String): String = {
// var defaultJavaOpts = FlinkEnvConfiguration.FLINK_ENGINE_CONN_DEFAULT_JAVA_OPTS.getValue
var defaultJavaOpts = ""
val yamlFilePath = "/appcom/config/flink-config/flink-conf.yaml"
val source = Source.fromFile(yamlFilePath)
try {
val yamlContent = source.mkString
val yaml = new Yaml()
val configMap = yaml.loadAs(yamlContent, classOf[util.LinkedHashMap[String, Object]])
if (configMap.containsKey("env.java.opts")) {
defaultJavaOpts = configMap.get("env.java.opts").toString
val yamlFilePath = FLINK_CONF_DIR.getValue
val yamlFile = yamlFilePath + FLINK_CONF_YAML.getHotValue()
if (new File(yamlFile).exists()) {
val source = Source.fromFile(yamlFile)
try {
val yamlContent = source.mkString
val yaml = new Yaml()
val configMap = yaml.loadAs(yamlContent, classOf[util.LinkedHashMap[String, Object]])
if (configMap.containsKey("env.java.opts")) {
defaultJavaOpts = configMap.get("env.java.opts").toString
}
} finally {
source.close()
}
} else {
val inputStream = getClass.getResourceAsStream(yamlFile)
if (inputStream != null) {
val source = Source.fromInputStream(inputStream)
try {
val yamlContent = source.mkString
val yaml = new Yaml()
val configMap = yaml.loadAs(yamlContent, classOf[util.LinkedHashMap[String, Object]])
if (configMap.containsKey("env.java.opts")) {
defaultJavaOpts = configMap.get("env.java.opts").toString
}
} finally {
source.close()
}
} else {
throw new FileNotFoundException("YAML file not found in both file system and classpath.")
}
} finally {
source.close()
}
val merged = mergeAndDeduplicate(defaultJavaOpts, envJavaOpts)
merged
}


protected def mergeAndDeduplicate(str1: String, str2: String): String = {
val patternX = """-XX:([^\s]+)=([^\s]+)""".r
val keyValueMapX = patternX.findAllMatchIn(str2).map { matchResult =>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.linkis.engineconnplugin.flink.factory

import org.junit.jupiter.api.Test

class TestFlinkEngineConnFactory {

@Test
def testMergeAndDeduplicate: Unit = {
var defaultJavaOpts = "-Da=3 -Db=4 -XXc=5 -Dk=a1=b";
var envJavaOpts = "-DjobName=0607_1 -Dlog4j.configuration=./log4j.properties -Da=1 -Dk=a1=c";
val merged = mergeAndDeduplicate(defaultJavaOpts, envJavaOpts)
}

protected def mergeAndDeduplicate(str1: String, str2: String): String = {
val patternX = """-XX:([^\s]+)=([^\s]+)""".r
val keyValueMapX = patternX.findAllMatchIn(str2).map { matchResult =>
val key = matchResult.group(1)
val value = matchResult.group(2)
(key, value)
}.toMap

val patternD = """-D([^\s]+)=([^\s]+)""".r
val keyValueMapD = patternD.findAllMatchIn(str2).map { matchResult =>
val key = matchResult.group(1)
val value = matchResult.group(2)
(key, value)
}.toMap
val xloggcPattern = """-Xloggc:[^\s]+""".r
val xloggcValueStr1 = xloggcPattern.findFirstMatchIn(str1).getOrElse("").toString
val xloggcValueStr2 = xloggcPattern.findFirstMatchIn(str2).getOrElse("").toString
var escapedXloggcValue = ""
var replaceStr1 = ""
var replaceStr2 = ""
if (xloggcValueStr1.nonEmpty && xloggcValueStr2.nonEmpty) {
escapedXloggcValue = xloggcValueStr2.replace("<", "\\<").replace(">", "\\>")
replaceStr1 = str1.replace(xloggcValueStr1, escapedXloggcValue)
replaceStr2 = str2.replace(xloggcValueStr2, "")
}
if (xloggcValueStr1.nonEmpty && xloggcValueStr2.isEmpty) {
escapedXloggcValue = xloggcValueStr1.replace("<", "\\<").replace(">", "\\>")
replaceStr1 = str1.replace(xloggcValueStr1, escapedXloggcValue)
replaceStr2 = str2
}
if (xloggcValueStr1.isEmpty && xloggcValueStr2.isEmpty) {
replaceStr1 = str1
replaceStr2 = str2
}
val MergedStringX = keyValueMapX.foldLeft(replaceStr1) { (result, entry) =>
val (key, value) = entry
val oldValue = s"$key=[^\\s]+"
val newValue = key + "=" + value
result.replaceAll(oldValue, newValue)
}

val MergedStringD = keyValueMapD.foldLeft(MergedStringX) { (result, entry) =>
val (key, value) = entry
val oldValue = s"$key=[^\\s]+"
val newValue = key + "=" + value
result.replaceAll(oldValue, newValue)
}
val javaOpts = (MergedStringD.split("\\s+") ++ replaceStr2.split("\\s+")).distinct.mkString(" ")
javaOpts
}

}

0 comments on commit 67756f4

Please sign in to comment.