Dev 1.1.15 webank streamis fink load yaml (#303)
* flink load default configuration
yangwenzea authored Oct 11, 2023
1 parent e46e07e commit a6fa3dd
Showing 5 changed files with 196 additions and 13 deletions.
FlinkEnvConfiguration.scala
@@ -44,6 +44,8 @@ object FlinkEnvConfiguration {
FLINK_HOME.getValue + s"/lib/flink-dist_2.11-${FLINK_VERSION.getValue}.jar"
)

val FLINK_CONF_YAML = CommonVars("flink.conf.yaml.dir", "flink-conf.yaml")

val FLINK_PROVIDED_LIB_PATH = CommonVars("flink.lib.path", "")

val FLINK_PROVIDED_USER_LIB_PATH =
@@ -61,6 +63,7 @@ object FlinkEnvConfiguration {
"The local lib path of each user in Flink EngineConn."
)

val FLINK_YAML_MERGE_ENABLE = CommonVars("flink.yaml.merge.enable", true)
val FLINK_SHIP_DIRECTORIES = CommonVars("flink.yarn.ship-directories", "")
val FLINK_SHIP_REMOTE_DIRECTORIES = CommonVars("flink.yarn.remote.ship-directories", "")

@@ -137,4 +140,7 @@ object FlinkEnvConfiguration {
val FLINK_HANDSHAKE_WAIT_TIME_MILLS =
CommonVars("linkis.flink.handshake.wait.time.mills", 60 * 1000)

val FLINK_ENV_JAVA_OPTS =
CommonVars("flink.env.java.opts", "env.java.opts")

}
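The three keys added above are ordinary CommonVars entries: the name of the Flink YAML file, a switch for the merge behavior, and the flink-conf.yaml key that holds the JVM options. A minimal sketch of how they resolve (not part of the commit; it uses only the CommonVars accessors already visible in this diff):

import org.apache.linkis.common.conf.CommonVars

object FlinkYamlMergeDefaultsDemo {
  // Same definitions as in the diff above.
  val FLINK_CONF_YAML = CommonVars("flink.conf.yaml.dir", "flink-conf.yaml")
  val FLINK_YAML_MERGE_ENABLE = CommonVars("flink.yaml.merge.enable", true)
  val FLINK_ENV_JAVA_OPTS = CommonVars("flink.env.java.opts", "env.java.opts")

  def main(args: Array[String]): Unit = {
    // With no overrides in the Linkis configuration, the defaults apply.
    println(FLINK_YAML_MERGE_ENABLE.getValue) // true: merging is enabled by default
    println(FLINK_ENV_JAVA_OPTS.getValue) // "env.java.opts": the YAML key to look up
    println(FLINK_CONF_YAML.getHotValue()) // "flink-conf.yaml": fetched via the hot-value accessor
  }
}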
FlinkEngineConnFactory.scala
@@ -17,8 +17,7 @@

package org.apache.linkis.engineconnplugin.flink.factory

import org.apache.linkis.common.conf.CommonVars
import org.apache.linkis.common.utils.{ClassUtils, Logging}
import org.apache.linkis.common.utils.{ClassUtils, Logging, Utils}
import org.apache.linkis.engineconn.acessible.executor.conf.AccessibleExecutorConfiguration
import org.apache.linkis.engineconn.common.creation.EngineCreationContext
import org.apache.linkis.engineconn.launch.EngineConnServer
@@ -32,35 +31,32 @@ import org.apache.linkis.engineconnplugin.flink.context.{EnvironmentContext, Fli
import org.apache.linkis.engineconnplugin.flink.errorcode.FlinkErrorCodeSummary._
import org.apache.linkis.engineconnplugin.flink.exception.FlinkInitFailedException
import org.apache.linkis.engineconnplugin.flink.setting.Settings
import org.apache.linkis.engineconnplugin.flink.util.{ClassUtil, ManagerUtil}
import org.apache.linkis.engineconnplugin.flink.util.{ClassUtil, FlinkValueFormatUtil, ManagerUtil}
import org.apache.linkis.governance.common.conf.GovernanceCommonConf
import org.apache.linkis.manager.engineplugin.common.conf.EnvConfiguration
import org.apache.linkis.manager.engineplugin.common.creation.{
ExecutorFactory,
MultiExecutorEngineConnFactory
}
import org.apache.linkis.manager.engineplugin.common.creation.{ExecutorFactory, MultiExecutorEngineConnFactory}
import org.apache.linkis.manager.label.entity.Label
import org.apache.linkis.manager.label.entity.engine._
import org.apache.linkis.manager.label.entity.engine.EngineType.EngineType
import org.apache.linkis.protocol.utils.TaskUtils

import org.apache.commons.lang3.StringUtils
import org.apache.flink.configuration._
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.table.api.Expressions.e
import org.apache.flink.table.api.e
import org.apache.flink.yarn.configuration.{YarnConfigOptions, YarnDeploymentTarget}

import java.io.File
import java.io.{File, FileNotFoundException}
import java.net.URL
import java.text.MessageFormat
import java.time.Duration
import java.util
import java.util.{Collections, Locale}

import scala.collection.JavaConverters._

import scala.io.Source
import com.google.common.collect.{Lists, Sets}
import org.yaml.snakeyaml.Yaml

class FlinkEngineConnFactory extends MultiExecutorEngineConnFactory with Logging {

@@ -174,7 +170,15 @@ class FlinkEngineConnFactory extends MultiExecutorEngineConnFactory with Logging
flinkConfig.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, numberOfTaskSlots)
// set extra configs
options.asScala.filter { case (key, _) => key.startsWith(FLINK_CONFIG_PREFIX) }.foreach {
case (key, value) => flinkConfig.setString(key.substring(FLINK_CONFIG_PREFIX.length), value)
case (key, value) =>
var flinkConfigValue = value
if (
FlinkEnvConfiguration.FLINK_YAML_MERGE_ENABLE.getValue && key
.equals(FLINK_CONFIG_PREFIX + FLINK_ENV_JAVA_OPTS.getValue)
) {
flinkConfigValue = getExtractJavaOpts(value)
}
flinkConfig.setString(key.substring(FLINK_CONFIG_PREFIX.length), flinkConfigValue)
}
// set kerberos config
if (FLINK_KERBEROS_ENABLE.getValue(options)) {
@@ -232,6 +236,46 @@ class FlinkEngineConnFactory extends MultiExecutorEngineConnFactory with Logging
context
}

private def getExtractJavaOpts(envJavaOpts: String): String = {
var defaultJavaOpts = ""
val yamlFilePath = FLINK_CONF_DIR.getValue
val yamlFile = yamlFilePath + "/" + FLINK_CONF_YAML.getHotValue()
if (new File(yamlFile).exists()) {
val source = Source.fromFile(yamlFile)
try {
val yamlContent = source.mkString
val yaml = new Yaml()
val configMap = yaml.loadAs(yamlContent, classOf[util.LinkedHashMap[String, Object]])
if (configMap.containsKey(FLINK_ENV_JAVA_OPTS.getValue)) {
defaultJavaOpts = configMap.get(FLINK_ENV_JAVA_OPTS.getValue).toString
}
} finally {
source.close()
}
} else {
val inputStream = getClass.getResourceAsStream(yamlFile)
if (inputStream != null) {
val source = Source.fromInputStream(inputStream)
try {
val yamlContent = source.mkString
val yaml = new Yaml()
val configMap = yaml.loadAs(yamlContent, classOf[util.LinkedHashMap[String, Object]])
if (configMap.containsKey(FLINK_ENV_JAVA_OPTS.getValue)) {
defaultJavaOpts = configMap.get(FLINK_ENV_JAVA_OPTS.getValue).toString
}
} finally {
source.close()
}
} else {
throw new FileNotFoundException("YAML file not found in either the file system or the classpath.")
}
}
val merged = FlinkValueFormatUtil.mergeAndDeduplicate(defaultJavaOpts, envJavaOpts)
merged
}
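getExtractJavaOpts above loads flink-conf.yaml with SnakeYAML (from the file system first, then from the classpath) and hands the default env.java.opts string to FlinkValueFormatUtil.mergeAndDeduplicate. The YAML lookup in miniature, as a standalone sketch (not part of the commit; the sample YAML content is invented for illustration):

import java.util

import org.yaml.snakeyaml.Yaml

object EnvJavaOptsLookupDemo {
  def main(args: Array[String]): Unit = {
    // flink-conf.yaml is a flat mapping, so "env.java.opts" is a literal top-level key.
    val yamlContent =
      "env.java.opts: -XX:+UseG1GC -Xloggc:/tmp/gc.log\n" +
        "taskmanager.numberOfTaskSlots: 2\n"
    val yaml = new Yaml()
    val configMap = yaml.loadAs(yamlContent, classOf[util.LinkedHashMap[String, Object]])
    if (configMap.containsKey("env.java.opts")) {
      println(configMap.get("env.java.opts")) // -XX:+UseG1GC -Xloggc:/tmp/gc.log
    }
  }
}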

protected def isOnceEngineConn(labels: util.List[Label[_]]): Boolean = {
val engineConnModeLabel = getEngineConnModeLabel(labels)
engineConnModeLabel != null && (EngineConnMode.toEngineConnMode(
FlinkEngineConnLaunchBuilder.scala
@@ -39,9 +39,13 @@ import org.apache.linkis.manager.engineplugin.common.launch.process.LaunchConsta
}
import org.apache.linkis.manager.label.entity.engine.UserCreatorLabel

import java.io.{File, FileInputStream}
import java.util

import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer

import org.yaml.snakeyaml.Yaml

class FlinkEngineConnLaunchBuilder extends JavaProcessEngineConnLaunchBuilder {

FlinkValueFormatUtil.scala
@@ -36,4 +36,55 @@ object FlinkValueFormatUtil {
case _ => null
}

def mergeAndDeduplicate(defaultJavaOpts: String, envJavaOpts: String): String = {
val patternX = """-XX:([^\s]+)=([^\s]+)""".r
val keyValueMapX = patternX.findAllMatchIn(envJavaOpts).map { matchResult =>
val key = matchResult.group(1)
val value = matchResult.group(2)
(key, value)
}.toMap

val patternD = """-D([^\s]+)=([^\s]+)""".r
val keyValueMapD = patternD.findAllMatchIn(envJavaOpts).map { matchResult =>
val key = matchResult.group(1)
val value = matchResult.group(2)
(key, value)
}.toMap
val xloggcPattern = """-Xloggc:[^\s]+""".r
val xloggcValueStr1 = xloggcPattern.findFirstMatchIn(defaultJavaOpts).getOrElse("").toString
val xloggcValueStr2 = xloggcPattern.findFirstMatchIn(envJavaOpts).getOrElse("").toString
var escapedXloggcValue = ""
var replaceStr1 = ""
var replaceStr2 = ""
if (xloggcValueStr1.nonEmpty && xloggcValueStr2.nonEmpty) {
escapedXloggcValue = xloggcValueStr2.replace("<", "\\<").replace(">", "\\>")
replaceStr1 = defaultJavaOpts.replace(xloggcValueStr1, escapedXloggcValue)
replaceStr2 = envJavaOpts.replace(xloggcValueStr2, "")
}
if (xloggcValueStr1.nonEmpty && xloggcValueStr2.isEmpty) {
escapedXloggcValue = xloggcValueStr1.replace("<", "\\<").replace(">", "\\>")
replaceStr1 = defaultJavaOpts.replace(xloggcValueStr1, escapedXloggcValue)
replaceStr2 = envJavaOpts
}
if (xloggcValueStr1.isEmpty && xloggcValueStr2.isEmpty) {
replaceStr1 = defaultJavaOpts
replaceStr2 = envJavaOpts
}
val MergedStringX = keyValueMapX.foldLeft(replaceStr1) { (result, entry) =>
val (key, value) = entry
val oldValue = s"$key=[^\\s]+"
val newValue = key + "=" + value
result.replaceAll(oldValue, newValue)
}

val MergedStringD = keyValueMapD.foldLeft(MergedStringX) { (result, entry) =>
val (key, value) = entry
val oldValue = s"$key=[^\\s]+"
val newValue = key + "=" + value
result.replaceAll(oldValue, newValue)
}
val javaOpts = (MergedStringD.split("\\s+") ++ replaceStr2.split("\\s+")).distinct.mkString(" ")
javaOpts
}

}
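For -Dkey=value and -XX:key=value pairs that appear in both inputs, mergeAndDeduplicate rewrites the default string to the env value in place; an -Xloggc path has its angle brackets backslash-escaped (the env occurrence wins when both sides define one); finally the tokens of both strings are concatenated and de-duplicated, which appends env-only options. A small worked example of the expected behavior (a sketch against the code above, with invented option values):

import org.apache.linkis.engineconnplugin.flink.util.FlinkValueFormatUtil

val defaults = "-XX:MaxMetaspaceSize=256m -Dlog.level=INFO"
val env = "-XX:MaxMetaspaceSize=512m -Dapp.name=demo"
FlinkValueFormatUtil.mergeAndDeduplicate(defaults, env)
// "-XX:MaxMetaspaceSize=512m -Dlog.level=INFO -Dapp.name=demo"
// MaxMetaspaceSize was rewritten to the env value in place; -Dapp.name was appended.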
TestFlinkEngineConnFactory.scala (new file)
@@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.linkis.engineconnplugin.flink.factory

import org.apache.linkis.engineconnplugin.flink.config.FlinkEnvConfiguration.{FLINK_CONF_DIR, FLINK_CONF_YAML, FLINK_ENV_JAVA_OPTS}
import org.apache.linkis.engineconnplugin.flink.util.FlinkValueFormatUtil
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Test
import org.yaml.snakeyaml.Yaml

import java.io.{File, FileNotFoundException}
import java.util
import scala.io.Source

class TestFlinkEngineConnFactory {

// Mirrors the private FlinkEngineConnFactory.getExtractJavaOpts; kept as a plain
// helper because JUnit 5 does not run private, parameterized methods.
private def getExtractJavaOpts(envJavaOpts: String): String = {
var defaultJavaOpts = ""
val yamlFilePath = FLINK_CONF_DIR.getValue
val yamlFile = yamlFilePath + "/" + FLINK_CONF_YAML.getHotValue()
if (new File(yamlFile).exists()) {
val source = Source.fromFile(yamlFile)
try {
val yamlContent = source.mkString
val yaml = new Yaml()
val configMap = yaml.loadAs(yamlContent, classOf[util.LinkedHashMap[String, Object]])
if (configMap.containsKey(FLINK_ENV_JAVA_OPTS.getValue)) {
defaultJavaOpts = configMap.get(FLINK_ENV_JAVA_OPTS.getValue).toString
}
} finally {
source.close()
}
} else {
val inputStream = getClass.getResourceAsStream(yamlFile)
if (inputStream != null) {
val source = Source.fromInputStream(inputStream)
try {
val yamlContent = source.mkString
val yaml = new Yaml()
val configMap = yaml.loadAs(yamlContent, classOf[util.LinkedHashMap[String, Object]])
if (configMap.containsKey(FLINK_ENV_JAVA_OPTS.getValue)) {
defaultJavaOpts = configMap.get(FLINK_ENV_JAVA_OPTS.getValue).toString
}
} finally {
source.close()
}
} else {
throw new FileNotFoundException("YAML file not found in either the file system or the classpath.")
}
}
val merged = FlinkValueFormatUtil.mergeAndDeduplicate(defaultJavaOpts, envJavaOpts)
merged
}

@Test
def testMergeAndDeduplicate(): Unit = {
val defaultJavaOpts = "-Da=3 -Db=4 -XXc=5 -Dk=a1=b"
val envJavaOpts = "-DjobName=0607_1 -Dlog4j.configuration=./log4j.properties -Da=1 -Dk=a1=c"
val merged = FlinkValueFormatUtil.mergeAndDeduplicate(defaultJavaOpts, envJavaOpts)
assertEquals("-Da=1 -Db=4 -XXc=5 -Dk=a1=c -DjobName=0607_1 -Dlog4j.configuration=./log4j.properties", merged)
}
}
