Skip to content

Commit

Permalink
udf support kill engine
Browse files Browse the repository at this point in the history
  • Loading branch information
“v_kkhuang” committed Nov 28, 2024
1 parent f8368d6 commit ed98e03
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -774,6 +774,44 @@ public Message killEngineByUpdateConfig(HttpServletRequest req, @RequestBody Jso
return Message.ok("Kill engineConn succeed");
}

@ApiOperation(
value = "kill egineconns of a ecm",
notes = "Kill engine after updating UDF",
response = Message.class)
@ApiImplicitParams({
@ApiImplicitParam(name = "engineType", dataType = "String", required = true, example = "hive"),
})
@ApiOperationSupport(ignoreParameters = {"param"})
@RequestMapping(path = "/rm/killEngineByUdf", method = RequestMethod.POST)
public Message killEngineByUdf(HttpServletRequest req, @RequestBody JsonNode jsonNode)
throws AMErrorException {
String userName = ModuleUserUtils.getOperationUser(req);
String jvmUser = StorageUtils.getJvmUser();
if (jvmUser.equals(userName)) {
return Message.error(
jvmUser + " users do not support this feature (" + jvmUser + " 用户不支持此功能)");
}
String engineType = "";
if (null != jsonNode.get("engineType")) {
engineType = jsonNode.get("engineType").textValue();
}
if (StringUtils.isNotBlank(engineType)
&& AMConfiguration.isUnAllowKilledEngineType(engineType)) {
return Message.error("multi user engine does not support this feature(多用户引擎不支持此功能)");
}
if (engineType.equals(Configuration.GLOBAL_CONF_SYMBOL())) {
Arrays.stream(AMConfiguration.UDF_KILL_ENGINE_TYPE().split(","))
.forEach(
engine ->
engineStopService.stopUnlockECByUserCreatorAndECType(
userName, Configuration.GLOBAL_CONF_SYMBOL(), engine));
} else {
engineStopService.stopUnlockECByUserCreatorAndECType(
userName, Configuration.GLOBAL_CONF_SYMBOL(), engineType);
}
return Message.ok("Kill engineConn succeed");
}

static ServiceInstance getServiceInstance(JsonNode jsonNode) throws AMErrorException {
String applicationName = jsonNode.get("applicationName").asText();
String instance = jsonNode.get("instance").asText();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,8 @@ object AMConfiguration {

val AM_USER_RESET_RESOURCE = CommonVars("linkis.am.user.reset.resource.enable", true).getValue

val UDF_KILL_ENGINE_TYPE = CommonVars("linkis.udf.kill.engine.type", "spark,hive").getValue

private def getDefaultMultiEngineUser(): String = {
val jvmUser = Utils.getJvmUser
s""" {jdbc:"$jvmUser", es: "$jvmUser", presto:"$jvmUser", appconn:"$jvmUser", openlookeng:"$jvmUser", trino:"$jvmUser", io_file:"root", jobserver:"$jvmUser", nebula:"$jvmUser"}"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -290,24 +290,28 @@ class DefaultEngineStopService extends AbstractEngineService with EngineStopServ
dealEngineByEngineNode(engineNodes.toList, userName)
}
// kill EMnode by user creator
if (StringUtils.isNotBlank(engineType) && !creator.equals(Configuration.GLOBAL_CONF_SYMBOL)) {
val filterEngineNode = engineNodes
.filter(_.getOwner.equals(userName))
.filter(node => {
var filterResult = false
if (!node.getLabels.isEmpty) {
val userCreator = LabelUtil.getUserCreatorLabel(node.getLabels)
val engineTypeLabel = LabelUtil.getEngineTypeLabel(node.getLabels).getStringValue
if (
userCreator.getUser.equals(userName) && userCreator.getCreator
.equals(creator) && engineTypeLabel.equals(engineType)
) {
filterResult = true
}
}
filterResult
})
.toList
if (StringUtils.isNotBlank(engineType)) {
val filterEngineNode = creator match {
case Configuration.GLOBAL_CONF_SYMBOL =>
engineNodes
.filter(_.getOwner.equals(userName))
.filter(!_.getLabels.isEmpty)
.filter(node =>
LabelUtil.getUserCreatorLabel(node.getLabels).getUser.equals(userName)
&& LabelUtil.getEngineTypeLabel(node.getLabels).getEngineType.equals(engineType)
)
.toList
case _ =>
engineNodes
.filter(_.getOwner.equals(userName))
.filter(!_.getLabels.isEmpty)
.filter(node =>
LabelUtil.getUserCreatorLabel(node.getLabels).getUser.equals(userName)
&& LabelUtil.getUserCreatorLabel(node.getLabels).getCreator.equals(creator)
&& LabelUtil.getEngineTypeLabel(node.getLabels).getEngineType.equals(engineType)
)
.toList
}
dealEngineByEngineNode(filterEngineNode, userName)
}
}
Expand Down

0 comments on commit ed98e03

Please sign in to comment.