Skip to content

Commit

Permalink
flink load default configuration
Browse files Browse the repository at this point in the history
  • Loading branch information
yangwenzea committed Dec 6, 2023
1 parent b689f88 commit a9359e2
Show file tree
Hide file tree
Showing 3 changed files with 111 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -174,4 +174,12 @@ object FlinkEnvConfiguration {
val FLINK_HANDSHAKE_WAIT_TIME_MILLS =
CommonVars("linkis.flink.handshake.wait.time.mills", 60 * 1000)

val FLINK_CONF_YAML = CommonVars("flink.conf.yaml.dir", "flink-conf.yaml")

val FLINK_YAML_MERGE_ENABLE = CommonVars("flink.yaml.merge.enable", true)

val FLINK_ENV_JAVA_OPTS =
CommonVars("flink.env.java.opts", "env.java.opts")


}
Original file line number Diff line number Diff line change
Expand Up @@ -27,25 +27,18 @@ import org.apache.linkis.engineconnplugin.flink.client.shims.config.Environment
import org.apache.linkis.engineconnplugin.flink.client.shims.config.entries.ExecutionEntry
import org.apache.linkis.engineconnplugin.flink.client.shims.errorcode.FlinkErrorCodeSummary._
import org.apache.linkis.engineconnplugin.flink.client.shims.exception.FlinkInitFailedException
import org.apache.linkis.engineconnplugin.flink.config.{
FlinkEnvConfiguration,
FlinkExecutionTargetType
}
import org.apache.linkis.engineconnplugin.flink.config.{FlinkEnvConfiguration, FlinkExecutionTargetType}
import org.apache.linkis.engineconnplugin.flink.config.FlinkEnvConfiguration._
import org.apache.linkis.engineconnplugin.flink.config.FlinkResourceConfiguration._
import org.apache.linkis.engineconnplugin.flink.context.{EnvironmentContext, FlinkEngineConnContext}
import org.apache.linkis.engineconnplugin.flink.setting.Settings
import org.apache.linkis.engineconnplugin.flink.util.{ClassUtil, ManagerUtil}
import org.apache.linkis.engineconnplugin.flink.util.{ClassUtil, FlinkValueFormatUtil, ManagerUtil}
import org.apache.linkis.governance.common.conf.GovernanceCommonConf
import org.apache.linkis.manager.engineplugin.common.conf.EnvConfiguration
import org.apache.linkis.manager.engineplugin.common.creation.{
ExecutorFactory,
MultiExecutorEngineConnFactory
}
import org.apache.linkis.manager.engineplugin.common.creation.{ExecutorFactory, MultiExecutorEngineConnFactory}
import org.apache.linkis.manager.label.entity.Label
import org.apache.linkis.manager.label.entity.engine._
import org.apache.linkis.manager.label.entity.engine.EngineType.EngineType

import org.apache.commons.lang3.StringUtils
import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.configuration._
Expand All @@ -55,16 +48,17 @@ import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.yarn.configuration.{YarnConfigOptions, YarnDeploymentTarget}

import java.io.File
import java.io.{File, FileNotFoundException}
import java.net.URL
import java.text.MessageFormat
import java.time.Duration
import java.util
import java.util.{Collections, Locale}

import scala.collection.JavaConverters._

import com.google.common.collect.{Lists, Sets}
import org.yaml.snakeyaml.Yaml

import scala.io.Source

class FlinkEngineConnFactory extends MultiExecutorEngineConnFactory with Logging {

Expand Down Expand Up @@ -196,7 +190,12 @@ class FlinkEngineConnFactory extends MultiExecutorEngineConnFactory with Logging
flinkConfig.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, numberOfTaskSlots)
// set extra configs
options.asScala.filter { case (key, _) => key.startsWith(FLINK_CONFIG_PREFIX) }.foreach {
case (key, value) => flinkConfig.setString(key.substring(FLINK_CONFIG_PREFIX.length), value)
case (key, value) =>
var flinkConfigValue = value
if (FlinkEnvConfiguration.FLINK_YAML_MERGE_ENABLE.getValue && key.equals(FLINK_CONFIG_PREFIX + FLINK_ENV_JAVA_OPTS.getValue)) {
flinkConfigValue = getExtractJavaOpts(value)
}
flinkConfig.setString(key.substring(FLINK_CONFIG_PREFIX.length), flinkConfigValue)
}
// set kerberos config
if (FLINK_KERBEROS_ENABLE.getValue(options)) {
Expand Down Expand Up @@ -295,6 +294,44 @@ class FlinkEngineConnFactory extends MultiExecutorEngineConnFactory with Logging
context
}

private def getExtractJavaOpts(envJavaOpts: String): String = {
var defaultJavaOpts = ""
val yamlFilePath = FLINK_CONF_DIR.getValue
val yamlFile = yamlFilePath + "/" + FLINK_CONF_YAML.getHotValue()
if (new File(yamlFile).exists()) {
val source = Source.fromFile(yamlFile)
try {
val yamlContent = source.mkString
val yaml = new Yaml()
val configMap = yaml.loadAs(yamlContent, classOf[util.LinkedHashMap[String, Object]])
if (configMap.containsKey(FLINK_ENV_JAVA_OPTS.getValue)) {
defaultJavaOpts = configMap.get(FLINK_ENV_JAVA_OPTS.getValue).toString
}
} finally {
source.close()
}
} else {
val inputStream = getClass.getResourceAsStream(yamlFile)
if (inputStream != null) {
val source = Source.fromInputStream(inputStream)
try {
val yamlContent = source.mkString
val yaml = new Yaml()
val configMap = yaml.loadAs(yamlContent, classOf[util.LinkedHashMap[String, Object]])
if (configMap.containsKey(FLINK_ENV_JAVA_OPTS.getValue)) {
defaultJavaOpts = configMap.get(FLINK_ENV_JAVA_OPTS.getValue).toString
}
} finally {
source.close()
}
} else {
throw new FileNotFoundException("YAML file not found in both file system and classpath.")
}
}
val merged = FlinkValueFormatUtil.mergeAndDeduplicate(defaultJavaOpts, envJavaOpts)
merged
}

protected def isOnceEngineConn(labels: util.List[Label[_]]): Boolean = {
val engineConnModeLabel = getEngineConnModeLabel(labels)
engineConnModeLabel != null && (EngineConnMode.toEngineConnMode(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,56 @@ object FlinkValueFormatUtil {
case _ => null
}

def mergeAndDeduplicate(defaultJavaOpts: String, envJavaOpts: String): String = {
val patternX = """-XX:([^\s]+)=([^\s]+)""".r
val keyValueMapX = patternX.findAllMatchIn(envJavaOpts).map { matchResult =>
val key = matchResult.group(1)
val value = matchResult.group(2)
(key, value)
}.toMap

val patternD = """-D([^\s]+)=([^\s]+)""".r
val keyValueMapD = patternD.findAllMatchIn(envJavaOpts).map { matchResult =>
val key = matchResult.group(1)
val value = matchResult.group(2)
(key, value)
}.toMap
val xloggcPattern = """-Xloggc:[^\s]+""".r
val xloggcValueStr1 = xloggcPattern.findFirstMatchIn(defaultJavaOpts).getOrElse("").toString
val xloggcValueStr2 = xloggcPattern.findFirstMatchIn(envJavaOpts).getOrElse("").toString
var escapedXloggcValue = ""
var replaceStr1 = ""
var replaceStr2 = ""
if (xloggcValueStr1.nonEmpty && xloggcValueStr2.nonEmpty) {
escapedXloggcValue = xloggcValueStr2.replace("<", "\\<").replace(">", "\\>")
replaceStr1 = defaultJavaOpts.replace(xloggcValueStr1, escapedXloggcValue)
replaceStr2 = envJavaOpts.replace(xloggcValueStr2, "")
}
if (xloggcValueStr1.nonEmpty && xloggcValueStr2.isEmpty) {
escapedXloggcValue = xloggcValueStr1.replace("<", "\\<").replace(">", "\\>")
replaceStr1 = defaultJavaOpts.replace(xloggcValueStr1, escapedXloggcValue)
replaceStr2 = envJavaOpts
}
if (xloggcValueStr1.isEmpty && xloggcValueStr2.isEmpty) {
replaceStr1 = defaultJavaOpts
replaceStr2 = envJavaOpts
}
val MergedStringX = keyValueMapX.foldLeft(replaceStr1) { (result, entry) =>
val (key, value) = entry
val oldValue = s"$key=[^\\s]+"
val newValue = key + "=" + value
result.replaceAll(oldValue, newValue)
}

val MergedStringD = keyValueMapD.foldLeft(MergedStringX) { (result, entry) =>
val (key, value) = entry
val oldValue = s"$key=[^\\s]+"
val newValue = key + "=" + value
result.replaceAll(oldValue, newValue)
}
val javaOpts = (MergedStringD.split("\\s+") ++ replaceStr2.split("\\s+")).distinct.mkString(" ")
javaOpts
}


}

0 comments on commit a9359e2

Please sign in to comment.