From 4960c3d31a6912d30c5fc8a03082753a21f8455e Mon Sep 17 00:00:00 2001 From: lemonjuice <86357693+lemonjuicelove@users.noreply.github.com> Date: Thu, 7 Sep 2023 15:15:05 +0800 Subject: [PATCH] [Linkis-1.1.15-Code Review] fix code review (#285) * fix code review * fix code review2 * fix code review3 * fix get entrance log error * code format * add hdfs path * add try catch --------- Co-authored-by: aiceflower Co-authored-by: Casion --- .../linkis/entrance/log/CacheLogReader.scala | 12 +++-- .../manager/am/conf/AMConfiguration.scala | 14 ++++++ .../engine/DefaultEngineCreateService.scala | 6 +-- .../DriverAndYarnReqResourceService.scala | 47 +++++++++---------- .../utils/AcrossClusterRulesJudgeUtils.scala | 38 ++++++++------- .../conf/AcrossClusterConfiguration.scala | 27 ----------- .../conf/AcrossClusterRuleKeys.java | 43 +++++++++++++++++ .../api/AcrossClusterRuleRestfulApi.java | 3 +- .../impl/AcrossClusterRuleServiceImpl.java | 10 ---- .../configuration/util/CommonUtils.java | 26 +++++----- 10 files changed, 125 insertions(+), 101 deletions(-) delete mode 100644 linkis-public-enhancements/linkis-configuration/src/main/java/org/apache/linkis/configuration/conf/AcrossClusterConfiguration.scala create mode 100644 linkis-public-enhancements/linkis-configuration/src/main/java/org/apache/linkis/configuration/conf/AcrossClusterRuleKeys.java diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/log/CacheLogReader.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/log/CacheLogReader.scala index fae6648752..748f82df4b 100644 --- a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/log/CacheLogReader.scala +++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/log/CacheLogReader.scala @@ -49,11 +49,13 @@ class CacheLogReader(logPath: String, charset: String, sharedCache: Cache, user: if (fileSystem == null) lock synchronized { if (fileSystem == null) { - if (StorageUtils.isHDFSPath(fsPath) && EntranceConfiguration.ENABLE_HDFS_JVM_USER) { - FSFactory.getFs(fsPath).asInstanceOf[FileSystem] - } else { - fileSystem = FSFactory.getFsByProxyUser(fsPath, user) - } + fileSystem = + if (StorageUtils.isHDFSPath(fsPath) && EntranceConfiguration.ENABLE_HDFS_JVM_USER) { + FSFactory.getFs(new FsPath(logPath)).asInstanceOf[FileSystem] + } else { + FSFactory.getFsByProxyUser(new FsPath(logPath), user).asInstanceOf[FileSystem] + } + fileSystem.init(new util.HashMap[String, String]()) } } diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/conf/AMConfiguration.scala b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/conf/AMConfiguration.scala index c730edf0fd..edd3c001b5 100644 --- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/conf/AMConfiguration.scala +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/conf/AMConfiguration.scala @@ -23,6 +23,20 @@ import org.apache.linkis.manager.common.entity.enumeration.MaintainType object AMConfiguration { + val YARN_QUEUE_NAME_CONFIG_KEY = "wds.linkis.rm.yarnqueue" + + val ACROSS_CLUSTER_QUEUE_SUFFIX = "queueRuleSuffix" + + val ACROSS_CLUSTER_TASK = "acrossClusterTask" + + val ACROSS_CLUSTER_CPU_THRESHOLD = "CPUThreshold" + + val ACROSS_CLUSTER_MEMORY_THRESHOLD = "MemoryThreshold" + + val ACROSS_CLUSTER_CPU_PERCENTAGE_THRESHOLD = "CPUPercentageThreshold" + + val ACROSS_CLUSTER_MEMORY_PERCENTAGE_THRESHOLD = "MemoryPercentageThreshold" + val ECM_ADMIN_OPERATIONS = CommonVars("wds.linkis.governance.admin.operations", "") val ENGINE_START_MAX_TIME = diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineCreateService.scala b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineCreateService.scala index f8210302d9..78a8adefd2 100644 --- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineCreateService.scala +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineCreateService.scala @@ -293,11 +293,11 @@ class DefaultEngineCreateService }) } - val queueRuleSuffix = props.get("queueRuleSuffix") + val queueRuleSuffix = props.get(AMConfiguration.ACROSS_CLUSTER_QUEUE_SUFFIX) if (StringUtils.isNotBlank(queueRuleSuffix)) { - val queueName = props.getOrDefault("wds.linkis.rm.yarnqueue", "default") + val queueName = props.getOrDefault(AMConfiguration.YARN_QUEUE_NAME_CONFIG_KEY, "default") val newQueueName = queueName + "_" + queueRuleSuffix - props.put("wds.linkis.rm.yarnqueue", newQueueName) + props.put(AMConfiguration.YARN_QUEUE_NAME_CONFIG_KEY, newQueueName) logger.info( s"Switch queues according to queueRule with queue name : $queueName to $newQueueName" ) diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/service/impl/DriverAndYarnReqResourceService.scala b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/service/impl/DriverAndYarnReqResourceService.scala index 7beecce634..9a3b214318 100644 --- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/service/impl/DriverAndYarnReqResourceService.scala +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/service/impl/DriverAndYarnReqResourceService.scala @@ -17,6 +17,7 @@ package org.apache.linkis.manager.rm.service.impl +import org.apache.linkis.manager.am.conf.AMConfiguration import org.apache.linkis.manager.common.constant.RMConstant import org.apache.linkis.manager.common.entity.resource._ import org.apache.linkis.manager.common.entity.resource.ResourceType.DriverAndYarn @@ -79,15 +80,13 @@ class DriverAndYarnReqResourceService( val user = labelContainer.getUserCreatorLabel.getUser val creator = labelContainer.getUserCreatorLabel.getCreator val properties = engineCreateRequest.getProperties - val acrossClusterTask = properties.getOrDefault("acrossClusterTask", "false") - val CPUThreshold = properties.get("CPUThreshold") - val MemoryThreshold = properties.get("MemoryThreshold") - val CPUPercentageThreshold = properties.get("CPUPercentageThreshold") - val MemoryPercentageThreshold = properties.get("MemoryPercentageThreshold") - - logger.info( - s"user: $user, creator: $creator, acrossClusterTask: $acrossClusterTask, cross cluster judge" - ) + val acrossClusterTask = properties.getOrDefault(AMConfiguration.ACROSS_CLUSTER_TASK, "false") + val CPUThreshold = properties.get(AMConfiguration.ACROSS_CLUSTER_CPU_THRESHOLD) + val MemoryThreshold = properties.get(AMConfiguration.ACROSS_CLUSTER_MEMORY_THRESHOLD) + val CPUPercentageThreshold = + properties.get(AMConfiguration.ACROSS_CLUSTER_CPU_PERCENTAGE_THRESHOLD) + val MemoryPercentageThreshold = + properties.get(AMConfiguration.ACROSS_CLUSTER_MEMORY_PERCENTAGE_THRESHOLD) if ( StringUtils.isNotBlank(acrossClusterTask) && acrossClusterTask.toBoolean && StringUtils @@ -102,23 +101,21 @@ class DriverAndYarnReqResourceService( s"CPUThreshold: $CPUThreshold, MemoryThreshold: $MemoryThreshold," + s"CPUPercentageThreshold: $CPUPercentageThreshold, MemoryPercentageThreshold: $MemoryPercentageThreshold" ) - - val acrossClusterFlag = AcrossClusterRulesJudgeUtils.acrossClusterRuleJudge( - queueLeftResource.asInstanceOf[YarnResource], - usedCapacity.asInstanceOf[YarnResource], - maxCapacity.asInstanceOf[YarnResource], - CPUThreshold.toInt, - MemoryThreshold.toInt, - CPUPercentageThreshold.toDouble, - MemoryPercentageThreshold.toDouble - ) - - if (!acrossClusterFlag) { - logger.info(s"user: $user, creator: $creator task not meet the threshold rule") - - throw new RMWarnException( + try { + AcrossClusterRulesJudgeUtils.acrossClusterRuleCheck( + queueLeftResource.asInstanceOf[YarnResource], + usedCapacity.asInstanceOf[YarnResource], + maxCapacity.asInstanceOf[YarnResource], + CPUThreshold.toInt, + MemoryThreshold.toInt, + CPUPercentageThreshold.toDouble, + MemoryPercentageThreshold.toDouble + ) + } catch { + case ex: Exception => + throw new RMWarnException( RMErrorCode.ACROSS_CLUSTER_RULE_FAILED.getErrorCode, - RMErrorCode.ACROSS_CLUSTER_RULE_FAILED.getErrorDesc + ex.getMessage ) } diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/utils/AcrossClusterRulesJudgeUtils.scala b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/utils/AcrossClusterRulesJudgeUtils.scala index 51cf36bca1..76fc176395 100644 --- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/utils/AcrossClusterRulesJudgeUtils.scala +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/utils/AcrossClusterRulesJudgeUtils.scala @@ -19,47 +19,49 @@ package org.apache.linkis.manager.rm.utils import org.apache.linkis.common.utils.Logging import org.apache.linkis.manager.common.entity.resource.YarnResource +import org.apache.linkis.manager.common.exception.RMWarnException +import org.apache.linkis.manager.rm.exception.RMErrorCode object AcrossClusterRulesJudgeUtils extends Logging { - def acrossClusterRuleJudge( + def acrossClusterRuleCheck( leftResource: YarnResource, usedResource: YarnResource, maxResource: YarnResource, leftCPUThreshold: Int, leftMemoryThreshold: Int, - UsedCPUPercentageThreshold: Double, - UsedMemoryPercentageThreshold: Double - ): Boolean = { + CPUPercentageThreshold: Double, + MemoryPercentageThreshold: Double + ): Unit = { if (leftResource != null && usedResource != null && maxResource != null) { val leftQueueMemory = leftResource.queueMemory / Math.pow(1024, 3).toLong - logger.info( - s"leftResource.queueCores: ${leftResource.queueCores}, leftCPUThreshold: $leftCPUThreshold," + - s"leftQueueMemory: $leftQueueMemory, leftMemoryThreshold: $leftMemoryThreshold" - ) if (leftResource.queueCores > leftCPUThreshold && leftQueueMemory > leftMemoryThreshold) { - val usedCPUPercentage = usedResource.queueCores.asInstanceOf[Double] / maxResource.queueCores .asInstanceOf[Double] val usedMemoryPercentage = usedResource.queueMemory .asInstanceOf[Double] / maxResource.queueMemory.asInstanceOf[Double] - logger.info( - s"usedCPUPercentage: $usedCPUPercentage, UsedCPUPercentageThreshold: $UsedCPUPercentageThreshold" + - s"usedMemoryPercentage: $usedMemoryPercentage, UsedMemoryPercentageThreshold: $UsedMemoryPercentageThreshold" - ) - if ( - usedCPUPercentage < UsedCPUPercentageThreshold && usedMemoryPercentage < UsedMemoryPercentageThreshold + usedCPUPercentage < CPUPercentageThreshold && usedMemoryPercentage < MemoryPercentageThreshold ) { - return true + return + } else { + throw new RMWarnException( + RMErrorCode.ACROSS_CLUSTER_RULE_FAILED.getErrorCode, + s"usedCPUPercentage: $usedCPUPercentage, CPUPercentageThreshold: $CPUPercentageThreshold" + + s"usedMemoryPercentage: $usedMemoryPercentage, MemoryPercentageThreshold: $MemoryPercentageThreshold" + ) } + } else { + throw new RMWarnException( + RMErrorCode.ACROSS_CLUSTER_RULE_FAILED.getErrorCode, + s"leftResource.queueCores: ${leftResource.queueCores}, leftCPUThreshold: $leftCPUThreshold," + + s"leftQueueMemory: $leftQueueMemory, leftMemoryThreshold: $leftMemoryThreshold" + ) } } - - false } } diff --git a/linkis-public-enhancements/linkis-configuration/src/main/java/org/apache/linkis/configuration/conf/AcrossClusterConfiguration.scala b/linkis-public-enhancements/linkis-configuration/src/main/java/org/apache/linkis/configuration/conf/AcrossClusterConfiguration.scala deleted file mode 100644 index 64635cf914..0000000000 --- a/linkis-public-enhancements/linkis-configuration/src/main/java/org/apache/linkis/configuration/conf/AcrossClusterConfiguration.scala +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.linkis.configuration.conf - -import org.apache.linkis.common.conf.CommonVars - -object AcrossClusterConfiguration { - - val ACROSS_CLUSTER_QUEUE_SUFFIX = - CommonVars.apply("linkis.configuration.across.cluster.queue.suffix", "bdap2bdp").getValue - -} diff --git a/linkis-public-enhancements/linkis-configuration/src/main/java/org/apache/linkis/configuration/conf/AcrossClusterRuleKeys.java b/linkis-public-enhancements/linkis-configuration/src/main/java/org/apache/linkis/configuration/conf/AcrossClusterRuleKeys.java new file mode 100644 index 0000000000..f2fee2ff1f --- /dev/null +++ b/linkis-public-enhancements/linkis-configuration/src/main/java/org/apache/linkis/configuration/conf/AcrossClusterRuleKeys.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.configuration.conf; + +public class AcrossClusterRuleKeys { + + public static final String KEY_QUEUE_SUFFIX = "suffix"; + + public static final String KEY_ACROSS_CLUSTER_QUEUE_SUFFIX = "bdap2bdp"; + + public static final String KEY_START_TIME = "startTime"; + + public static final String KEY_END_TIME = "endTime"; + + public static final String KEY_CPU_THRESHOLD = "CPUThreshold"; + + public static final String KEY_MEMORY_THRESHOLD = "MemoryThreshold"; + + public static final String KEY_CPU_PERCENTAGE_THRESHOLD = "CPUPercentageThreshold"; + + public static final String KEY_MEMORY_PERCENTAGE_THRESHOLD = "MemoryPercentageThreshold"; + + public static final String KEY_QUEUE_RULE = "queueRule"; + + public static final String KEY_TIME_RULE = "timeRule"; + + public static final String KEY_THRESHOLD_RULE = "thresholdRule"; +} diff --git a/linkis-public-enhancements/linkis-configuration/src/main/java/org/apache/linkis/configuration/restful/api/AcrossClusterRuleRestfulApi.java b/linkis-public-enhancements/linkis-configuration/src/main/java/org/apache/linkis/configuration/restful/api/AcrossClusterRuleRestfulApi.java index 10c274eed1..3a01c86060 100644 --- a/linkis-public-enhancements/linkis-configuration/src/main/java/org/apache/linkis/configuration/restful/api/AcrossClusterRuleRestfulApi.java +++ b/linkis-public-enhancements/linkis-configuration/src/main/java/org/apache/linkis/configuration/restful/api/AcrossClusterRuleRestfulApi.java @@ -77,7 +77,8 @@ public Message isValidRule(HttpServletRequest req, @RequestBody Map timeRuleMap = new HashMap<>(); Map thresholdRuleMap = new HashMap<>(); Map ruleMap = new HashMap<>(); - queueRuleMap.put("suffix", AcrossClusterConfiguration.ACROSS_CLUSTER_QUEUE_SUFFIX()); - timeRuleMap.put("startTime", startTime); - timeRuleMap.put("endTime", endTime); - thresholdRuleMap.put("CPUThreshold", CPUThreshold); - thresholdRuleMap.put("MemoryThreshold", MemoryThreshold); - thresholdRuleMap.put("CPUPercentageThreshold", CPUPercentageThreshold); - thresholdRuleMap.put("MemoryPercentageThreshold", MemoryPercentageThreshold); - ruleMap.put("queueRule", queueRuleMap); - ruleMap.put("timeRule", timeRuleMap); - ruleMap.put("thresholdRule", thresholdRuleMap); - ObjectMapper map2Json = new ObjectMapper(); + queueRuleMap.put(KEY_QUEUE_SUFFIX, KEY_ACROSS_CLUSTER_QUEUE_SUFFIX); + timeRuleMap.put(KEY_START_TIME, startTime); + timeRuleMap.put(KEY_END_TIME, endTime); + thresholdRuleMap.put(KEY_CPU_THRESHOLD, CPUThreshold); + thresholdRuleMap.put(KEY_MEMORY_THRESHOLD, MemoryThreshold); + thresholdRuleMap.put(KEY_CPU_PERCENTAGE_THRESHOLD, CPUPercentageThreshold); + thresholdRuleMap.put(KEY_MEMORY_PERCENTAGE_THRESHOLD, MemoryPercentageThreshold); + ruleMap.put(KEY_QUEUE_RULE, queueRuleMap); + ruleMap.put(KEY_TIME_RULE, timeRuleMap); + ruleMap.put(KEY_THRESHOLD_RULE, thresholdRuleMap); + ObjectMapper map2Json = BDPJettyServerHelper.jacksonJson(); String rules = map2Json.writeValueAsString(ruleMap); return rules;