From e644ddb69e3ce837743078d81c91991a5fd2da61 Mon Sep 17 00:00:00 2001 From: v-kkhuang <62878639+v-kkhuang@users.noreply.github.com> Date: Wed, 11 Dec 2024 17:05:35 +0800 Subject: [PATCH] udf support kill engine (#661) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * udf support kill engine * Code optimization --------- Co-authored-by: “v_kkhuang” <“420895376@qq.com”> --- .../manager/am/restful/EngineRestfulApi.java | 16 ++++++-- .../manager/am/conf/AMConfiguration.scala | 2 + .../engine/DefaultEngineStopService.scala | 40 ++++++++++--------- 3 files changed, 36 insertions(+), 22 deletions(-) diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/restful/EngineRestfulApi.java b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/restful/EngineRestfulApi.java index e9210326ef..feb835e743 100644 --- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/restful/EngineRestfulApi.java +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/restful/EngineRestfulApi.java @@ -737,7 +737,7 @@ public Message executeEngineConnOperation(HttpServletRequest req, @RequestBody J @ApiOperation( value = "kill egineconns of a ecm", - notes = "Kill engine after updating configuration", + notes = "Kill engine by cteator or engineType", response = Message.class) @ApiImplicitParams({ @ApiImplicitParam(name = "creator", dataType = "String", required = true, example = "IDE"), @@ -748,7 +748,7 @@ public Message executeEngineConnOperation(HttpServletRequest req, @RequestBody J example = "hive-2.3.3"), }) @ApiOperationSupport(ignoreParameters = {"param"}) - @RequestMapping(path = "/rm/killEngineByUpdateConfig", method = RequestMethod.POST) + @RequestMapping(path = "/rm/killEngineByCreatorEngineType", method = RequestMethod.POST) public Message killEngineByUpdateConfig(HttpServletRequest req, @RequestBody JsonNode jsonNode) throws AMErrorException { String userName = ModuleUserUtils.getOperationUser(req); @@ -770,10 +770,18 @@ public Message killEngineByUpdateConfig(HttpServletRequest req, @RequestBody Jso && AMConfiguration.isUnAllowKilledEngineType(engineType)) { return Message.error("multi user engine does not support this feature(多用户引擎不支持此功能)"); } - engineStopService.stopUnlockECByUserCreatorAndECType(userName, creatorStr, engineType); + if (engineType.equals(Configuration.GLOBAL_CONF_SYMBOL())) { + Arrays.stream(AMConfiguration.UDF_KILL_ENGINE_TYPE().split(",")) + .forEach( + engine -> + engineStopService.stopUnlockECByUserCreatorAndECType( + userName, creatorStr, engine)); + } else { + engineStopService.stopUnlockECByUserCreatorAndECType( + userName, creatorStr, engineType); + } return Message.ok("Kill engineConn succeed"); } - static ServiceInstance getServiceInstance(JsonNode jsonNode) throws AMErrorException { String applicationName = jsonNode.get("applicationName").asText(); String instance = jsonNode.get("instance").asText(); diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/conf/AMConfiguration.scala b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/conf/AMConfiguration.scala index 1d1b141e26..91dca891d9 100644 --- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/conf/AMConfiguration.scala +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/conf/AMConfiguration.scala @@ -154,6 +154,8 @@ object AMConfiguration { val AM_USER_RESET_RESOURCE = CommonVars("linkis.am.user.reset.resource.enable", true).getValue + val UDF_KILL_ENGINE_TYPE = CommonVars("linkis.udf.kill.engine.type", "spark,hive").getValue + private def getDefaultMultiEngineUser(): String = { val jvmUser = Utils.getJvmUser s""" {jdbc:"$jvmUser", es: "$jvmUser", presto:"$jvmUser", appconn:"$jvmUser", openlookeng:"$jvmUser", trino:"$jvmUser", io_file:"root", jobserver:"$jvmUser", nebula:"$jvmUser"}""" diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineStopService.scala b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineStopService.scala index dc039786f4..822d0d0823 100644 --- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineStopService.scala +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineStopService.scala @@ -290,24 +290,28 @@ class DefaultEngineStopService extends AbstractEngineService with EngineStopServ dealEngineByEngineNode(engineNodes.toList, userName) } // kill EMnode by user creator - if (StringUtils.isNotBlank(engineType) && !creator.equals(Configuration.GLOBAL_CONF_SYMBOL)) { - val filterEngineNode = engineNodes - .filter(_.getOwner.equals(userName)) - .filter(node => { - var filterResult = false - if (!node.getLabels.isEmpty) { - val userCreator = LabelUtil.getUserCreatorLabel(node.getLabels) - val engineTypeLabel = LabelUtil.getEngineTypeLabel(node.getLabels).getStringValue - if ( - userCreator.getUser.equals(userName) && userCreator.getCreator - .equals(creator) && engineTypeLabel.equals(engineType) - ) { - filterResult = true - } - } - filterResult - }) - .toList + if (StringUtils.isNotBlank(engineType)) { + val filterEngineNode = creator match { + case Configuration.GLOBAL_CONF_SYMBOL => + engineNodes + .filter(_.getOwner.equals(userName)) + .filter(!_.getLabels.isEmpty) + .filter(node => + LabelUtil.getUserCreatorLabel(node.getLabels).getUser.equals(userName) + && LabelUtil.getEngineTypeLabel(node.getLabels).getEngineType.equals(engineType) + ) + .toList + case _ => + engineNodes + .filter(_.getOwner.equals(userName)) + .filter(!_.getLabels.isEmpty) + .filter(node => + LabelUtil.getUserCreatorLabel(node.getLabels).getUser.equals(userName) + && LabelUtil.getUserCreatorLabel(node.getLabels).getCreator.equals(creator) + && LabelUtil.getEngineTypeLabel(node.getLabels).getEngineType.equals(engineType) + ) + .toList + } dealEngineByEngineNode(filterEngineNode, userName) } }