From 50113f5ec1310df7b110c85f861be855cc4d83c6 Mon Sep 17 00:00:00 2001 From: taoran1250 <543121890@qq.com> Date: Tue, 3 Dec 2024 11:53:28 +0800 Subject: [PATCH 1/6] feat: kill reason && queue support root.xxx --- .../entrance/restful/EntranceRestfulApi.java | 18 +++++++++++++++--- .../restful/EntranceRestfulRemote.scala | 3 ++- .../external/yarn/YarnResourceRequester.scala | 6 +++++- 3 files changed, 22 insertions(+), 5 deletions(-) diff --git a/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/restful/EntranceRestfulApi.java b/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/restful/EntranceRestfulApi.java index 516af6e63a..c297f28c1d 100644 --- a/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/restful/EntranceRestfulApi.java +++ b/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/restful/EntranceRestfulApi.java @@ -516,6 +516,11 @@ public Message killJobs( JsonNode idNode = jsonNode.get("idList"); JsonNode taskIDNode = jsonNode.get("taskIDList"); ArrayList waitToForceKill = new ArrayList<>(); + JsonNode killReasonNode = jsonNode.get("killReason"); + String killReason = ""; + if (killReasonNode != null) { + killReason = killReasonNode.asText(); + } String userName = ModuleUserUtils.getOperationUser(req, "killJobs"); if (idNode.size() != taskIDNode.size()) { @@ -553,7 +558,7 @@ public Message killJobs( messages.add(message); } else { try { - logger.info("begin to kill job {} ", job.get().getId()); + logger.info("begin to kill job {}, kill reason is {} ", job.get().getId(), killReason); if (job.get() instanceof EntranceJob) { EntranceJob entranceJob = (EntranceJob) job.get(); @@ -586,6 +591,9 @@ public Message killJobs( + jobReq.getId() + "已成功取消)"); } + if (StringUtils.isNotBlank(killReason)) { + jobReq.setErrorDesc(jobReq.getErrorDesc() + "Kill reason is " + killReason); + } this.entranceServer .getEntranceContext() .getOrCreatePersistenceManager() @@ -620,7 +628,8 @@ public Message killJobs( public Message kill( HttpServletRequest req, @PathVariable("id") String id, - @RequestParam(value = "taskID", required = false) Long taskID) { + @RequestParam(value = "taskID", required = false) Long taskID, + @RequestParam(value = "killReason", required = false) String killReason) { String realId = ZuulEntranceUtils.parseExecID(id)[3]; String userName = ModuleUserUtils.getOperationUser(req, "kill task realId:" + realId); @@ -659,7 +668,7 @@ public Message kill( } } - logger.info("begin to kill job {} ", job.get().getId()); + logger.info("begin to kill job {}, kill reason is {} ", job.get().getId(), killReason); job.get().kill(); message = Message.ok("Successfully killed the job(成功kill了job)"); message.setMethod("/api/entrance/" + id + "/kill"); @@ -670,6 +679,9 @@ public Message kill( EntranceJob entranceJob = (EntranceJob) job.get(); JobRequest jobReq = entranceJob.getJobRequest(); entranceJob.updateJobRequestStatus(SchedulerEventState.Cancelled().toString()); + if (StringUtils.isNotBlank(killReason)) { + jobReq.setErrorDesc(jobReq.getErrorDesc() + "Kill reason is " + killReason); + } this.entranceServer .getEntranceContext() .getOrCreatePersistenceManager() diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/restful/EntranceRestfulRemote.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/restful/EntranceRestfulRemote.scala index ac4c3648b8..1937fce141 100644 --- a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/restful/EntranceRestfulRemote.scala +++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/restful/EntranceRestfulRemote.scala @@ -71,7 +71,8 @@ trait EntranceRestfulRemote { def kill( req: HttpServletRequest, @PathVariable("id") id: String, - @RequestParam("taskID") taskID: java.lang.Long + @RequestParam("taskID") taskID: java.lang.Long, + @RequestParam("killReason") killReason: String ): Message @RequestMapping(value = Array("/entrance/{id}/pause"), method = Array(RequestMethod.GET)) diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/external/yarn/YarnResourceRequester.scala b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/external/yarn/YarnResourceRequester.scala index 209a4a4141..8ee8d72f02 100644 --- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/external/yarn/YarnResourceRequester.scala +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/external/yarn/YarnResourceRequester.scala @@ -72,7 +72,11 @@ class YarnResourceRequester extends ExternalResourceRequester with Logging { ): NodeResource = { val rmWebAddress = getAndUpdateActiveRmWebAddress(provider) logger.info(s"rmWebAddress: $rmWebAddress") - val queueName = identifier.asInstanceOf[YarnResourceIdentifier].getQueueName + var queueName = identifier.asInstanceOf[YarnResourceIdentifier].getQueueName + if (queueName.startsWith("root.")) { + logger.info(s"Queue name [$queueName] starts with 'root.', remove 'root.'") + queueName = queueName.substring("root.".length) + } def getYarnResource(jValue: Option[JValue]) = jValue.map(r => new YarnResource( From 0a45b846b19601347f60c7d5e8398376ff5601cc Mon Sep 17 00:00:00 2001 From: taoran1250 <543121890@qq.com> Date: Tue, 3 Dec 2024 18:16:57 +0800 Subject: [PATCH 2/6] Revert "feat: kill reason && queue support root.xxx" This reverts commit 50113f5ec1310df7b110c85f861be855cc4d83c6. --- .../entrance/restful/EntranceRestfulApi.java | 18 +++--------------- .../restful/EntranceRestfulRemote.scala | 3 +-- .../external/yarn/YarnResourceRequester.scala | 6 +----- 3 files changed, 5 insertions(+), 22 deletions(-) diff --git a/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/restful/EntranceRestfulApi.java b/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/restful/EntranceRestfulApi.java index c297f28c1d..516af6e63a 100644 --- a/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/restful/EntranceRestfulApi.java +++ b/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/restful/EntranceRestfulApi.java @@ -516,11 +516,6 @@ public Message killJobs( JsonNode idNode = jsonNode.get("idList"); JsonNode taskIDNode = jsonNode.get("taskIDList"); ArrayList waitToForceKill = new ArrayList<>(); - JsonNode killReasonNode = jsonNode.get("killReason"); - String killReason = ""; - if (killReasonNode != null) { - killReason = killReasonNode.asText(); - } String userName = ModuleUserUtils.getOperationUser(req, "killJobs"); if (idNode.size() != taskIDNode.size()) { @@ -558,7 +553,7 @@ public Message killJobs( messages.add(message); } else { try { - logger.info("begin to kill job {}, kill reason is {} ", job.get().getId(), killReason); + logger.info("begin to kill job {} ", job.get().getId()); if (job.get() instanceof EntranceJob) { EntranceJob entranceJob = (EntranceJob) job.get(); @@ -591,9 +586,6 @@ public Message killJobs( + jobReq.getId() + "已成功取消)"); } - if (StringUtils.isNotBlank(killReason)) { - jobReq.setErrorDesc(jobReq.getErrorDesc() + "Kill reason is " + killReason); - } this.entranceServer .getEntranceContext() .getOrCreatePersistenceManager() @@ -628,8 +620,7 @@ public Message killJobs( public Message kill( HttpServletRequest req, @PathVariable("id") String id, - @RequestParam(value = "taskID", required = false) Long taskID, - @RequestParam(value = "killReason", required = false) String killReason) { + @RequestParam(value = "taskID", required = false) Long taskID) { String realId = ZuulEntranceUtils.parseExecID(id)[3]; String userName = ModuleUserUtils.getOperationUser(req, "kill task realId:" + realId); @@ -668,7 +659,7 @@ public Message kill( } } - logger.info("begin to kill job {}, kill reason is {} ", job.get().getId(), killReason); + logger.info("begin to kill job {} ", job.get().getId()); job.get().kill(); message = Message.ok("Successfully killed the job(成功kill了job)"); message.setMethod("/api/entrance/" + id + "/kill"); @@ -679,9 +670,6 @@ public Message kill( EntranceJob entranceJob = (EntranceJob) job.get(); JobRequest jobReq = entranceJob.getJobRequest(); entranceJob.updateJobRequestStatus(SchedulerEventState.Cancelled().toString()); - if (StringUtils.isNotBlank(killReason)) { - jobReq.setErrorDesc(jobReq.getErrorDesc() + "Kill reason is " + killReason); - } this.entranceServer .getEntranceContext() .getOrCreatePersistenceManager() diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/restful/EntranceRestfulRemote.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/restful/EntranceRestfulRemote.scala index 1937fce141..ac4c3648b8 100644 --- a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/restful/EntranceRestfulRemote.scala +++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/restful/EntranceRestfulRemote.scala @@ -71,8 +71,7 @@ trait EntranceRestfulRemote { def kill( req: HttpServletRequest, @PathVariable("id") id: String, - @RequestParam("taskID") taskID: java.lang.Long, - @RequestParam("killReason") killReason: String + @RequestParam("taskID") taskID: java.lang.Long ): Message @RequestMapping(value = Array("/entrance/{id}/pause"), method = Array(RequestMethod.GET)) diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/external/yarn/YarnResourceRequester.scala b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/external/yarn/YarnResourceRequester.scala index 8ee8d72f02..209a4a4141 100644 --- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/external/yarn/YarnResourceRequester.scala +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/external/yarn/YarnResourceRequester.scala @@ -72,11 +72,7 @@ class YarnResourceRequester extends ExternalResourceRequester with Logging { ): NodeResource = { val rmWebAddress = getAndUpdateActiveRmWebAddress(provider) logger.info(s"rmWebAddress: $rmWebAddress") - var queueName = identifier.asInstanceOf[YarnResourceIdentifier].getQueueName - if (queueName.startsWith("root.")) { - logger.info(s"Queue name [$queueName] starts with 'root.', remove 'root.'") - queueName = queueName.substring("root.".length) - } + val queueName = identifier.asInstanceOf[YarnResourceIdentifier].getQueueName def getYarnResource(jValue: Option[JValue]) = jValue.map(r => new YarnResource( From 04b3557162c536f9e55d2a24e27c56ddcc328b8c Mon Sep 17 00:00:00 2001 From: taoran1250 <543121890@qq.com> Date: Tue, 3 Dec 2024 18:17:36 +0800 Subject: [PATCH 3/6] feat: queue support root.xxx --- .../manager/rm/external/yarn/YarnResourceRequester.scala | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/external/yarn/YarnResourceRequester.scala b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/external/yarn/YarnResourceRequester.scala index 209a4a4141..8ee8d72f02 100644 --- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/external/yarn/YarnResourceRequester.scala +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/external/yarn/YarnResourceRequester.scala @@ -72,7 +72,11 @@ class YarnResourceRequester extends ExternalResourceRequester with Logging { ): NodeResource = { val rmWebAddress = getAndUpdateActiveRmWebAddress(provider) logger.info(s"rmWebAddress: $rmWebAddress") - val queueName = identifier.asInstanceOf[YarnResourceIdentifier].getQueueName + var queueName = identifier.asInstanceOf[YarnResourceIdentifier].getQueueName + if (queueName.startsWith("root.")) { + logger.info(s"Queue name [$queueName] starts with 'root.', remove 'root.'") + queueName = queueName.substring("root.".length) + } def getYarnResource(jValue: Option[JValue]) = jValue.map(r => new YarnResource( From ea80104d06cb86d5fef594d3a1d4ca2711dead14 Mon Sep 17 00:00:00 2001 From: taoran1250 <543121890@qq.com> Date: Tue, 10 Dec 2024 15:08:12 +0800 Subject: [PATCH 4/6] feat: queue support root.xxx --- .../manager/rm/external/yarn/YarnResourceRequester.scala | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/external/yarn/YarnResourceRequester.scala b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/external/yarn/YarnResourceRequester.scala index 8ee8d72f02..11410f630f 100644 --- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/external/yarn/YarnResourceRequester.scala +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/external/yarn/YarnResourceRequester.scala @@ -57,6 +57,8 @@ class YarnResourceRequester extends ExternalResourceRequester with Logging { private val HASTATE_ACTIVE = "active" + private val QUEUE_PREFIX = "root." + private val rmAddressMap: util.Map[String, String] = new ConcurrentHashMap[String, String]() private def getAuthorizationStr(provider: ExternalResourceProvider) = { @@ -73,9 +75,9 @@ class YarnResourceRequester extends ExternalResourceRequester with Logging { val rmWebAddress = getAndUpdateActiveRmWebAddress(provider) logger.info(s"rmWebAddress: $rmWebAddress") var queueName = identifier.asInstanceOf[YarnResourceIdentifier].getQueueName - if (queueName.startsWith("root.")) { - logger.info(s"Queue name [$queueName] starts with 'root.', remove 'root.'") - queueName = queueName.substring("root.".length) + if (queueName.startsWith(QUEUE_PREFIX)) { + logger.info(s"Queue name [$queueName] starts with '[$QUEUE_PREFIX]', remove '[$QUEUE_PREFIX]]'") + queueName = queueName.substring(QUEUE_PREFIX.length) } def getYarnResource(jValue: Option[JValue]) = jValue.map(r => From aa9a1a8f5eab6f18b6371ea58a6a7ab741980930 Mon Sep 17 00:00:00 2001 From: taoran1250 <543121890@qq.com> Date: Tue, 10 Dec 2024 15:31:32 +0800 Subject: [PATCH 5/6] feat: queue support root.xxx --- .../server/conf/EngineConnPluginConfiguration.scala | 3 +++ .../rm/external/yarn/YarnResourceRequester.scala | 13 +++++++------ 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/engineplugin/server/conf/EngineConnPluginConfiguration.scala b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/engineplugin/server/conf/EngineConnPluginConfiguration.scala index 9a16d9b9e6..0968b050c2 100644 --- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/engineplugin/server/conf/EngineConnPluginConfiguration.scala +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/engineplugin/server/conf/EngineConnPluginConfiguration.scala @@ -39,4 +39,7 @@ object EngineConnPluginConfiguration { val EC_BML_VERSION_MAY_WITH_PREFIX_V: CommonVars[Boolean] = CommonVars("linkis.engineconn.bml.version.may.with.prefix", true) + val QUEUE_PREFIX: CommonVars[String] = + CommonVars("wds.linkis.queue.prefix", "root.") + } diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/external/yarn/YarnResourceRequester.scala b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/external/yarn/YarnResourceRequester.scala index 11410f630f..ad904ad0bb 100644 --- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/external/yarn/YarnResourceRequester.scala +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/external/yarn/YarnResourceRequester.scala @@ -18,6 +18,7 @@ package org.apache.linkis.manager.rm.external.yarn import org.apache.linkis.common.utils.{Logging, Utils} +import org.apache.linkis.engineplugin.server.conf.EngineConnPluginConfiguration import org.apache.linkis.manager.common.conf.RMConfiguration import org.apache.linkis.manager.common.entity.resource.{ CommonNodeResource, @@ -57,7 +58,7 @@ class YarnResourceRequester extends ExternalResourceRequester with Logging { private val HASTATE_ACTIVE = "active" - private val QUEUE_PREFIX = "root." + private val queuePrefix = EngineConnPluginConfiguration.QUEUE_PREFIX.getValue private val rmAddressMap: util.Map[String, String] = new ConcurrentHashMap[String, String]() @@ -75,9 +76,9 @@ class YarnResourceRequester extends ExternalResourceRequester with Logging { val rmWebAddress = getAndUpdateActiveRmWebAddress(provider) logger.info(s"rmWebAddress: $rmWebAddress") var queueName = identifier.asInstanceOf[YarnResourceIdentifier].getQueueName - if (queueName.startsWith(QUEUE_PREFIX)) { - logger.info(s"Queue name [$queueName] starts with '[$QUEUE_PREFIX]', remove '[$QUEUE_PREFIX]]'") - queueName = queueName.substring(QUEUE_PREFIX.length) + if (queueName.startsWith(queuePrefix)) { + logger.info(s"Queue name [$queueName] starts with '[$queuePrefix]', remove '[$queuePrefix]]'") + queueName = queueName.substring(queuePrefix.length) } def getYarnResource(jValue: Option[JValue]) = jValue.map(r => @@ -116,7 +117,7 @@ class YarnResourceRequester extends ExternalResourceRequester with Logging { }) } - var realQueueName = "root." + queueName + var realQueueName = queuePrefix + queueName def getQueue(queues: JValue): Option[JValue] = queues match { case JArray(queue) => @@ -286,7 +287,7 @@ class YarnResourceRequester extends ExternalResourceRequester with Logging { ) ) - val realQueueName = "root." + queueName + val realQueueName = queuePrefix + queueName def getAppInfos(): Array[ExternalAppInfo] = { val resp = getResponseByUrl("apps", rmWebAddress, provider) From 31eaa61d7d67e2d47fb9e7a079b17c2606c66e37 Mon Sep 17 00:00:00 2001 From: taoran1250 <543121890@qq.com> Date: Fri, 13 Dec 2024 16:53:12 +0800 Subject: [PATCH 6/6] fix: deal conflicts --- .../external/yarn/YarnResourceRequester.java | 10 +- .../external/yarn/YarnResourceRequester.scala | 441 ------------------ 2 files changed, 8 insertions(+), 443 deletions(-) delete mode 100644 linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/external/yarn/YarnResourceRequester.scala diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/external/yarn/YarnResourceRequester.java b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/external/yarn/YarnResourceRequester.java index 006b58157f..3195f1f2a8 100644 --- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/external/yarn/YarnResourceRequester.java +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/external/yarn/YarnResourceRequester.java @@ -17,6 +17,7 @@ package org.apache.linkis.manager.rm.external.yarn; +import org.apache.linkis.engineplugin.server.conf.EngineConnPluginConfiguration; import org.apache.linkis.manager.common.entity.resource.CommonNodeResource; import org.apache.linkis.manager.common.entity.resource.NodeResource; import org.apache.linkis.manager.common.entity.resource.ResourceType; @@ -57,6 +58,7 @@ public class YarnResourceRequester implements ExternalResourceRequester { private final String HASTATE_ACTIVE = "active"; private static final ObjectMapper objectMapper = new ObjectMapper(); private final Map rmAddressMap = new ConcurrentHashMap<>(); + private final String queuePrefix = EngineConnPluginConfiguration.QUEUE_PREFIX().getValue(); private static final HttpClient httpClient = HttpClients.createDefault(); @@ -74,7 +76,11 @@ public NodeResource requestResourceInfo( logger.info("rmWebAddress: " + rmWebAddress); String queueName = ((YarnResourceIdentifier) identifier).getQueueName(); - String realQueueName = "root." + queueName; + if (queueName.startsWith(queuePrefix)) { + logger.info("Queue name {} starts with '{}', remove '{}'", queueName, queuePrefix, queuePrefix); + queueName = queueName.substring(queuePrefix.length()); + } + String realQueueName = queuePrefix + queueName; try { YarnQueueInfo resources = getResources(rmWebAddress, realQueueName, queueName, provider); @@ -301,7 +307,7 @@ public List requestAppInfo( String rmWebAddress = getAndUpdateActiveRmWebAddress(provider); String queueName = ((YarnResourceIdentifier) identifier).getQueueName(); - String realQueueName = "root." + queueName; + String realQueueName = queuePrefix + queueName; JsonNode resp = getResponseByUrl("apps", rmWebAddress, provider).path("apps").path("app"); if (resp.isMissingNode()) { diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/external/yarn/YarnResourceRequester.scala b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/external/yarn/YarnResourceRequester.scala deleted file mode 100644 index ad904ad0bb..0000000000 --- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/external/yarn/YarnResourceRequester.scala +++ /dev/null @@ -1,441 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.linkis.manager.rm.external.yarn - -import org.apache.linkis.common.utils.{Logging, Utils} -import org.apache.linkis.engineplugin.server.conf.EngineConnPluginConfiguration -import org.apache.linkis.manager.common.conf.RMConfiguration -import org.apache.linkis.manager.common.entity.resource.{ - CommonNodeResource, - NodeResource, - ResourceType, - YarnResource -} -import org.apache.linkis.manager.common.errorcode.ManagerCommonErrorCodeSummary._ -import org.apache.linkis.manager.common.exception.{RMErrorException, RMWarnException} -import org.apache.linkis.manager.rm.external.domain.{ - ExternalAppInfo, - ExternalResourceIdentifier, - ExternalResourceProvider -} -import org.apache.linkis.manager.rm.external.request.ExternalResourceRequester -import org.apache.linkis.manager.rm.utils.RequestKerberosUrlUtils - -import org.apache.commons.lang3.StringUtils -import org.apache.http.{HttpHeaders, HttpResponse} -import org.apache.http.client.methods.HttpGet -import org.apache.http.impl.client.HttpClients -import org.apache.http.util.EntityUtils - -import java.text.MessageFormat -import java.util -import java.util.Base64 -import java.util.concurrent.ConcurrentHashMap - -import scala.collection.JavaConverters._ -import scala.collection.mutable.ArrayBuffer - -import org.json4s.JsonAST._ -import org.json4s.JValue -import org.json4s.jackson.JsonMethods.parse - -class YarnResourceRequester extends ExternalResourceRequester with Logging { - - private val HASTATE_ACTIVE = "active" - - private val queuePrefix = EngineConnPluginConfiguration.QUEUE_PREFIX.getValue - - private val rmAddressMap: util.Map[String, String] = new ConcurrentHashMap[String, String]() - - private def getAuthorizationStr(provider: ExternalResourceProvider) = { - val user = provider.getConfigMap.getOrDefault("user", "").asInstanceOf[String] - val pwd = provider.getConfigMap.getOrDefault("pwd", "").asInstanceOf[String] - val authKey = user + ":" + pwd - Base64.getMimeEncoder.encodeToString(authKey.getBytes) - } - - override def requestResourceInfo( - identifier: ExternalResourceIdentifier, - provider: ExternalResourceProvider - ): NodeResource = { - val rmWebAddress = getAndUpdateActiveRmWebAddress(provider) - logger.info(s"rmWebAddress: $rmWebAddress") - var queueName = identifier.asInstanceOf[YarnResourceIdentifier].getQueueName - if (queueName.startsWith(queuePrefix)) { - logger.info(s"Queue name [$queueName] starts with '[$queuePrefix]', remove '[$queuePrefix]]'") - queueName = queueName.substring(queuePrefix.length) - } - - def getYarnResource(jValue: Option[JValue]) = jValue.map(r => - new YarnResource( - (r \ "memory").asInstanceOf[JInt].values.toLong * 1024L * 1024L, - (r \ "vCores").asInstanceOf[JInt].values.toInt, - 0, - queueName - ) - ) - - def maxEffectiveHandle(queueValue: Option[JValue]): Option[YarnResource] = { - val metrics = getResponseByUrl("metrics", rmWebAddress, provider) - val totalResouceInfoResponse = ( - (metrics \ "clusterMetrics" \ "totalMB").asInstanceOf[JInt].values.toLong, - (metrics \ "clusterMetrics" \ "totalVirtualCores").asInstanceOf[JInt].values.toLong - ) - queueValue.map(r => { - val absoluteCapacity = r \ "absoluteCapacity" match { - case jDecimal: JDecimal => - jDecimal.values.toDouble - case jDouble: JDouble => - jDouble.values - case _ => - 0d - } - val effectiveResource = absoluteCapacity - new YarnResource( - math - .floor(effectiveResource * totalResouceInfoResponse._1 * 1024L * 1024L / 100) - .toLong, - math.floor(effectiveResource * totalResouceInfoResponse._2 / 100).toInt, - 0, - queueName - ) - }) - } - - var realQueueName = queuePrefix + queueName - - def getQueue(queues: JValue): Option[JValue] = queues match { - case JArray(queue) => - queue.foreach { q => - val yarnQueueName = (q \ "queueName").asInstanceOf[JString].values - if (yarnQueueName == realQueueName) return Some(q) - else if (realQueueName.startsWith(yarnQueueName + ".")) { - return getQueue(getChildQueues(q)) - } - } - None - case JObject(queue) => - if ( - queue - .find(_._1 == "queueName") - .exists(_._2.asInstanceOf[JString].values == realQueueName) - ) { - Some(queues) - } else { - val childQueues = queue.find(_._1 == "childQueues") - if (childQueues.isEmpty) None - else getQueue(childQueues.map(_._2).get) - } - case _ => None - } - - def getChildQueues(resp: JValue): JValue = { - val queues = resp \ "childQueues" \ "queue" - - if ( - queues != null && queues != JNull && queues != JNothing && null != queues.children && queues.children.nonEmpty - ) { - logger.info(s"queues:$queues") - queues - } else resp \ "childQueues" - } - - def getQueueOfCapacity(queues: JValue): Option[JValue] = queues match { - case JArray(queue) => - queue.foreach { q => - val yarnQueueName = (q \ "queueName").asInstanceOf[JString].values - if (yarnQueueName == realQueueName) return Some(q) - else if ((q \ "queues").toOption.nonEmpty) { - val matchQueue = getQueueOfCapacity(getChildQueuesOfCapacity(q)) - if (matchQueue.nonEmpty) return matchQueue - } - } - None - case JObject(queue) => - if ( - queue - .find(_._1 == "queueName") - .exists(_._2.asInstanceOf[JString].values == realQueueName) - ) { - return Some(queues) - } else if ((queues \ "queues").toOption.nonEmpty) { - val matchQueue = getQueueOfCapacity(getChildQueuesOfCapacity(queues)) - if (matchQueue.nonEmpty) return matchQueue - } - None - case _ => None - } - - def getChildQueuesOfCapacity(resp: JValue) = resp \ "queues" \ "queue" - - def getResources() = { - val resp = getResponseByUrl("scheduler", rmWebAddress, provider) - val schedulerType = - (resp \ "scheduler" \ "schedulerInfo" \ "type").asInstanceOf[JString].values - if ("capacityScheduler".equals(schedulerType)) { - realQueueName = queueName - val childQueues = getChildQueuesOfCapacity(resp \ "scheduler" \ "schedulerInfo") - val queue = getQueueOfCapacity(childQueues) - val queueOption = Option(queue) match { - case Some(queue) => queue - case None => - logger.debug(s"cannot find any information about queue $queueName, response: " + resp) - throw new RMWarnException( - YARN_NOT_EXISTS_QUEUE.getErrorCode, - MessageFormat.format(YARN_NOT_EXISTS_QUEUE.getErrorDesc, queueName) - ) - } - val queueInfo = queueOption.get.asInstanceOf[JObject] - ( - maxEffectiveHandle(queue).get, - getYarnResource(queue.map(_ \ "resourcesUsed")).get, - (queueInfo \ "maxApps").asInstanceOf[JInt].values.toInt, - (queueInfo \ "numPendingApps").asInstanceOf[JInt].values.toInt, - (queueInfo \ "numActiveApps").asInstanceOf[JInt].values.toInt - ) - } else if ("fairScheduler".equals(schedulerType)) { - if ("root".equals(queueName)) { - // get cluster total resource - val queue = (resp \ "scheduler" \ "schedulerInfo" \ "rootQueue") - val rootQueue: Option[JValue] = Some(queue) - val rootQueueInfo = rootQueue.get.asInstanceOf[JObject] - ( - getYarnResource(rootQueue.map(_ \ "maxResources")).get, - getYarnResource(rootQueue.map(_ \ "usedResources")).get, - (rootQueueInfo \ "maxApps").asInstanceOf[JInt].values.toInt, - 0, - 0 - ) - } else { - val childQueues = getChildQueues(resp \ "scheduler" \ "schedulerInfo" \ "rootQueue") - val queue = getQueue(childQueues) - if (queue.isEmpty || queue.get == null) { - logger.debug(s"cannot find any information about queue $queueName, response: " + resp) - throw new RMWarnException( - YARN_NOT_EXISTS_QUEUE.getErrorCode, - MessageFormat.format(YARN_NOT_EXISTS_QUEUE.getErrorDesc, queueName) - ) - } - val queueInfo = queue.get.asInstanceOf[JObject] - ( - getYarnResource(queue.map(_ \ "maxResources")).get, - getYarnResource(queue.map(_ \ "usedResources")).get, - (queueInfo \ "maxApps").asInstanceOf[JInt].values.toInt, - (queueInfo \ "numPendingApps").asInstanceOf[JInt].values.toInt, - (queueInfo \ "numActiveApps").asInstanceOf[JInt].values.toInt - ) - } - } else { - logger.debug( - s"only support fairScheduler or capacityScheduler, schedulerType: $schedulerType , response: " + resp - ) - throw new RMWarnException( - ONLY_SUPPORT_FAIRORCAPA.getErrorCode, - MessageFormat.format(ONLY_SUPPORT_FAIRORCAPA.getErrorDesc(), schedulerType) - ) - } - } - - Utils.tryCatch { - val yarnResource = getResources() - val nodeResource = new CommonNodeResource - nodeResource.setMaxResource(yarnResource._1) - nodeResource.setUsedResource(yarnResource._2) - nodeResource.setMaxApps(yarnResource._3) - nodeResource.setNumPendingApps(yarnResource._4) - nodeResource.setNumActiveApps(yarnResource._5) - nodeResource - }(t => { - throw new RMErrorException( - YARN_QUEUE_EXCEPTION.getErrorCode, - YARN_QUEUE_EXCEPTION.getErrorDesc, - t - ) - }) - } - - override def requestAppInfo( - identifier: ExternalResourceIdentifier, - provider: ExternalResourceProvider - ): java.util.List[ExternalAppInfo] = { - - val rmWebAddress = getAndUpdateActiveRmWebAddress(provider) - - val queueName = identifier.asInstanceOf[YarnResourceIdentifier].getQueueName - - def getYarnResource(jValue: Option[JValue]) = jValue.map(r => - new YarnResource( - (r \ "allocatedMB").asInstanceOf[JInt].values.toLong * 1024L * 1024L, - (r \ "allocatedVCores").asInstanceOf[JInt].values.toInt, - 0, - queueName - ) - ) - - val realQueueName = queuePrefix + queueName - - def getAppInfos(): Array[ExternalAppInfo] = { - val resp = getResponseByUrl("apps", rmWebAddress, provider) - resp \ "apps" \ "app" match { - case JArray(apps) => - val appInfoBuffer = new ArrayBuffer[YarnAppInfo]() - apps.foreach { app => - val yarnQueueName = (app \ "queue").asInstanceOf[JString].values - val state = (app \ "state").asInstanceOf[JString].values - if (yarnQueueName == realQueueName && (state == "RUNNING" || state == "ACCEPTED")) { - val appInfo = new YarnAppInfo( - (app \ "id").asInstanceOf[JString].values, - (app \ "user").asInstanceOf[JString].values, - state, - (app \ "applicationType").asInstanceOf[JString].values, - getYarnResource(Some(app)).get - ) - appInfoBuffer.append(appInfo) - } - } - appInfoBuffer.toArray - case _ => new ArrayBuffer[YarnAppInfo](0).toArray - } - } - - Utils.tryCatch(getAppInfos().toList.asJava)(t => { - throw new RMErrorException( - YARN_APPLICATION_EXCEPTION.getErrorCode, - YARN_APPLICATION_EXCEPTION.getErrorDesc, - t - ) - }) - } - - override def getResourceType: ResourceType = ResourceType.Yarn - - private def getResponseByUrl( - url: String, - rmWebAddress: String, - provider: ExternalResourceProvider - ) = { - val httpGet = new HttpGet(rmWebAddress + "/ws/v1/cluster/" + url) - httpGet.addHeader("Accept", "application/json") - val authorEnable: Any = provider.getConfigMap.get("authorEnable"); - var httpResponse: HttpResponse = null - authorEnable match { - case flag: Boolean => - if (flag) { - httpGet.addHeader(HttpHeaders.AUTHORIZATION, "Basic " + getAuthorizationStr(provider)) - } - case _ => - } - val kerberosEnable: Any = provider.getConfigMap.get("kerberosEnable"); - kerberosEnable match { - case flag: Boolean => - if (flag) { - val principalName = provider.getConfigMap.get("principalName").asInstanceOf[String] - val keytabPath = provider.getConfigMap.get("keytabPath").asInstanceOf[String] - val krb5Path = provider.getConfigMap.get("krb5Path").asInstanceOf[String] - val requestKuu = new RequestKerberosUrlUtils(principalName, keytabPath, krb5Path, false) - val response = - requestKuu.callRestUrl(rmWebAddress + "/ws/v1/cluster/" + url, principalName) - httpResponse = response; - } else { - val response = YarnResourceRequester.httpClient.execute(httpGet) - httpResponse = response - } - case _ => - val response = YarnResourceRequester.httpClient.execute(httpGet) - httpResponse = response - } - parse(EntityUtils.toString(httpResponse.getEntity())) - } - - def getAndUpdateActiveRmWebAddress(provider: ExternalResourceProvider): String = { - // todo check if it will stuck for many requests - val haAddress = provider.getConfigMap.get("rmWebAddress").asInstanceOf[String] - var activeAddress = rmAddressMap.get(haAddress) - if (StringUtils.isBlank(activeAddress)) haAddress.intern().synchronized { - if (StringUtils.isBlank(activeAddress)) { - if (logger.isDebugEnabled()) { - logger.debug( - s"Cannot find value of haAddress : ${haAddress} in cacheMap with size ${rmAddressMap.size()}" - ) - } - if (StringUtils.isNotBlank(haAddress)) { - haAddress - .split(RMConfiguration.DEFAULT_YARN_RM_WEB_ADDRESS_DELIMITER.getValue) - .foreach(address => { - Utils.tryCatch { - val response = getResponseByUrl("info", address, provider) - response \ "clusterInfo" \ "haState" match { - case state: JString => - if (HASTATE_ACTIVE.equalsIgnoreCase(state.s)) { - activeAddress = address - } else { - logger.warn(s"Resourcemanager : ${address} haState : ${state.s}") - } - case _ => - } - } { case e: Exception => - logger.error("Get Yarn resourcemanager info error, " + e.getMessage, e) - } - }) - } - if (StringUtils.isNotBlank(activeAddress)) { - if (logger.isDebugEnabled()) { - logger.debug(s"Put (${haAddress}, ${activeAddress}) to cacheMap.") - } - rmAddressMap.put(haAddress, activeAddress) - } else { - throw new RMErrorException( - GET_YARN_EXCEPTION.getErrorCode, - MessageFormat.format(GET_YARN_EXCEPTION.getErrorDesc(), haAddress) - ) - } - } - } - if (logger.isDebugEnabled()) { - logger.debug(s"Get active rm address : ${activeAddress} from haAddress : ${haAddress}") - } - activeAddress - } - - override def reloadExternalResourceAddress( - provider: ExternalResourceProvider - ): java.lang.Boolean = { - if (null != provider) { - val rmWebHaAddress = provider.getConfigMap.get("rmWebAddress").asInstanceOf[String] - rmAddressMap.remove(rmWebHaAddress) - getAndUpdateActiveRmWebAddress(provider) - } - true - } - -} - -object YarnResourceRequester extends Logging { - - private val httpClient = HttpClients.createDefault() - - def init(): Unit = { - addShutdownHook() - } - - def addShutdownHook(): Unit = { - logger.info("Register shutdown hook to release httpclient connection") - Utils.addShutdownHook(httpClient.close()) - } - -}