diff --git a/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/utils/StorageUtils.scala b/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/utils/StorageUtils.scala index 7a5b134749..7ee53a45cf 100644 --- a/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/utils/StorageUtils.scala +++ b/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/utils/StorageUtils.scala @@ -219,4 +219,8 @@ object StorageUtils extends Logging { StorageConfiguration.ENABLE_IO_PROXY.getValue } + def isHDFSPath(fsPath: FsPath): Boolean = { + HDFS.equals(fsPath.getFsType) + } + } diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/conf/EntranceConfiguration.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/conf/EntranceConfiguration.scala index 4d9e895de6..ca00508584 100644 --- a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/conf/EntranceConfiguration.scala +++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/conf/EntranceConfiguration.scala @@ -242,4 +242,7 @@ object EntranceConfiguration { val ENTRANCE_TASK_TIMEOUT_SCAN = CommonVars("wds.linkis.entrance.task.timeout.scan", new TimeType("12h")) + val ENABLE_HDFS_JVM_USER = + CommonVars[Boolean]("linkis.entrance.enable.hdfs.jvm.user", true).getValue + } diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/log/CacheLogReader.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/log/CacheLogReader.scala index 483cf9ab43..748f82df4b 100644 --- a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/log/CacheLogReader.scala +++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/log/CacheLogReader.scala @@ -19,7 +19,11 @@ package org.apache.linkis.entrance.log import org.apache.linkis.common.io.{Fs, FsPath} import org.apache.linkis.common.utils.Utils +import org.apache.linkis.entrance.conf.EntranceConfiguration +import org.apache.linkis.entrance.exception.LogReadFailedException import org.apache.linkis.storage.FSFactory +import org.apache.linkis.storage.fs.FileSystem +import org.apache.linkis.storage.utils.StorageUtils import java.io.{InputStream, IOException} import java.util @@ -36,13 +40,26 @@ class CacheLogReader(logPath: String, charset: String, sharedCache: Cache, user: var closed = false private def createInputStream: InputStream = { + if (!logPath.contains(user)) { + throw new LogReadFailedException( + s"${user} does not have permission to read the path $logPath" + ) + } + val fsPath = new FsPath(logPath) if (fileSystem == null) lock synchronized { if (fileSystem == null) { - fileSystem = FSFactory.getFsByProxyUser(new FsPath(logPath), user) + + fileSystem = + if (StorageUtils.isHDFSPath(fsPath) && EntranceConfiguration.ENABLE_HDFS_JVM_USER) { + FSFactory.getFs(new FsPath(logPath)).asInstanceOf[FileSystem] + } else { + FSFactory.getFsByProxyUser(new FsPath(logPath), user).asInstanceOf[FileSystem] + } + fileSystem.init(new util.HashMap[String, String]()) } } - val inputStream: InputStream = fileSystem.read(new FsPath(logPath)) + val inputStream: InputStream = fileSystem.read(fsPath) inputStream } diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/log/HDFSCacheLogWriter.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/log/HDFSCacheLogWriter.scala index 5ac90add66..a272f1e8a1 100644 --- a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/log/HDFSCacheLogWriter.scala +++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/log/HDFSCacheLogWriter.scala @@ -37,11 +37,15 @@ import java.util class HDFSCacheLogWriter(logPath: String, charset: String, sharedCache: Cache, user: String) extends LogWriter(charset) { - if (StringUtils.isBlank(logPath)) + if (StringUtils.isBlank(logPath)) { throw new EntranceErrorException(LOGPATH_NOT_NULL.getErrorCode, LOGPATH_NOT_NULL.getErrorDesc) + } - protected var fileSystem = + protected var fileSystem = if (EntranceConfiguration.ENABLE_HDFS_JVM_USER) { + FSFactory.getFs(new FsPath(logPath)).asInstanceOf[FileSystem] + } else { FSFactory.getFsByProxyUser(new FsPath(logPath), user).asInstanceOf[FileSystem] + } override protected var outputStream: OutputStream = null diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/exception/RMErrorCode.java b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/exception/RMErrorCode.java index b513931414..eb69062d89 100644 --- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/exception/RMErrorCode.java +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/exception/RMErrorCode.java @@ -61,6 +61,8 @@ public enum RMErrorCode implements LinkisErrorCode { CLUSTER_QUEUE_INSTANCES_INSUFFICIENT(12012, "Insufficient cluster queue instance(集群队列实例不足)"), + ACROSS_CLUSTER_RULE_FAILED(12012, "across cluster rule failed(跨集群规则失败)"), + ECM_RESOURCE_INSUFFICIENT(11000, "ECM resources are insufficient(ECM 资源不足)"), ECM_MEMORY_INSUFFICIENT(11001, "ECM memory resources are insufficient(ECM 内存资源不足)"), diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/conf/AMConfiguration.scala b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/conf/AMConfiguration.scala index c730edf0fd..edd3c001b5 100644 --- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/conf/AMConfiguration.scala +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/conf/AMConfiguration.scala @@ -23,6 +23,20 @@ import org.apache.linkis.manager.common.entity.enumeration.MaintainType object AMConfiguration { + val YARN_QUEUE_NAME_CONFIG_KEY = "wds.linkis.rm.yarnqueue" + + val ACROSS_CLUSTER_QUEUE_SUFFIX = "queueRuleSuffix" + + val ACROSS_CLUSTER_TASK = "acrossClusterTask" + + val ACROSS_CLUSTER_CPU_THRESHOLD = "CPUThreshold" + + val ACROSS_CLUSTER_MEMORY_THRESHOLD = "MemoryThreshold" + + val ACROSS_CLUSTER_CPU_PERCENTAGE_THRESHOLD = "CPUPercentageThreshold" + + val ACROSS_CLUSTER_MEMORY_PERCENTAGE_THRESHOLD = "MemoryPercentageThreshold" + val ECM_ADMIN_OPERATIONS = CommonVars("wds.linkis.governance.admin.operations", "") val ENGINE_START_MAX_TIME = diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineCreateService.scala b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineCreateService.scala index 11f622e9fe..78a8adefd2 100644 --- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineCreateService.scala +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineCreateService.scala @@ -293,6 +293,16 @@ class DefaultEngineCreateService }) } + val queueRuleSuffix = props.get(AMConfiguration.ACROSS_CLUSTER_QUEUE_SUFFIX) + if (StringUtils.isNotBlank(queueRuleSuffix)) { + val queueName = props.getOrDefault(AMConfiguration.YARN_QUEUE_NAME_CONFIG_KEY, "default") + val newQueueName = queueName + "_" + queueRuleSuffix + props.put(AMConfiguration.YARN_QUEUE_NAME_CONFIG_KEY, newQueueName) + logger.info( + s"Switch queues according to queueRule with queue name : $queueName to $newQueueName" + ) + } + val timeoutEngineResourceRequest = TimeoutEngineResourceRequest( timeout, engineCreateRequest.getUser, diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/label/conf/LabelManagerConf.scala b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/label/conf/LabelManagerConf.scala index 1e34f84d2d..12f1a26782 100644 --- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/label/conf/LabelManagerConf.scala +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/label/conf/LabelManagerConf.scala @@ -22,6 +22,6 @@ import org.apache.linkis.common.conf.CommonVars object LabelManagerConf { val LONG_LIVED_LABEL = - CommonVars("wds.linkis.label.node.long.lived.label.keys", "tenant").getValue + CommonVars("wds.linkis.label.node.long.lived.label.keys", "tenant|yarnCluster").getValue } diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/external/yarn/YarnResourceRequester.scala b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/external/yarn/YarnResourceRequester.scala index 8891c98935..a6ed7c7512 100644 --- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/external/yarn/YarnResourceRequester.scala +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/external/yarn/YarnResourceRequester.scala @@ -57,12 +57,11 @@ class YarnResourceRequester extends ExternalResourceRequester with Logging { private val HASTATE_ACTIVE = "active" - private var provider: ExternalResourceProvider = _ private val rmAddressMap: util.Map[String, String] = new ConcurrentHashMap[String, String]() - private def getAuthorizationStr = { - val user = this.provider.getConfigMap.getOrDefault("user", "").asInstanceOf[String] - val pwd = this.provider.getConfigMap.getOrDefault("pwd", "").asInstanceOf[String] + private def getAuthorizationStr(provider: ExternalResourceProvider) = { + val user = provider.getConfigMap.getOrDefault("user", "").asInstanceOf[String] + val pwd = provider.getConfigMap.getOrDefault("pwd", "").asInstanceOf[String] val authKey = user + ":" + pwd Base64.getMimeEncoder.encodeToString(authKey.getBytes) } @@ -71,9 +70,7 @@ class YarnResourceRequester extends ExternalResourceRequester with Logging { identifier: ExternalResourceIdentifier, provider: ExternalResourceProvider ): NodeResource = { - val rmWebHaAddress = provider.getConfigMap.get("rmWebAddress").asInstanceOf[String] - this.provider = provider - val rmWebAddress = getAndUpdateActiveRmWebAddress(rmWebHaAddress) + val rmWebAddress = getAndUpdateActiveRmWebAddress(provider) logger.info(s"rmWebAddress: $rmWebAddress") val queueName = identifier.asInstanceOf[YarnResourceIdentifier].getQueueName @@ -87,7 +84,7 @@ class YarnResourceRequester extends ExternalResourceRequester with Logging { ) def maxEffectiveHandle(queueValue: Option[JValue]): Option[YarnResource] = { - val metrics = getResponseByUrl("metrics", rmWebAddress) + val metrics = getResponseByUrl("metrics", rmWebAddress, provider) val totalResouceInfoResponse = ( (metrics \ "clusterMetrics" \ "totalMB").asInstanceOf[JInt].values.toLong, (metrics \ "clusterMetrics" \ "totalVirtualCores").asInstanceOf[JInt].values.toLong @@ -180,7 +177,7 @@ class YarnResourceRequester extends ExternalResourceRequester with Logging { def getChildQueuesOfCapacity(resp: JValue) = resp \ "queues" \ "queue" def getResources() = { - val resp = getResponseByUrl("scheduler", rmWebAddress) + val resp = getResponseByUrl("scheduler", rmWebAddress, provider) val schedulerType = (resp \ "scheduler" \ "schedulerInfo" \ "type").asInstanceOf[JString].values if ("capacityScheduler".equals(schedulerType)) { @@ -255,9 +252,8 @@ class YarnResourceRequester extends ExternalResourceRequester with Logging { identifier: ExternalResourceIdentifier, provider: ExternalResourceProvider ): java.util.List[ExternalAppInfo] = { - val rmWebHaAddress = provider.getConfigMap.get("rmWebAddress").asInstanceOf[String] - val rmWebAddress = getAndUpdateActiveRmWebAddress(rmWebHaAddress) + val rmWebAddress = getAndUpdateActiveRmWebAddress(provider) val queueName = identifier.asInstanceOf[YarnResourceIdentifier].getQueueName @@ -273,7 +269,7 @@ class YarnResourceRequester extends ExternalResourceRequester with Logging { val realQueueName = "root." + queueName def getAppInfos(): Array[ExternalAppInfo] = { - val resp = getResponseByUrl("apps", rmWebAddress) + val resp = getResponseByUrl("apps", rmWebAddress, provider) resp \ "apps" \ "app" match { case JArray(apps) => val appInfoBuffer = new ArrayBuffer[YarnAppInfo]() @@ -307,25 +303,29 @@ class YarnResourceRequester extends ExternalResourceRequester with Logging { override def getResourceType: ResourceType = ResourceType.Yarn - private def getResponseByUrl(url: String, rmWebAddress: String) = { + private def getResponseByUrl( + url: String, + rmWebAddress: String, + provider: ExternalResourceProvider + ) = { val httpGet = new HttpGet(rmWebAddress + "/ws/v1/cluster/" + url) httpGet.addHeader("Accept", "application/json") - val authorEnable: Any = this.provider.getConfigMap.get("authorEnable"); + val authorEnable: Any = provider.getConfigMap.get("authorEnable"); var httpResponse: HttpResponse = null authorEnable match { case flag: Boolean => if (flag) { - httpGet.addHeader(HttpHeaders.AUTHORIZATION, "Basic " + getAuthorizationStr) + httpGet.addHeader(HttpHeaders.AUTHORIZATION, "Basic " + getAuthorizationStr(provider)) } case _ => } - val kerberosEnable: Any = this.provider.getConfigMap.get("kerberosEnable"); + val kerberosEnable: Any = provider.getConfigMap.get("kerberosEnable"); kerberosEnable match { case flag: Boolean => if (flag) { - val principalName = this.provider.getConfigMap.get("principalName").asInstanceOf[String] - val keytabPath = this.provider.getConfigMap.get("keytabPath").asInstanceOf[String] - val krb5Path = this.provider.getConfigMap.get("krb5Path").asInstanceOf[String] + val principalName = provider.getConfigMap.get("principalName").asInstanceOf[String] + val keytabPath = provider.getConfigMap.get("keytabPath").asInstanceOf[String] + val krb5Path = provider.getConfigMap.get("krb5Path").asInstanceOf[String] val requestKuu = new RequestKerberosUrlUtils(principalName, keytabPath, krb5Path, false) val response = requestKuu.callRestUrl(rmWebAddress + "/ws/v1/cluster/" + url, principalName) @@ -341,8 +341,9 @@ class YarnResourceRequester extends ExternalResourceRequester with Logging { parse(EntityUtils.toString(httpResponse.getEntity())) } - def getAndUpdateActiveRmWebAddress(haAddress: String): String = { + def getAndUpdateActiveRmWebAddress(provider: ExternalResourceProvider): String = { // todo check if it will stuck for many requests + val haAddress = provider.getConfigMap.get("rmWebAddress").asInstanceOf[String] var activeAddress = rmAddressMap.get(haAddress) if (StringUtils.isBlank(activeAddress)) haAddress.intern().synchronized { if (StringUtils.isBlank(activeAddress)) { @@ -356,7 +357,7 @@ class YarnResourceRequester extends ExternalResourceRequester with Logging { .split(RMConfiguration.DEFAULT_YARN_RM_WEB_ADDRESS_DELIMITER.getValue) .foreach(address => { Utils.tryCatch { - val response = getResponseByUrl("info", address) + val response = getResponseByUrl("info", address, provider) response \ "clusterInfo" \ "haState" match { case state: JString => if (HASTATE_ACTIVE.equalsIgnoreCase(state.s)) { @@ -393,12 +394,10 @@ class YarnResourceRequester extends ExternalResourceRequester with Logging { override def reloadExternalResourceAddress( provider: ExternalResourceProvider ): java.lang.Boolean = { - if (null == provider) { - rmAddressMap.clear() - } else { + if (null != provider) { val rmWebHaAddress = provider.getConfigMap.get("rmWebAddress").asInstanceOf[String] rmAddressMap.remove(rmWebHaAddress) - getAndUpdateActiveRmWebAddress(rmWebHaAddress) + getAndUpdateActiveRmWebAddress(provider) } true } diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/service/impl/DriverAndYarnReqResourceService.scala b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/service/impl/DriverAndYarnReqResourceService.scala index bebb34e00c..9a3b214318 100644 --- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/service/impl/DriverAndYarnReqResourceService.scala +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/service/impl/DriverAndYarnReqResourceService.scala @@ -17,12 +17,12 @@ package org.apache.linkis.manager.rm.service.impl +import org.apache.linkis.manager.am.conf.AMConfiguration import org.apache.linkis.manager.common.constant.RMConstant import org.apache.linkis.manager.common.entity.resource._ import org.apache.linkis.manager.common.entity.resource.ResourceType.DriverAndYarn import org.apache.linkis.manager.common.exception.RMWarnException -import org.apache.linkis.manager.common.protocol.engine.{EngineAskRequest, EngineCreateRequest} -import org.apache.linkis.manager.label.entity.cluster.ClusterLabel +import org.apache.linkis.manager.common.protocol.engine.EngineCreateRequest import org.apache.linkis.manager.rm.domain.RMLabelContainer import org.apache.linkis.manager.rm.exception.RMErrorCode import org.apache.linkis.manager.rm.external.service.ExternalResourceService @@ -76,6 +76,55 @@ class DriverAndYarnReqResourceService( throw new RMWarnException(notEnoughMessage._1, notEnoughMessage._2) } + if (engineCreateRequest.getProperties != null) { + val user = labelContainer.getUserCreatorLabel.getUser + val creator = labelContainer.getUserCreatorLabel.getCreator + val properties = engineCreateRequest.getProperties + val acrossClusterTask = properties.getOrDefault(AMConfiguration.ACROSS_CLUSTER_TASK, "false") + val CPUThreshold = properties.get(AMConfiguration.ACROSS_CLUSTER_CPU_THRESHOLD) + val MemoryThreshold = properties.get(AMConfiguration.ACROSS_CLUSTER_MEMORY_THRESHOLD) + val CPUPercentageThreshold = + properties.get(AMConfiguration.ACROSS_CLUSTER_CPU_PERCENTAGE_THRESHOLD) + val MemoryPercentageThreshold = + properties.get(AMConfiguration.ACROSS_CLUSTER_MEMORY_PERCENTAGE_THRESHOLD) + + if ( + StringUtils.isNotBlank(acrossClusterTask) && acrossClusterTask.toBoolean && StringUtils + .isNotBlank(CPUThreshold) && StringUtils + .isNotBlank(MemoryThreshold) + && StringUtils + .isNotBlank(CPUPercentageThreshold) && StringUtils.isNotBlank(MemoryPercentageThreshold) + ) { + + logger.info( + s"user: $user, creator: $creator task enter cross cluster resource judgment, " + + s"CPUThreshold: $CPUThreshold, MemoryThreshold: $MemoryThreshold," + + s"CPUPercentageThreshold: $CPUPercentageThreshold, MemoryPercentageThreshold: $MemoryPercentageThreshold" + ) + try { + AcrossClusterRulesJudgeUtils.acrossClusterRuleCheck( + queueLeftResource.asInstanceOf[YarnResource], + usedCapacity.asInstanceOf[YarnResource], + maxCapacity.asInstanceOf[YarnResource], + CPUThreshold.toInt, + MemoryThreshold.toInt, + CPUPercentageThreshold.toDouble, + MemoryPercentageThreshold.toDouble + ) + } catch { + case ex: Exception => + throw new RMWarnException( + RMErrorCode.ACROSS_CLUSTER_RULE_FAILED.getErrorCode, + ex.getMessage + ) + } + + logger.info(s"user: $user, creator: $creator task meet the threshold rule") + } else { + logger.info(s"user: $user, creator: $creator task skip cross cluster resource judgment") + } + } + true } diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/utils/AcrossClusterRulesJudgeUtils.scala b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/utils/AcrossClusterRulesJudgeUtils.scala index 51cf36bca1..76fc176395 100644 --- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/utils/AcrossClusterRulesJudgeUtils.scala +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/utils/AcrossClusterRulesJudgeUtils.scala @@ -19,47 +19,49 @@ package org.apache.linkis.manager.rm.utils import org.apache.linkis.common.utils.Logging import org.apache.linkis.manager.common.entity.resource.YarnResource +import org.apache.linkis.manager.common.exception.RMWarnException +import org.apache.linkis.manager.rm.exception.RMErrorCode object AcrossClusterRulesJudgeUtils extends Logging { - def acrossClusterRuleJudge( + def acrossClusterRuleCheck( leftResource: YarnResource, usedResource: YarnResource, maxResource: YarnResource, leftCPUThreshold: Int, leftMemoryThreshold: Int, - UsedCPUPercentageThreshold: Double, - UsedMemoryPercentageThreshold: Double - ): Boolean = { + CPUPercentageThreshold: Double, + MemoryPercentageThreshold: Double + ): Unit = { if (leftResource != null && usedResource != null && maxResource != null) { val leftQueueMemory = leftResource.queueMemory / Math.pow(1024, 3).toLong - logger.info( - s"leftResource.queueCores: ${leftResource.queueCores}, leftCPUThreshold: $leftCPUThreshold," + - s"leftQueueMemory: $leftQueueMemory, leftMemoryThreshold: $leftMemoryThreshold" - ) if (leftResource.queueCores > leftCPUThreshold && leftQueueMemory > leftMemoryThreshold) { - val usedCPUPercentage = usedResource.queueCores.asInstanceOf[Double] / maxResource.queueCores .asInstanceOf[Double] val usedMemoryPercentage = usedResource.queueMemory .asInstanceOf[Double] / maxResource.queueMemory.asInstanceOf[Double] - logger.info( - s"usedCPUPercentage: $usedCPUPercentage, UsedCPUPercentageThreshold: $UsedCPUPercentageThreshold" + - s"usedMemoryPercentage: $usedMemoryPercentage, UsedMemoryPercentageThreshold: $UsedMemoryPercentageThreshold" - ) - if ( - usedCPUPercentage < UsedCPUPercentageThreshold && usedMemoryPercentage < UsedMemoryPercentageThreshold + usedCPUPercentage < CPUPercentageThreshold && usedMemoryPercentage < MemoryPercentageThreshold ) { - return true + return + } else { + throw new RMWarnException( + RMErrorCode.ACROSS_CLUSTER_RULE_FAILED.getErrorCode, + s"usedCPUPercentage: $usedCPUPercentage, CPUPercentageThreshold: $CPUPercentageThreshold" + + s"usedMemoryPercentage: $usedMemoryPercentage, MemoryPercentageThreshold: $MemoryPercentageThreshold" + ) } + } else { + throw new RMWarnException( + RMErrorCode.ACROSS_CLUSTER_RULE_FAILED.getErrorCode, + s"leftResource.queueCores: ${leftResource.queueCores}, leftCPUThreshold: $leftCPUThreshold," + + s"leftQueueMemory: $leftQueueMemory, leftMemoryThreshold: $leftMemoryThreshold" + ) } } - - false } } diff --git a/linkis-public-enhancements/linkis-configuration/src/main/java/org/apache/linkis/configuration/conf/AcrossClusterConfiguration.scala b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/test/scala/org/apache/linkis/manager/label/conf/LabelManagerConfTest.scala similarity index 66% rename from linkis-public-enhancements/linkis-configuration/src/main/java/org/apache/linkis/configuration/conf/AcrossClusterConfiguration.scala rename to linkis-computation-governance/linkis-manager/linkis-application-manager/src/test/scala/org/apache/linkis/manager/label/conf/LabelManagerConfTest.scala index f6cd36f69c..836278336b 100644 --- a/linkis-public-enhancements/linkis-configuration/src/main/java/org/apache/linkis/configuration/conf/AcrossClusterConfiguration.scala +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/test/scala/org/apache/linkis/manager/label/conf/LabelManagerConfTest.scala @@ -15,13 +15,19 @@ * limitations under the License. */ -package org.apache.linkis.configuration.conf +package org.apache.linkis.manager.label.conf -import org.apache.linkis.common.conf.CommonVars +import org.junit.jupiter.api.Test -object AcrossClusterConfiguration { +class LabelManagerConfTest { - val ACROSS_CLUSTER_QUEUE_SUFFIX = - CommonVars.apply("linkis.configuration.across.cluster.queue.suffix", "_bdap2bdp").getValue + @Test def testRandomFiltering(): Unit = { + var label = "tenant" + assert(LabelManagerConf.LONG_LIVED_LABEL.contains(label)) + label = "yarnCluster" + assert(LabelManagerConf.LONG_LIVED_LABEL.contains(label)) + label = "test" + assert(!LabelManagerConf.LONG_LIVED_LABEL.contains(label)) + } } diff --git a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/cluster/ClusterLabel.java b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/cluster/ClusterLabel.java index 499adb496b..5e1a3d5c23 100644 --- a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/cluster/ClusterLabel.java +++ b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/cluster/ClusterLabel.java @@ -26,7 +26,7 @@ import java.util.HashMap; -import static org.apache.linkis.manager.label.errorcode.LabelCommonErrorCodeSummary.LABEL_ERROR_CODE; +import static org.apache.linkis.manager.label.errorcode.LabelCommonErrorCodeSummary.CLUSTER_LABEL_ERROR_CODE; public class ClusterLabel extends GenericLabel implements EMNodeLabel, EngineNodeLabel, UserModifiable { @@ -75,7 +75,7 @@ public void valueCheck(String stringValue) throws LabelErrorException { if (!StringUtils.isEmpty(stringValue)) { if (stringValue.split(SerializableLabel.VALUE_SEPARATOR).length != 2) { throw new LabelErrorException( - LABEL_ERROR_CODE.getErrorCode(), LABEL_ERROR_CODE.getErrorDesc()); + CLUSTER_LABEL_ERROR_CODE.getErrorCode(), CLUSTER_LABEL_ERROR_CODE.getErrorDesc()); } } } diff --git a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/errorcode/LabelCommonErrorCodeSummary.java b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/errorcode/LabelCommonErrorCodeSummary.java index a0ebdffe8a..7ce6c6546d 100644 --- a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/errorcode/LabelCommonErrorCodeSummary.java +++ b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/errorcode/LabelCommonErrorCodeSummary.java @@ -21,6 +21,9 @@ public enum LabelCommonErrorCodeSummary implements LinkisErrorCode { UPDATE_LABEL_FAILED(25001, "Update label realtion failed(更新标签属性失败)"), + CLUSTER_LABEL_ERROR_CODE( + 25002, + "The value of the label is set incorrectly, the setting value is: ClusterType-ClusterName "), LABEL_ERROR_CODE( 25002, "The value of the label is set incorrectly, only one value can be set, and the separator symbol '-' cannot be used(标签的值设置错误,只能设置一个值,不能使用分割符符号 '-') "), diff --git a/linkis-computation-governance/linkis-manager/linkis-label-common/src/test/java/org/apache/linkis/manager/label/builder/factory/StdLabelBuilderFactoryTest.java b/linkis-computation-governance/linkis-manager/linkis-label-common/src/test/java/org/apache/linkis/manager/label/builder/factory/StdLabelBuilderFactoryTest.java index 8d0b1e4de9..50f4cb44cb 100644 --- a/linkis-computation-governance/linkis-manager/linkis-label-common/src/test/java/org/apache/linkis/manager/label/builder/factory/StdLabelBuilderFactoryTest.java +++ b/linkis-computation-governance/linkis-manager/linkis-label-common/src/test/java/org/apache/linkis/manager/label/builder/factory/StdLabelBuilderFactoryTest.java @@ -19,11 +19,16 @@ import org.apache.linkis.manager.label.builder.CombinedLabelBuilder; import org.apache.linkis.manager.label.builder.LabelBuilder; +import org.apache.linkis.manager.label.entity.Label; import org.apache.linkis.manager.label.entity.em.EMInstanceLabel; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + import org.junit.jupiter.api.*; /** StdLabelBuilderFactory Tester */ @@ -73,4 +78,14 @@ public void testCreateLabelForInLabelKeyInValueStreamOutLabelClassOutValueTypes( stdLabelBuilderFactory.createLabel("testLabelKey", null, EMInstanceLabel.class, null); Assertions.assertTrue(emInstanceLabel1.getLabelKey().equals("emInstance")); } + + @Test + public void test() { + Map input = new HashMap(); + input.put("userCreator", "username-IDE"); + input.put("yarnCluster", "bdp-test"); + input.put("executeOnce", "true"); + List