Skip to content

Commit

Permalink
code format
Browse files Browse the repository at this point in the history
  • Loading branch information
yangwenzea committed Dec 6, 2023
1 parent 439c85f commit 43b89a0
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -181,5 +181,4 @@ object FlinkEnvConfiguration {
val FLINK_ENV_JAVA_OPTS =
CommonVars("flink.env.java.opts", "env.java.opts")


}
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,25 @@ import org.apache.linkis.engineconnplugin.flink.client.shims.config.Environment
import org.apache.linkis.engineconnplugin.flink.client.shims.config.entries.ExecutionEntry
import org.apache.linkis.engineconnplugin.flink.client.shims.errorcode.FlinkErrorCodeSummary._
import org.apache.linkis.engineconnplugin.flink.client.shims.exception.FlinkInitFailedException
import org.apache.linkis.engineconnplugin.flink.config.{FlinkEnvConfiguration, FlinkExecutionTargetType}
import org.apache.linkis.engineconnplugin.flink.config.{
FlinkEnvConfiguration,
FlinkExecutionTargetType
}
import org.apache.linkis.engineconnplugin.flink.config.FlinkEnvConfiguration._
import org.apache.linkis.engineconnplugin.flink.config.FlinkResourceConfiguration._
import org.apache.linkis.engineconnplugin.flink.context.{EnvironmentContext, FlinkEngineConnContext}
import org.apache.linkis.engineconnplugin.flink.setting.Settings
import org.apache.linkis.engineconnplugin.flink.util.{ClassUtil, FlinkValueFormatUtil, ManagerUtil}
import org.apache.linkis.governance.common.conf.GovernanceCommonConf
import org.apache.linkis.manager.engineplugin.common.conf.EnvConfiguration
import org.apache.linkis.manager.engineplugin.common.creation.{ExecutorFactory, MultiExecutorEngineConnFactory}
import org.apache.linkis.manager.engineplugin.common.creation.{
ExecutorFactory,
MultiExecutorEngineConnFactory
}
import org.apache.linkis.manager.label.entity.Label
import org.apache.linkis.manager.label.entity.engine._
import org.apache.linkis.manager.label.entity.engine.EngineType.EngineType

import org.apache.commons.lang3.StringUtils
import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.configuration._
Expand All @@ -54,12 +61,13 @@ import java.text.MessageFormat
import java.time.Duration
import java.util
import java.util.{Collections, Locale}

import scala.collection.JavaConverters._
import scala.io.Source

import com.google.common.collect.{Lists, Sets}
import org.yaml.snakeyaml.Yaml

import scala.io.Source

class FlinkEngineConnFactory extends MultiExecutorEngineConnFactory with Logging {

override protected def createEngineConnSession(
Expand Down Expand Up @@ -192,7 +200,10 @@ class FlinkEngineConnFactory extends MultiExecutorEngineConnFactory with Logging
options.asScala.filter { case (key, _) => key.startsWith(FLINK_CONFIG_PREFIX) }.foreach {
case (key, value) =>
var flinkConfigValue = value
if (FlinkEnvConfiguration.FLINK_YAML_MERGE_ENABLE.getValue && key.equals(FLINK_CONFIG_PREFIX + FLINK_ENV_JAVA_OPTS.getValue)) {
if (
FlinkEnvConfiguration.FLINK_YAML_MERGE_ENABLE.getValue && key
.equals(FLINK_CONFIG_PREFIX + FLINK_ENV_JAVA_OPTS.getValue)
) {
flinkConfigValue = getExtractJavaOpts(value)
}
flinkConfig.setString(key.substring(FLINK_CONFIG_PREFIX.length), flinkConfigValue)
Expand Down Expand Up @@ -294,7 +305,7 @@ class FlinkEngineConnFactory extends MultiExecutorEngineConnFactory with Logging
context
}

private def getExtractJavaOpts(envJavaOpts: String): String = {
private def getExtractJavaOpts(envJavaOpts: String): String = {
var defaultJavaOpts = ""
val yamlFilePath = FLINK_CONF_DIR.getValue
val yamlFile = yamlFilePath + "/" + FLINK_CONF_YAML.getHotValue()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,18 +38,24 @@ object FlinkValueFormatUtil {

def mergeAndDeduplicate(defaultJavaOpts: String, envJavaOpts: String): String = {
val patternX = """-XX:([^\s]+)=([^\s]+)""".r
val keyValueMapX = patternX.findAllMatchIn(envJavaOpts).map { matchResult =>
val key = matchResult.group(1)
val value = matchResult.group(2)
(key, value)
}.toMap
val keyValueMapX = patternX
.findAllMatchIn(envJavaOpts)
.map { matchResult =>
val key = matchResult.group(1)
val value = matchResult.group(2)
(key, value)
}
.toMap

val patternD = """-D([^\s]+)=([^\s]+)""".r
val keyValueMapD = patternD.findAllMatchIn(envJavaOpts).map { matchResult =>
val key = matchResult.group(1)
val value = matchResult.group(2)
(key, value)
}.toMap
val keyValueMapD = patternD
.findAllMatchIn(envJavaOpts)
.map { matchResult =>
val key = matchResult.group(1)
val value = matchResult.group(2)
(key, value)
}
.toMap
val xloggcPattern = """-Xloggc:[^\s]+""".r
val xloggcValueStr1 = xloggcPattern.findFirstMatchIn(defaultJavaOpts).getOrElse("").toString
val xloggcValueStr2 = xloggcPattern.findFirstMatchIn(envJavaOpts).getOrElse("").toString
Expand Down Expand Up @@ -87,5 +93,4 @@ object FlinkValueFormatUtil {
javaOpts
}


}

0 comments on commit 43b89a0

Please sign in to comment.