diff --git a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/config/FlinkEnvConfiguration.scala b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/config/FlinkEnvConfiguration.scala index 857faeed8d..dd3751e26e 100644 --- a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/config/FlinkEnvConfiguration.scala +++ b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/config/FlinkEnvConfiguration.scala @@ -44,6 +44,8 @@ object FlinkEnvConfiguration { FLINK_HOME.getValue + s"/lib/flink-dist_2.11-${FLINK_VERSION.getValue}.jar" ) + val FLINK_CONF_YAML = CommonVars("flink.conf.yaml.dir", "flink-conf.yaml") + val FLINK_PROVIDED_LIB_PATH = CommonVars("flink.lib.path", "") val FLINK_PROVIDED_USER_LIB_PATH = @@ -61,6 +63,7 @@ object FlinkEnvConfiguration { "The local lib path of each user in Flink EngineConn." ) + val FLINK_YAML_MERGE_ENABLE = CommonVars("flink.yaml.merge.enable", true) val FLINK_SHIP_DIRECTORIES = CommonVars("flink.yarn.ship-directories", "") val FLINK_SHIP_REMOTE_DIRECTORIES = CommonVars("flink.yarn.remote.ship-directories", "") @@ -137,4 +140,7 @@ object FlinkEnvConfiguration { val FLINK_HANDSHAKE_WAIT_TIME_MILLS = CommonVars("linkis.flink.handshake.wait.time.mills", 60 * 1000) + val FLINK_ENV_JAVA_OPTS = + CommonVars("flink.env.java.opts", "env.java.opts") + } diff --git a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/factory/FlinkEngineConnFactory.scala b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/factory/FlinkEngineConnFactory.scala index 35961a652f..5d55b5d87f 100644 --- a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/factory/FlinkEngineConnFactory.scala +++ b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/factory/FlinkEngineConnFactory.scala @@ -17,8 +17,7 @@ package org.apache.linkis.engineconnplugin.flink.factory -import org.apache.linkis.common.conf.CommonVars -import org.apache.linkis.common.utils.{ClassUtils, Logging} +import org.apache.linkis.common.utils.{ClassUtils, Logging, Utils} import org.apache.linkis.engineconn.acessible.executor.conf.AccessibleExecutorConfiguration import org.apache.linkis.engineconn.common.creation.EngineCreationContext import org.apache.linkis.engineconn.launch.EngineConnServer @@ -32,35 +31,32 @@ import org.apache.linkis.engineconnplugin.flink.context.{EnvironmentContext, Fli import org.apache.linkis.engineconnplugin.flink.errorcode.FlinkErrorCodeSummary._ import org.apache.linkis.engineconnplugin.flink.exception.FlinkInitFailedException import org.apache.linkis.engineconnplugin.flink.setting.Settings -import org.apache.linkis.engineconnplugin.flink.util.{ClassUtil, ManagerUtil} +import org.apache.linkis.engineconnplugin.flink.util.{ClassUtil, FlinkValueFormatUtil, ManagerUtil} import org.apache.linkis.governance.common.conf.GovernanceCommonConf import org.apache.linkis.manager.engineplugin.common.conf.EnvConfiguration -import org.apache.linkis.manager.engineplugin.common.creation.{ - ExecutorFactory, - MultiExecutorEngineConnFactory -} +import org.apache.linkis.manager.engineplugin.common.creation.{ExecutorFactory, MultiExecutorEngineConnFactory} import org.apache.linkis.manager.label.entity.Label import org.apache.linkis.manager.label.entity.engine._ import org.apache.linkis.manager.label.entity.engine.EngineType.EngineType -import org.apache.linkis.protocol.utils.TaskUtils - import org.apache.commons.lang3.StringUtils import org.apache.flink.configuration._ import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings import org.apache.flink.streaming.api.CheckpointingMode import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup +import org.apache.flink.table.api.Expressions.e +import org.apache.flink.table.api.e import org.apache.flink.yarn.configuration.{YarnConfigOptions, YarnDeploymentTarget} -import java.io.File +import java.io.{File, FileNotFoundException} import java.net.URL import java.text.MessageFormat import java.time.Duration import java.util import java.util.{Collections, Locale} - import scala.collection.JavaConverters._ - +import scala.io.Source import com.google.common.collect.{Lists, Sets} +import org.yaml.snakeyaml.Yaml class FlinkEngineConnFactory extends MultiExecutorEngineConnFactory with Logging { @@ -174,7 +170,15 @@ class FlinkEngineConnFactory extends MultiExecutorEngineConnFactory with Logging flinkConfig.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, numberOfTaskSlots) // set extra configs options.asScala.filter { case (key, _) => key.startsWith(FLINK_CONFIG_PREFIX) }.foreach { - case (key, value) => flinkConfig.setString(key.substring(FLINK_CONFIG_PREFIX.length), value) + case (key, value) => + var flinkConfigValue = value + if ( + FlinkEnvConfiguration.FLINK_YAML_MERGE_ENABLE.getValue && key + .equals(FLINK_CONFIG_PREFIX + FLINK_ENV_JAVA_OPTS.getValue) + ) { + flinkConfigValue = getExtractJavaOpts(value) + } + flinkConfig.setString(key.substring(FLINK_CONFIG_PREFIX.length), flinkConfigValue) } // set kerberos config if (FLINK_KERBEROS_ENABLE.getValue(options)) { @@ -232,6 +236,46 @@ class FlinkEngineConnFactory extends MultiExecutorEngineConnFactory with Logging context } + private def getExtractJavaOpts(envJavaOpts: String): String = { + var defaultJavaOpts = "" + val yamlFilePath = FLINK_CONF_DIR.getValue + val yamlFile = yamlFilePath + "/" + FLINK_CONF_YAML.getHotValue() + if (new File(yamlFile).exists()) { + val source = Source.fromFile(yamlFile) + try { + val yamlContent = source.mkString + val yaml = new Yaml() + val configMap = yaml.loadAs(yamlContent, classOf[util.LinkedHashMap[String, Object]]) + if (configMap.containsKey(FLINK_ENV_JAVA_OPTS.getValue)) { + defaultJavaOpts = configMap.get(FLINK_ENV_JAVA_OPTS.getValue).toString + } + } finally { + source.close() + } + } else { + val inputStream = getClass.getResourceAsStream(yamlFile) + if (inputStream != null) { + val source = Source.fromInputStream(inputStream) + try { + val yamlContent = source.mkString + val yaml = new Yaml() + val configMap = yaml.loadAs(yamlContent, classOf[util.LinkedHashMap[String, Object]]) + if (configMap.containsKey(FLINK_ENV_JAVA_OPTS.getValue)) { + defaultJavaOpts = configMap.get(FLINK_ENV_JAVA_OPTS.getValue).toString + } + } finally { + source.close() + } + } else { + throw new FileNotFoundException("YAML file not found in both file system and classpath.") + } + } + val merged = FlinkValueFormatUtil.mergeAndDeduplicate(defaultJavaOpts, envJavaOpts) + merged + } + + + protected def isOnceEngineConn(labels: util.List[Label[_]]): Boolean = { val engineConnModeLabel = getEngineConnModeLabel(labels) engineConnModeLabel != null && (EngineConnMode.toEngineConnMode( diff --git a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/launch/FlinkEngineConnLaunchBuilder.scala b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/launch/FlinkEngineConnLaunchBuilder.scala index 70b3ad1b20..9f686b929d 100644 --- a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/launch/FlinkEngineConnLaunchBuilder.scala +++ b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/launch/FlinkEngineConnLaunchBuilder.scala @@ -39,9 +39,13 @@ import org.apache.linkis.manager.engineplugin.common.launch.process.LaunchConsta } import org.apache.linkis.manager.label.entity.engine.UserCreatorLabel +import java.io.{File, FileInputStream} import java.util import scala.collection.JavaConverters._ +import scala.collection.mutable.ArrayBuffer + +import org.yaml.snakeyaml.Yaml class FlinkEngineConnLaunchBuilder extends JavaProcessEngineConnLaunchBuilder { diff --git a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/util/FlinkValueFormatUtil.scala b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/util/FlinkValueFormatUtil.scala index 62782507eb..8335eaeb85 100644 --- a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/util/FlinkValueFormatUtil.scala +++ b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/util/FlinkValueFormatUtil.scala @@ -36,4 +36,55 @@ object FlinkValueFormatUtil { case _ => null } + def mergeAndDeduplicate(defaultJavaOpts: String, envJavaOpts: String): String = { + val patternX = """-XX:([^\s]+)=([^\s]+)""".r + val keyValueMapX = patternX.findAllMatchIn(envJavaOpts).map { matchResult => + val key = matchResult.group(1) + val value = matchResult.group(2) + (key, value) + }.toMap + + val patternD = """-D([^\s]+)=([^\s]+)""".r + val keyValueMapD = patternD.findAllMatchIn(envJavaOpts).map { matchResult => + val key = matchResult.group(1) + val value = matchResult.group(2) + (key, value) + }.toMap + val xloggcPattern = """-Xloggc:[^\s]+""".r + val xloggcValueStr1 = xloggcPattern.findFirstMatchIn(defaultJavaOpts).getOrElse("").toString + val xloggcValueStr2 = xloggcPattern.findFirstMatchIn(envJavaOpts).getOrElse("").toString + var escapedXloggcValue = "" + var replaceStr1 = "" + var replaceStr2 = "" + if (xloggcValueStr1.nonEmpty && xloggcValueStr2.nonEmpty) { + escapedXloggcValue = xloggcValueStr2.replace("<", "\\<").replace(">", "\\>") + replaceStr1 = defaultJavaOpts.replace(xloggcValueStr1, escapedXloggcValue) + replaceStr2 = envJavaOpts.replace(xloggcValueStr2, "") + } + if (xloggcValueStr1.nonEmpty && xloggcValueStr2.isEmpty) { + escapedXloggcValue = xloggcValueStr1.replace("<", "\\<").replace(">", "\\>") + replaceStr1 = defaultJavaOpts.replace(xloggcValueStr1, escapedXloggcValue) + replaceStr2 = envJavaOpts + } + if (xloggcValueStr1.isEmpty && xloggcValueStr2.isEmpty) { + replaceStr1 = defaultJavaOpts + replaceStr2 = envJavaOpts + } + val MergedStringX = keyValueMapX.foldLeft(replaceStr1) { (result, entry) => + val (key, value) = entry + val oldValue = s"$key=[^\\s]+" + val newValue = key + "=" + value + result.replaceAll(oldValue, newValue) + } + + val MergedStringD = keyValueMapD.foldLeft(MergedStringX) { (result, entry) => + val (key, value) = entry + val oldValue = s"$key=[^\\s]+" + val newValue = key + "=" + value + result.replaceAll(oldValue, newValue) + } + val javaOpts = (MergedStringD.split("\\s+") ++ replaceStr2.split("\\s+")).distinct.mkString(" ") + javaOpts + } + } diff --git a/linkis-engineconn-plugins/flink/src/test/scala/org.apache.linkis.engineconnplugin.flink/factory/TestFlinkEngineConnFactory.scala b/linkis-engineconn-plugins/flink/src/test/scala/org.apache.linkis.engineconnplugin.flink/factory/TestFlinkEngineConnFactory.scala new file mode 100644 index 0000000000..ef8dabf14e --- /dev/null +++ b/linkis-engineconn-plugins/flink/src/test/scala/org.apache.linkis.engineconnplugin.flink/factory/TestFlinkEngineConnFactory.scala @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.engineconnplugin.flink.factory + +import org.apache.linkis.engineconnplugin.flink.config.FlinkEnvConfiguration.{FLINK_CONF_DIR, FLINK_CONF_YAML, FLINK_ENV_JAVA_OPTS} +import org.apache.linkis.engineconnplugin.flink.util.FlinkValueFormatUtil +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Test +import org.yaml.snakeyaml.Yaml + +import java.io.{File, FileNotFoundException} +import java.util +import scala.io.Source + +class TestFlinkEngineConnFactory { + + @Test + private def getExtractJavaOpts(envJavaOpts: String): String = { + var defaultJavaOpts = "" + val yamlFilePath = FLINK_CONF_DIR.getValue + val yamlFile = yamlFilePath + "/" + FLINK_CONF_YAML.getHotValue() + if (new File(yamlFile).exists()) { + val source = Source.fromFile(yamlFile) + try { + val yamlContent = source.mkString + val yaml = new Yaml() + val configMap = yaml.loadAs(yamlContent, classOf[util.LinkedHashMap[String, Object]]) + if (configMap.containsKey(FLINK_ENV_JAVA_OPTS.getValue)) { + defaultJavaOpts = configMap.get(FLINK_ENV_JAVA_OPTS.getValue).toString + } + } finally { + source.close() + } + } else { + val inputStream = getClass.getResourceAsStream(yamlFile) + if (inputStream != null) { + val source = Source.fromInputStream(inputStream) + try { + val yamlContent = source.mkString + val yaml = new Yaml() + val configMap = yaml.loadAs(yamlContent, classOf[util.LinkedHashMap[String, Object]]) + if (configMap.containsKey(FLINK_ENV_JAVA_OPTS.getValue)) { + defaultJavaOpts = configMap.get(FLINK_ENV_JAVA_OPTS.getValue).toString + } + } finally { + source.close() + } + } else { + throw new FileNotFoundException("YAML file not found in both file system and classpath.") + } + } + val merged = FlinkValueFormatUtil.mergeAndDeduplicate(defaultJavaOpts, envJavaOpts) + merged + } + + @Test + def testMergeAndDeduplicate: Unit = { + val defaultJavaOpts = "-Da=3 -Db=4 -XXc=5 -Dk=a1=b"; + val envJavaOpts = "-DjobName=0607_1 -Dlog4j.configuration=./log4j.properties -Da=1 -Dk=a1=c"; + val merged = FlinkValueFormatUtil.mergeAndDeduplicate(defaultJavaOpts, envJavaOpts) + assertEquals("-Da=1 -Db=4 -XXc=5 -Dk=a1=c -DjobName=0607_1 -Dlog4j.configuration=./log4j.properties", merged) + } +}