Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

udf support kill engine #661

Merged
merged 2 commits into from
Dec 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -737,7 +737,7 @@ public Message executeEngineConnOperation(HttpServletRequest req, @RequestBody J

@ApiOperation(
value = "kill egineconns of a ecm",
notes = "Kill engine after updating configuration",
notes = "Kill engine by cteator or engineType",
response = Message.class)
@ApiImplicitParams({
@ApiImplicitParam(name = "creator", dataType = "String", required = true, example = "IDE"),
Expand All @@ -748,7 +748,7 @@ public Message executeEngineConnOperation(HttpServletRequest req, @RequestBody J
example = "hive-2.3.3"),
})
@ApiOperationSupport(ignoreParameters = {"param"})
@RequestMapping(path = "/rm/killEngineByUpdateConfig", method = RequestMethod.POST)
@RequestMapping(path = "/rm/killEngineByCreatorEngineType", method = RequestMethod.POST)
public Message killEngineByUpdateConfig(HttpServletRequest req, @RequestBody JsonNode jsonNode)
throws AMErrorException {
String userName = ModuleUserUtils.getOperationUser(req);
Expand All @@ -770,10 +770,18 @@ public Message killEngineByUpdateConfig(HttpServletRequest req, @RequestBody Jso
&& AMConfiguration.isUnAllowKilledEngineType(engineType)) {
return Message.error("multi user engine does not support this feature(多用户引擎不支持此功能)");
}
engineStopService.stopUnlockECByUserCreatorAndECType(userName, creatorStr, engineType);
if (engineType.equals(Configuration.GLOBAL_CONF_SYMBOL())) {
Arrays.stream(AMConfiguration.UDF_KILL_ENGINE_TYPE().split(","))
.forEach(
engine ->
engineStopService.stopUnlockECByUserCreatorAndECType(
userName, creatorStr, engine));
} else {
engineStopService.stopUnlockECByUserCreatorAndECType(
userName, creatorStr, engineType);
}
return Message.ok("Kill engineConn succeed");
}

static ServiceInstance getServiceInstance(JsonNode jsonNode) throws AMErrorException {
String applicationName = jsonNode.get("applicationName").asText();
String instance = jsonNode.get("instance").asText();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,8 @@ object AMConfiguration {

val AM_USER_RESET_RESOURCE = CommonVars("linkis.am.user.reset.resource.enable", true).getValue

val UDF_KILL_ENGINE_TYPE = CommonVars("linkis.udf.kill.engine.type", "spark,hive").getValue

private def getDefaultMultiEngineUser(): String = {
val jvmUser = Utils.getJvmUser
s""" {jdbc:"$jvmUser", es: "$jvmUser", presto:"$jvmUser", appconn:"$jvmUser", openlookeng:"$jvmUser", trino:"$jvmUser", io_file:"root", jobserver:"$jvmUser", nebula:"$jvmUser"}"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -290,24 +290,28 @@ class DefaultEngineStopService extends AbstractEngineService with EngineStopServ
dealEngineByEngineNode(engineNodes.toList, userName)
}
// kill EMnode by user creator
if (StringUtils.isNotBlank(engineType) && !creator.equals(Configuration.GLOBAL_CONF_SYMBOL)) {
val filterEngineNode = engineNodes
.filter(_.getOwner.equals(userName))
.filter(node => {
var filterResult = false
if (!node.getLabels.isEmpty) {
val userCreator = LabelUtil.getUserCreatorLabel(node.getLabels)
val engineTypeLabel = LabelUtil.getEngineTypeLabel(node.getLabels).getStringValue
if (
userCreator.getUser.equals(userName) && userCreator.getCreator
.equals(creator) && engineTypeLabel.equals(engineType)
) {
filterResult = true
}
}
filterResult
})
.toList
if (StringUtils.isNotBlank(engineType)) {
val filterEngineNode = creator match {
case Configuration.GLOBAL_CONF_SYMBOL =>
engineNodes
.filter(_.getOwner.equals(userName))
.filter(!_.getLabels.isEmpty)
.filter(node =>
LabelUtil.getUserCreatorLabel(node.getLabels).getUser.equals(userName)
&& LabelUtil.getEngineTypeLabel(node.getLabels).getEngineType.equals(engineType)
)
.toList
case _ =>
engineNodes
.filter(_.getOwner.equals(userName))
.filter(!_.getLabels.isEmpty)
.filter(node =>
LabelUtil.getUserCreatorLabel(node.getLabels).getUser.equals(userName)
&& LabelUtil.getUserCreatorLabel(node.getLabels).getCreator.equals(creator)
&& LabelUtil.getEngineTypeLabel(node.getLabels).getEngineType.equals(engineType)
)
.toList
}
dealEngineByEngineNode(filterEngineNode, userName)
}
}
Expand Down
Loading