From aa9a1a8f5eab6f18b6371ea58a6a7ab741980930 Mon Sep 17 00:00:00 2001 From: taoran1250 <543121890@qq.com> Date: Tue, 10 Dec 2024 15:31:32 +0800 Subject: [PATCH] feat: queue support root.xxx --- .../server/conf/EngineConnPluginConfiguration.scala | 3 +++ .../rm/external/yarn/YarnResourceRequester.scala | 13 +++++++------ 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/engineplugin/server/conf/EngineConnPluginConfiguration.scala b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/engineplugin/server/conf/EngineConnPluginConfiguration.scala index 9a16d9b9e6..0968b050c2 100644 --- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/engineplugin/server/conf/EngineConnPluginConfiguration.scala +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/engineplugin/server/conf/EngineConnPluginConfiguration.scala @@ -39,4 +39,7 @@ object EngineConnPluginConfiguration { val EC_BML_VERSION_MAY_WITH_PREFIX_V: CommonVars[Boolean] = CommonVars("linkis.engineconn.bml.version.may.with.prefix", true) + val QUEUE_PREFIX: CommonVars[String] = + CommonVars("wds.linkis.queue.prefix", "root.") + } diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/external/yarn/YarnResourceRequester.scala b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/external/yarn/YarnResourceRequester.scala index 11410f630f..ad904ad0bb 100644 --- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/external/yarn/YarnResourceRequester.scala +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/external/yarn/YarnResourceRequester.scala @@ -18,6 +18,7 @@ package org.apache.linkis.manager.rm.external.yarn import org.apache.linkis.common.utils.{Logging, Utils} +import org.apache.linkis.engineplugin.server.conf.EngineConnPluginConfiguration import org.apache.linkis.manager.common.conf.RMConfiguration import org.apache.linkis.manager.common.entity.resource.{ CommonNodeResource, @@ -57,7 +58,7 @@ class YarnResourceRequester extends ExternalResourceRequester with Logging { private val HASTATE_ACTIVE = "active" - private val QUEUE_PREFIX = "root." + private val queuePrefix = EngineConnPluginConfiguration.QUEUE_PREFIX.getValue private val rmAddressMap: util.Map[String, String] = new ConcurrentHashMap[String, String]() @@ -75,9 +76,9 @@ class YarnResourceRequester extends ExternalResourceRequester with Logging { val rmWebAddress = getAndUpdateActiveRmWebAddress(provider) logger.info(s"rmWebAddress: $rmWebAddress") var queueName = identifier.asInstanceOf[YarnResourceIdentifier].getQueueName - if (queueName.startsWith(QUEUE_PREFIX)) { - logger.info(s"Queue name [$queueName] starts with '[$QUEUE_PREFIX]', remove '[$QUEUE_PREFIX]]'") - queueName = queueName.substring(QUEUE_PREFIX.length) + if (queueName.startsWith(queuePrefix)) { + logger.info(s"Queue name [$queueName] starts with '[$queuePrefix]', remove '[$queuePrefix]]'") + queueName = queueName.substring(queuePrefix.length) } def getYarnResource(jValue: Option[JValue]) = jValue.map(r => @@ -116,7 +117,7 @@ class YarnResourceRequester extends ExternalResourceRequester with Logging { }) } - var realQueueName = "root." + queueName + var realQueueName = queuePrefix + queueName def getQueue(queues: JValue): Option[JValue] = queues match { case JArray(queue) => @@ -286,7 +287,7 @@ class YarnResourceRequester extends ExternalResourceRequester with Logging { ) ) - val realQueueName = "root." + queueName + val realQueueName = queuePrefix + queueName def getAppInfos(): Array[ExternalAppInfo] = { val resp = getResponseByUrl("apps", rmWebAddress, provider)