From 398725e7d3edaad5b6ce5868e4da3548b2b2d786 Mon Sep 17 00:00:00 2001 From: Casion Date: Tue, 5 Sep 2023 15:20:32 +0800 Subject: [PATCH 01/11] Dev 1.1.15 webank acrossclusters (#278) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * 【1.1.15 webank】 cross cluster (#251) * fix update rule mapper * fix exception info * fix clustername tolowcase * fix yarnResourceRequester * fix cluster label set message * fix cluster label value check * fix cross cluster task * fix platform * fix platform2 * fix cluster rule isValid API * fix queue rule suffix * fix rule valid api * fix switch queue log * fix pr comments * add cross cluster rule error (#268) * fix user to user_name (#272) * fix user to user_name * fix userName * fix all userName in api * fix all username * fix all username (#277) --------- Co-authored-by: lemonjuice <86357693+lemonjuicelove@users.noreply.github.com> Co-authored-by: lemonjuicelove <735611140@qq.com> --- .../manager/rm/exception/RMErrorCode.java | 2 + .../engine/DefaultEngineCreateService.scala | 8 +++ .../external/yarn/YarnResourceRequester.scala | 49 ++++++++------- .../DriverAndYarnReqResourceService.scala | 52 +++++++++++++++- .../label/entity/cluster/ClusterLabel.java | 4 +- .../LabelCommonErrorCodeSummary.java | 3 + .../ecm/ComputationEngineConnManager.scala | 3 +- .../conf/AcrossClusterConfiguration.scala | 2 +- .../dao/AcrossClusterRuleMapper.java | 6 +- .../entity/AcrossClusterRule.java | 14 ++--- .../api/AcrossClusterRuleRestfulApi.java | 61 ++++++++++--------- .../service/AcrossClusterRuleService.java | 6 +- .../impl/AcrossClusterRuleServiceImpl.java | 22 +++---- .../mapper/common/AcrossClusterRuleMapper.xml | 31 ++++------ 14 files changed, 157 insertions(+), 106 deletions(-) diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/exception/RMErrorCode.java b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/exception/RMErrorCode.java index b513931414..eb69062d89 100644 --- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/exception/RMErrorCode.java +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/exception/RMErrorCode.java @@ -61,6 +61,8 @@ public enum RMErrorCode implements LinkisErrorCode { CLUSTER_QUEUE_INSTANCES_INSUFFICIENT(12012, "Insufficient cluster queue instance(集群队列实例不足)"), + ACROSS_CLUSTER_RULE_FAILED(12012, "across cluster rule failed(跨集群规则失败)"), + ECM_RESOURCE_INSUFFICIENT(11000, "ECM resources are insufficient(ECM 资源不足)"), ECM_MEMORY_INSUFFICIENT(11001, "ECM memory resources are insufficient(ECM 内存资源不足)"), diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineCreateService.scala b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineCreateService.scala index 11f622e9fe..14acefce87 100644 --- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineCreateService.scala +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineCreateService.scala @@ -293,6 +293,14 @@ class DefaultEngineCreateService }) } + val queueRuleSuffix = props.get("queueRuleSuffix") + if (StringUtils.isNotBlank(queueRuleSuffix)) { + val queueName = props.getOrDefault("wds.linkis.rm.yarnqueue", "default") + val newQueueName = queueName + "_" + queueRuleSuffix + props.put("wds.linkis.rm.yarnqueue", newQueueName) + logger.info(s"Switch queues according to queueRule with queue name : $queueName to $newQueueName") + } + val timeoutEngineResourceRequest = TimeoutEngineResourceRequest( timeout, engineCreateRequest.getUser, diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/external/yarn/YarnResourceRequester.scala b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/external/yarn/YarnResourceRequester.scala index 8891c98935..a6ed7c7512 100644 --- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/external/yarn/YarnResourceRequester.scala +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/external/yarn/YarnResourceRequester.scala @@ -57,12 +57,11 @@ class YarnResourceRequester extends ExternalResourceRequester with Logging { private val HASTATE_ACTIVE = "active" - private var provider: ExternalResourceProvider = _ private val rmAddressMap: util.Map[String, String] = new ConcurrentHashMap[String, String]() - private def getAuthorizationStr = { - val user = this.provider.getConfigMap.getOrDefault("user", "").asInstanceOf[String] - val pwd = this.provider.getConfigMap.getOrDefault("pwd", "").asInstanceOf[String] + private def getAuthorizationStr(provider: ExternalResourceProvider) = { + val user = provider.getConfigMap.getOrDefault("user", "").asInstanceOf[String] + val pwd = provider.getConfigMap.getOrDefault("pwd", "").asInstanceOf[String] val authKey = user + ":" + pwd Base64.getMimeEncoder.encodeToString(authKey.getBytes) } @@ -71,9 +70,7 @@ class YarnResourceRequester extends ExternalResourceRequester with Logging { identifier: ExternalResourceIdentifier, provider: ExternalResourceProvider ): NodeResource = { - val rmWebHaAddress = provider.getConfigMap.get("rmWebAddress").asInstanceOf[String] - this.provider = provider - val rmWebAddress = getAndUpdateActiveRmWebAddress(rmWebHaAddress) + val rmWebAddress = getAndUpdateActiveRmWebAddress(provider) logger.info(s"rmWebAddress: $rmWebAddress") val queueName = identifier.asInstanceOf[YarnResourceIdentifier].getQueueName @@ -87,7 +84,7 @@ class YarnResourceRequester extends ExternalResourceRequester with Logging { ) def maxEffectiveHandle(queueValue: Option[JValue]): Option[YarnResource] = { - val metrics = getResponseByUrl("metrics", rmWebAddress) + val metrics = getResponseByUrl("metrics", rmWebAddress, provider) val totalResouceInfoResponse = ( (metrics \ "clusterMetrics" \ "totalMB").asInstanceOf[JInt].values.toLong, (metrics \ "clusterMetrics" \ "totalVirtualCores").asInstanceOf[JInt].values.toLong @@ -180,7 +177,7 @@ class YarnResourceRequester extends ExternalResourceRequester with Logging { def getChildQueuesOfCapacity(resp: JValue) = resp \ "queues" \ "queue" def getResources() = { - val resp = getResponseByUrl("scheduler", rmWebAddress) + val resp = getResponseByUrl("scheduler", rmWebAddress, provider) val schedulerType = (resp \ "scheduler" \ "schedulerInfo" \ "type").asInstanceOf[JString].values if ("capacityScheduler".equals(schedulerType)) { @@ -255,9 +252,8 @@ class YarnResourceRequester extends ExternalResourceRequester with Logging { identifier: ExternalResourceIdentifier, provider: ExternalResourceProvider ): java.util.List[ExternalAppInfo] = { - val rmWebHaAddress = provider.getConfigMap.get("rmWebAddress").asInstanceOf[String] - val rmWebAddress = getAndUpdateActiveRmWebAddress(rmWebHaAddress) + val rmWebAddress = getAndUpdateActiveRmWebAddress(provider) val queueName = identifier.asInstanceOf[YarnResourceIdentifier].getQueueName @@ -273,7 +269,7 @@ class YarnResourceRequester extends ExternalResourceRequester with Logging { val realQueueName = "root." + queueName def getAppInfos(): Array[ExternalAppInfo] = { - val resp = getResponseByUrl("apps", rmWebAddress) + val resp = getResponseByUrl("apps", rmWebAddress, provider) resp \ "apps" \ "app" match { case JArray(apps) => val appInfoBuffer = new ArrayBuffer[YarnAppInfo]() @@ -307,25 +303,29 @@ class YarnResourceRequester extends ExternalResourceRequester with Logging { override def getResourceType: ResourceType = ResourceType.Yarn - private def getResponseByUrl(url: String, rmWebAddress: String) = { + private def getResponseByUrl( + url: String, + rmWebAddress: String, + provider: ExternalResourceProvider + ) = { val httpGet = new HttpGet(rmWebAddress + "/ws/v1/cluster/" + url) httpGet.addHeader("Accept", "application/json") - val authorEnable: Any = this.provider.getConfigMap.get("authorEnable"); + val authorEnable: Any = provider.getConfigMap.get("authorEnable"); var httpResponse: HttpResponse = null authorEnable match { case flag: Boolean => if (flag) { - httpGet.addHeader(HttpHeaders.AUTHORIZATION, "Basic " + getAuthorizationStr) + httpGet.addHeader(HttpHeaders.AUTHORIZATION, "Basic " + getAuthorizationStr(provider)) } case _ => } - val kerberosEnable: Any = this.provider.getConfigMap.get("kerberosEnable"); + val kerberosEnable: Any = provider.getConfigMap.get("kerberosEnable"); kerberosEnable match { case flag: Boolean => if (flag) { - val principalName = this.provider.getConfigMap.get("principalName").asInstanceOf[String] - val keytabPath = this.provider.getConfigMap.get("keytabPath").asInstanceOf[String] - val krb5Path = this.provider.getConfigMap.get("krb5Path").asInstanceOf[String] + val principalName = provider.getConfigMap.get("principalName").asInstanceOf[String] + val keytabPath = provider.getConfigMap.get("keytabPath").asInstanceOf[String] + val krb5Path = provider.getConfigMap.get("krb5Path").asInstanceOf[String] val requestKuu = new RequestKerberosUrlUtils(principalName, keytabPath, krb5Path, false) val response = requestKuu.callRestUrl(rmWebAddress + "/ws/v1/cluster/" + url, principalName) @@ -341,8 +341,9 @@ class YarnResourceRequester extends ExternalResourceRequester with Logging { parse(EntityUtils.toString(httpResponse.getEntity())) } - def getAndUpdateActiveRmWebAddress(haAddress: String): String = { + def getAndUpdateActiveRmWebAddress(provider: ExternalResourceProvider): String = { // todo check if it will stuck for many requests + val haAddress = provider.getConfigMap.get("rmWebAddress").asInstanceOf[String] var activeAddress = rmAddressMap.get(haAddress) if (StringUtils.isBlank(activeAddress)) haAddress.intern().synchronized { if (StringUtils.isBlank(activeAddress)) { @@ -356,7 +357,7 @@ class YarnResourceRequester extends ExternalResourceRequester with Logging { .split(RMConfiguration.DEFAULT_YARN_RM_WEB_ADDRESS_DELIMITER.getValue) .foreach(address => { Utils.tryCatch { - val response = getResponseByUrl("info", address) + val response = getResponseByUrl("info", address, provider) response \ "clusterInfo" \ "haState" match { case state: JString => if (HASTATE_ACTIVE.equalsIgnoreCase(state.s)) { @@ -393,12 +394,10 @@ class YarnResourceRequester extends ExternalResourceRequester with Logging { override def reloadExternalResourceAddress( provider: ExternalResourceProvider ): java.lang.Boolean = { - if (null == provider) { - rmAddressMap.clear() - } else { + if (null != provider) { val rmWebHaAddress = provider.getConfigMap.get("rmWebAddress").asInstanceOf[String] rmAddressMap.remove(rmWebHaAddress) - getAndUpdateActiveRmWebAddress(rmWebHaAddress) + getAndUpdateActiveRmWebAddress(provider) } true } diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/service/impl/DriverAndYarnReqResourceService.scala b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/service/impl/DriverAndYarnReqResourceService.scala index bebb34e00c..728f753977 100644 --- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/service/impl/DriverAndYarnReqResourceService.scala +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/service/impl/DriverAndYarnReqResourceService.scala @@ -21,8 +21,7 @@ import org.apache.linkis.manager.common.constant.RMConstant import org.apache.linkis.manager.common.entity.resource._ import org.apache.linkis.manager.common.entity.resource.ResourceType.DriverAndYarn import org.apache.linkis.manager.common.exception.RMWarnException -import org.apache.linkis.manager.common.protocol.engine.{EngineAskRequest, EngineCreateRequest} -import org.apache.linkis.manager.label.entity.cluster.ClusterLabel +import org.apache.linkis.manager.common.protocol.engine.EngineCreateRequest import org.apache.linkis.manager.rm.domain.RMLabelContainer import org.apache.linkis.manager.rm.exception.RMErrorCode import org.apache.linkis.manager.rm.external.service.ExternalResourceService @@ -76,6 +75,55 @@ class DriverAndYarnReqResourceService( throw new RMWarnException(notEnoughMessage._1, notEnoughMessage._2) } + if (engineCreateRequest.getProperties != null) { + val user = labelContainer.getUserCreatorLabel.getUser + val creator = labelContainer.getUserCreatorLabel.getCreator + val properties = engineCreateRequest.getProperties + val acrossClusterTask = properties.get("acrossClusterTask") + val CPUThreshold = properties.get("CPUThreshold") + val MemoryThreshold = properties.get("MemoryThreshold") + val CPUPercentageThreshold = properties.get("CPUPercentageThreshold") + val MemoryPercentageThreshold = properties.get("MemoryPercentageThreshold") + + logger.info( + s"user: $user, creator: $creator, acrossClusterTask: $acrossClusterTask, cross cluster judge" + ) + + if ( + acrossClusterTask.toBoolean && StringUtils.isNotBlank(CPUThreshold) && StringUtils + .isNotBlank(MemoryThreshold) + && StringUtils + .isNotBlank(CPUPercentageThreshold) && StringUtils.isNotBlank(MemoryPercentageThreshold) + ) { + + logger.info( + s"user: $user, creator: $creator task enter cross cluster resource judgment, " + + s"CPUThreshold: $CPUThreshold, MemoryThreshold: $MemoryThreshold," + + s"CPUPercentageThreshold: $CPUPercentageThreshold, MemoryPercentageThreshold: $MemoryPercentageThreshold" + ) + + val acrossClusterFlag = AcrossClusterRulesJudgeUtils.acrossClusterRuleJudge( + queueLeftResource.asInstanceOf[YarnResource], + usedCapacity.asInstanceOf[YarnResource], + maxCapacity.asInstanceOf[YarnResource], + CPUThreshold.toInt, + MemoryThreshold.toInt, + CPUPercentageThreshold.toDouble, + MemoryPercentageThreshold.toDouble + ) + + if (!acrossClusterFlag) { + logger.info(s"user: $user, creator: $creator task not meet the threshold rule") + + throw new RMWarnException(RMErrorCode.ACROSS_CLUSTER_RULE_FAILED.getErrorCode, RMErrorCode.ACROSS_CLUSTER_RULE_FAILED.getErrorDesc) + } + + logger.info(s"user: $user, creator: $creator task meet the threshold rule") + } else { + logger.info(s"user: $user, creator: $creator task skip cross cluster resource judgment") + } + } + true } diff --git a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/cluster/ClusterLabel.java b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/cluster/ClusterLabel.java index 499adb496b..5e1a3d5c23 100644 --- a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/cluster/ClusterLabel.java +++ b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/cluster/ClusterLabel.java @@ -26,7 +26,7 @@ import java.util.HashMap; -import static org.apache.linkis.manager.label.errorcode.LabelCommonErrorCodeSummary.LABEL_ERROR_CODE; +import static org.apache.linkis.manager.label.errorcode.LabelCommonErrorCodeSummary.CLUSTER_LABEL_ERROR_CODE; public class ClusterLabel extends GenericLabel implements EMNodeLabel, EngineNodeLabel, UserModifiable { @@ -75,7 +75,7 @@ public void valueCheck(String stringValue) throws LabelErrorException { if (!StringUtils.isEmpty(stringValue)) { if (stringValue.split(SerializableLabel.VALUE_SEPARATOR).length != 2) { throw new LabelErrorException( - LABEL_ERROR_CODE.getErrorCode(), LABEL_ERROR_CODE.getErrorDesc()); + CLUSTER_LABEL_ERROR_CODE.getErrorCode(), CLUSTER_LABEL_ERROR_CODE.getErrorDesc()); } } } diff --git a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/errorcode/LabelCommonErrorCodeSummary.java b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/errorcode/LabelCommonErrorCodeSummary.java index a0ebdffe8a..c19ab8eadf 100644 --- a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/errorcode/LabelCommonErrorCodeSummary.java +++ b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/errorcode/LabelCommonErrorCodeSummary.java @@ -21,6 +21,9 @@ public enum LabelCommonErrorCodeSummary implements LinkisErrorCode { UPDATE_LABEL_FAILED(25001, "Update label realtion failed(更新标签属性失败)"), + CLUSTER_LABEL_ERROR_CODE( + 25002, + "The value of the label is set incorrectly, the setting value is: ClusterType-ClusterName(Yarn-bdp) "), LABEL_ERROR_CODE( 25002, "The value of the label is set incorrectly, only one value can be set, and the separator symbol '-' cannot be used(标签的值设置错误,只能设置一个值,不能使用分割符符号 '-') "), diff --git a/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/ecm/ComputationEngineConnManager.scala b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/ecm/ComputationEngineConnManager.scala index ec18cfb228..883fd20337 100644 --- a/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/ecm/ComputationEngineConnManager.scala +++ b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/ecm/ComputationEngineConnManager.scala @@ -115,7 +115,8 @@ class ComputationEngineConnManager extends AbstractEngineConnManager with Loggin s"${mark.getMarkId()} Failed to askEngineAskRequest time taken ($taken), ${t.getMessage}" ) retryException = t - // add isCrossClusterRetryException flag + // add isCrossClusterRetryException flag + engineAskRequest.getProperties.put("isCrossClusterRetryException", "true") case t: Throwable => val taken = ByteTimeUtils.msDurationToString(System.currentTimeMillis - start) logger.warn(s"${mark.getMarkId()} Failed to askEngineAskRequest time taken ($taken)") diff --git a/linkis-public-enhancements/linkis-configuration/src/main/java/org/apache/linkis/configuration/conf/AcrossClusterConfiguration.scala b/linkis-public-enhancements/linkis-configuration/src/main/java/org/apache/linkis/configuration/conf/AcrossClusterConfiguration.scala index f6cd36f69c..64635cf914 100644 --- a/linkis-public-enhancements/linkis-configuration/src/main/java/org/apache/linkis/configuration/conf/AcrossClusterConfiguration.scala +++ b/linkis-public-enhancements/linkis-configuration/src/main/java/org/apache/linkis/configuration/conf/AcrossClusterConfiguration.scala @@ -22,6 +22,6 @@ import org.apache.linkis.common.conf.CommonVars object AcrossClusterConfiguration { val ACROSS_CLUSTER_QUEUE_SUFFIX = - CommonVars.apply("linkis.configuration.across.cluster.queue.suffix", "_bdap2bdp").getValue + CommonVars.apply("linkis.configuration.across.cluster.queue.suffix", "bdap2bdp").getValue } diff --git a/linkis-public-enhancements/linkis-configuration/src/main/java/org/apache/linkis/configuration/dao/AcrossClusterRuleMapper.java b/linkis-public-enhancements/linkis-configuration/src/main/java/org/apache/linkis/configuration/dao/AcrossClusterRuleMapper.java index dc10d5b02d..be82de883f 100644 --- a/linkis-public-enhancements/linkis-configuration/src/main/java/org/apache/linkis/configuration/dao/AcrossClusterRuleMapper.java +++ b/linkis-public-enhancements/linkis-configuration/src/main/java/org/apache/linkis/configuration/dao/AcrossClusterRuleMapper.java @@ -27,16 +27,16 @@ public interface AcrossClusterRuleMapper { AcrossClusterRule getAcrossClusterRule(@Param("id") Long id); - void deleteAcrossClusterRule(@Param("creator") String creator, @Param("user") String user); + void deleteAcrossClusterRule(@Param("creator") String creator, @Param("username") String username); void updateAcrossClusterRule(@Param("acrossClusterRule") AcrossClusterRule acrossClusterRule); void insertAcrossClusterRule(@Param("acrossClusterRule") AcrossClusterRule acrossClusterRule); List queryAcrossClusterRuleList( - @Param("user") String user, + @Param("username") String username, @Param("creator") String creator, @Param("clusterName") String clusterName); - void validAcrossClusterRule(@Param("acrossClusterRule") AcrossClusterRule acrossClusterRule); + void validAcrossClusterRule(@Param("isValid") String isValid, @Param("id") Long id); } diff --git a/linkis-public-enhancements/linkis-configuration/src/main/java/org/apache/linkis/configuration/entity/AcrossClusterRule.java b/linkis-public-enhancements/linkis-configuration/src/main/java/org/apache/linkis/configuration/entity/AcrossClusterRule.java index 3ea0c22d47..c24cfd3d44 100644 --- a/linkis-public-enhancements/linkis-configuration/src/main/java/org/apache/linkis/configuration/entity/AcrossClusterRule.java +++ b/linkis-public-enhancements/linkis-configuration/src/main/java/org/apache/linkis/configuration/entity/AcrossClusterRule.java @@ -24,7 +24,7 @@ public class AcrossClusterRule { private Long id; private String clusterName; private String creator; - private String user; + private String username; private Date createTime; private String createBy; private Date updateTime; @@ -58,12 +58,12 @@ public void setCreator(String creator) { this.creator = creator; } - public String getUser() { - return user; + public String getUsername() { + return username; } - public void setUser(String user) { - this.user = user; + public void setUsername(String username) { + this.username = username; } public Date getCreateTime() { @@ -125,8 +125,8 @@ public String toString() { + ", creator='" + creator + '\'' - + ", user='" - + user + + ", username='" + + username + '\'' + ", createTime=" + createTime diff --git a/linkis-public-enhancements/linkis-configuration/src/main/java/org/apache/linkis/configuration/restful/api/AcrossClusterRuleRestfulApi.java b/linkis-public-enhancements/linkis-configuration/src/main/java/org/apache/linkis/configuration/restful/api/AcrossClusterRuleRestfulApi.java index a0ae390576..568579f3d2 100644 --- a/linkis-public-enhancements/linkis-configuration/src/main/java/org/apache/linkis/configuration/restful/api/AcrossClusterRuleRestfulApi.java +++ b/linkis-public-enhancements/linkis-configuration/src/main/java/org/apache/linkis/configuration/restful/api/AcrossClusterRuleRestfulApi.java @@ -60,8 +60,8 @@ public class AcrossClusterRuleRestfulApi { }) @RequestMapping(path = "/isValid", method = RequestMethod.PUT) public Message isValidRule(HttpServletRequest req, @RequestBody Map json) { - String username = ModuleUserUtils.getOperationUser(req, "execute valid acrossClusterRule"); - if (!Configuration.isAdmin(username)) { + String operationUser = ModuleUserUtils.getOperationUser(req, "execute valid acrossClusterRule"); + if (!Configuration.isAdmin(operationUser)) { return Message.error( "Failed to valid acrossClusterRule List,msg: only administrators can configure"); } @@ -73,8 +73,9 @@ public Message isValidRule(HttpServletRequest req, @RequestBody Map json) { - String username = ModuleUserUtils.getOperationUser(req, "execute update acrossClusterRule"); - if (!Configuration.isAdmin(username)) { + String operationUser = ModuleUserUtils.getOperationUser(req, "execute update acrossClusterRule"); + if (!Configuration.isAdmin(operationUser)) { return Message.error( "Failed to update acrossClusterRule,msg: only administrators can configure"); } @@ -198,7 +199,7 @@ public Message updateAcrossClusterRule( Long id = idInt.longValue(); String clusterName = (String) json.get("clusterName"); String creator = (String) json.get("creator"); - String user = (String) json.get("user"); + String username = (String) json.get("username"); String isValid = (String) json.get("isValid"); String startTime = (String) json.get("startTime"); String endTime = (String) json.get("endTime"); @@ -208,7 +209,7 @@ public Message updateAcrossClusterRule( String MemoryPercentageThreshold = (String) json.get("MemoryPercentageThreshold"); if (StringUtils.isBlank(clusterName) || StringUtils.isBlank(creator) - || StringUtils.isBlank(user) + || StringUtils.isBlank(username) || StringUtils.isBlank(isValid) || StringUtils.isBlank(startTime) || StringUtils.isBlank(endTime) @@ -232,8 +233,8 @@ public Message updateAcrossClusterRule( acrossClusterRule.setId(id); acrossClusterRule.setClusterName(clusterName.toLowerCase()); acrossClusterRule.setCreator(creator); - acrossClusterRule.setUser(user); - acrossClusterRule.setUpdateBy(username); + acrossClusterRule.setUsername(username); + acrossClusterRule.setUpdateBy(operationUser); acrossClusterRule.setRules(rules); acrossClusterRule.setIsValid(isValid); acrossClusterRuleService.updateAcrossClusterRule(acrossClusterRule); @@ -252,7 +253,7 @@ public Message updateAcrossClusterRule( @ApiImplicitParam(name = "req", dataType = "HttpServletRequest", value = "req"), @ApiImplicitParam(name = "clusterName", dataType = "String", value = "clusterName"), @ApiImplicitParam(name = "creator", dataType = "String", value = "creator"), - @ApiImplicitParam(name = "user", dataType = "String", value = "user"), + @ApiImplicitParam(name = "username", dataType = "String", value = "username"), @ApiImplicitParam(name = "isValid", dataType = "String", value = "isValid"), @ApiImplicitParam(name = "startTime", dataType = "String", value = "startTime"), @ApiImplicitParam(name = "endTime", dataType = "String", value = "endTime"), @@ -270,15 +271,15 @@ public Message updateAcrossClusterRule( @RequestMapping(path = "/add", method = RequestMethod.POST) public Message insertAcrossClusterRule( HttpServletRequest req, @RequestBody Map json) { - String username = ModuleUserUtils.getOperationUser(req, "execute add acrossClusterRule"); - if (!Configuration.isAdmin(username)) { + String operationUser = ModuleUserUtils.getOperationUser(req, "execute add acrossClusterRule"); + if (!Configuration.isAdmin(operationUser)) { return Message.error( "Failed to add acrossClusterRule,msg: only administrators can configure"); } String clusterName = (String) json.get("clusterName"); String creator = (String) json.get("creator"); - String user = (String) json.get("user"); + String username = (String) json.get("username"); String isValid = (String) json.get("isValid"); String startTime = (String) json.get("startTime"); String endTime = (String) json.get("endTime"); @@ -288,7 +289,7 @@ public Message insertAcrossClusterRule( String MemoryPercentageThreshold = (String) json.get("MemoryPercentageThreshold"); if (StringUtils.isBlank(clusterName) || StringUtils.isBlank(creator) - || StringUtils.isBlank(user) + || StringUtils.isBlank(username) || StringUtils.isBlank(isValid) || StringUtils.isBlank(startTime) || StringUtils.isBlank(endTime) @@ -311,9 +312,9 @@ public Message insertAcrossClusterRule( AcrossClusterRule acrossClusterRule = new AcrossClusterRule(); acrossClusterRule.setClusterName(clusterName.toLowerCase()); acrossClusterRule.setCreator(creator); - acrossClusterRule.setUser(user); - acrossClusterRule.setCreateBy(username); - acrossClusterRule.setUpdateBy(username); + acrossClusterRule.setUsername(username); + acrossClusterRule.setCreateBy(operationUser); + acrossClusterRule.setUpdateBy(operationUser); acrossClusterRule.setRules(rules); acrossClusterRule.setIsValid(isValid); acrossClusterRuleService.insertAcrossClusterRule(acrossClusterRule); diff --git a/linkis-public-enhancements/linkis-configuration/src/main/java/org/apache/linkis/configuration/service/AcrossClusterRuleService.java b/linkis-public-enhancements/linkis-configuration/src/main/java/org/apache/linkis/configuration/service/AcrossClusterRuleService.java index 30588cb1ac..2fff11c871 100644 --- a/linkis-public-enhancements/linkis-configuration/src/main/java/org/apache/linkis/configuration/service/AcrossClusterRuleService.java +++ b/linkis-public-enhancements/linkis-configuration/src/main/java/org/apache/linkis/configuration/service/AcrossClusterRuleService.java @@ -23,15 +23,15 @@ public interface AcrossClusterRuleService { - void deleteAcrossClusterRule(String creator, String user) throws Exception; + void deleteAcrossClusterRule(String creator, String username) throws Exception; void updateAcrossClusterRule(AcrossClusterRule acrossClusterRule) throws Exception; void insertAcrossClusterRule(AcrossClusterRule acrossClusterRule) throws Exception; Map queryAcrossClusterRuleList( - String creator, String user, String clusterName, Integer pageNow, Integer pageSize) + String creator, String username, String clusterName, Integer pageNow, Integer pageSize) throws Exception; - void validAcrossClusterRule(Long id, String isValid, String updateBy) throws Exception; + void validAcrossClusterRule(Long id, String isValid) throws Exception; } diff --git a/linkis-public-enhancements/linkis-configuration/src/main/java/org/apache/linkis/configuration/service/impl/AcrossClusterRuleServiceImpl.java b/linkis-public-enhancements/linkis-configuration/src/main/java/org/apache/linkis/configuration/service/impl/AcrossClusterRuleServiceImpl.java index 377c27fd72..611dab5126 100644 --- a/linkis-public-enhancements/linkis-configuration/src/main/java/org/apache/linkis/configuration/service/impl/AcrossClusterRuleServiceImpl.java +++ b/linkis-public-enhancements/linkis-configuration/src/main/java/org/apache/linkis/configuration/service/impl/AcrossClusterRuleServiceImpl.java @@ -39,8 +39,8 @@ public class AcrossClusterRuleServiceImpl implements AcrossClusterRuleService { @Autowired private AcrossClusterRuleMapper ruleMapper; @Override - public void deleteAcrossClusterRule(String creator, String user) throws Exception { - ruleMapper.deleteAcrossClusterRule(creator, user); + public void deleteAcrossClusterRule(String creator, String username) throws Exception { + ruleMapper.deleteAcrossClusterRule(creator, username); logger.info("delete acrossClusterRule success"); return; } @@ -75,7 +75,7 @@ public void insertAcrossClusterRule(AcrossClusterRule acrossClusterRule) throws @Override public Map queryAcrossClusterRuleList( - String creator, String user, String clusterName, Integer pageNow, Integer pageSize) { + String creator, String username, String clusterName, Integer pageNow, Integer pageSize) { Map result = new HashMap<>(2); List acrossClusterRules = null; if (Objects.isNull(pageNow)) { @@ -87,7 +87,7 @@ public Map queryAcrossClusterRuleList( PageHelper.startPage(pageNow, pageSize); try { - acrossClusterRules = ruleMapper.queryAcrossClusterRuleList(user, creator, clusterName); + acrossClusterRules = ruleMapper.queryAcrossClusterRuleList(username, creator, clusterName); } finally { PageHelper.clearPage(); } @@ -98,17 +98,15 @@ public Map queryAcrossClusterRuleList( } @Override - public void validAcrossClusterRule(Long id, String isValid, String updateBy) throws Exception { - AcrossClusterRule acrossClusterRule = ruleMapper.getAcrossClusterRule(id); - if (acrossClusterRule == null) { + public void validAcrossClusterRule(Long id, String isValid) throws Exception { + AcrossClusterRule beforeRule = ruleMapper.getAcrossClusterRule(id); + + if (beforeRule == null) { logger.info("acrossClusterRule not exit"); throw new Exception("acrossClusterRule not exit"); } - acrossClusterRule.setIsValid(isValid); - acrossClusterRule.setUpdateBy(updateBy); - acrossClusterRule.setUpdateTime(new Date()); - logger.info("delete acrossClusterRule success"); - ruleMapper.validAcrossClusterRule(acrossClusterRule); + + ruleMapper.validAcrossClusterRule(isValid, id); logger.info("valid acrossClusterRule success"); return; } diff --git a/linkis-public-enhancements/linkis-configuration/src/main/resources/mapper/common/AcrossClusterRuleMapper.xml b/linkis-public-enhancements/linkis-configuration/src/main/resources/mapper/common/AcrossClusterRuleMapper.xml index 6997963313..29ae208a71 100644 --- a/linkis-public-enhancements/linkis-configuration/src/main/resources/mapper/common/AcrossClusterRuleMapper.xml +++ b/linkis-public-enhancements/linkis-configuration/src/main/resources/mapper/common/AcrossClusterRuleMapper.xml @@ -24,7 +24,7 @@ - + @@ -34,17 +34,17 @@ - id,cluster_name,creator,user,create_time,create_by,update_time,update_by,rules,is_valid + id,cluster_name,creator,username,create_time,create_by,update_time,update_by,rules,is_valid - cluster_name,creator,user,create_time,create_by,update_time,update_by,rules,is_valid + cluster_name,creator,username,create_time,create_by,update_time,update_by,rules,is_valid - SELECT `key`, `description`, `name`, `engine_conn_type`, + SELECT `id`, `key`, `description`, `name`, `engine_conn_type`, `default_value`, `validate_type`, `validate_range`,boundary_type,template_required FROM linkis_ps_configuration_config_key - - - - `engine_conn_type` = #{engineType} - AND id IN - ( - SELECT temp.config_key_id FROM - ( SELECT * from linkis_ps_configuration_key_engine_relation) AS temp - WHERE temp.config_key_id IN - ( - SELECT B.id FROM - (SELECT * from linkis_ps_configuration_config_key ) AS B - WHERE B. `engine_conn_type` = #{engineType} - ) - ); - - + WHERE `engine_conn_type` = #{engineType} + + + + + + + + + + + + + + + + + + + + From 0713575195f08cd2ee8610c308e897f8e1295e0b Mon Sep 17 00:00:00 2001 From: v-kkhuang <62878639+v-kkhuang@users.noreply.github.com> Date: Tue, 5 Sep 2023 22:03:32 +0800 Subject: [PATCH 03/11] =?UTF-8?q?=E3=80=901.1.15=E3=80=91fix=20=20spark=20?= =?UTF-8?q?=20conf=20=20bug=20(#279)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Remove spaces --------- Co-authored-by: huangKai-2323 <62878639+huangKai-2323@users.noreply.github.com> Co-authored-by: casionone --- linkis-dist/package/db/linkis_dml.sql | 6 +- .../upgrade/1.4.1_schema/mysql/linkis_dml.sql | 10 +-- .../restful/api/ConfigurationRestfulApi.java | 62 ++++++++++--------- 3 files changed, 41 insertions(+), 37 deletions(-) diff --git a/linkis-dist/package/db/linkis_dml.sql b/linkis-dist/package/db/linkis_dml.sql index a6b0541258..fe7ccfc1ce 100644 --- a/linkis-dist/package/db/linkis_dml.sql +++ b/linkis-dist/package/db/linkis_dml.sql @@ -89,9 +89,9 @@ INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`, `name`, INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('spark.tispark.tidb.port', NULL, NULL, '4000', 'None', NULL, '0', '0', '1', 'tidb设置', 'spark'); INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('spark.tispark.tidb.user', NULL, NULL, 'root', 'None', NULL, '0', '0', '1', 'tidb设置', 'spark'); INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('spark.python.version', '取值范围:python2,python3', 'python版本','python2', 'OFT', '[\"python3\",\"python2\"]', '0', '0', '1', 'spark引擎设置', 'spark'); -INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `engine_conn_type`, `is_hidden`, `is_advanced`, `level`, `treeName`, `boundary_type`, `en_treeName`, `en_description`, `en_name`) VALUES ('spark.conf', '多个参数使用分号[;]分隔 例如spark.sql.shuffle.partitions=10;', 'spark自定义配置参数',null, 'None', NULL, 'spark',0, 1, 1,'spark资源设置', 0, 'Spark resource settings','Multiple parameters are separated by semicolons [;] For example, spark.sql.shuffle.partitions=10;', 'Spark custom configuration parameters'); -INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `engine_conn_type`, `is_hidden`, `is_advanced`, `level`, `treeName`, `boundary_type`, `en_treeName`, `en_description`, `en_name`) VALUES ('spark.locality.wait', '范围:0-3000,单位:毫秒', '任务调度本地等待时间', '3000', 'OFT', '[\"0\",\"1000\",\"2000\",\"3000\"]', 'spark', 0, 1, 1, 'spark资源设置', 0, 'Spark resource settings', 'Range: 0-3000, Unit: millisecond', 'Task scheduling local waiting time'); -INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `engine_conn_type`, `is_hidden`, `is_advanced`, `level`, `treeName`, `boundary_type`, `en_treeName`, `en_description`, `en_name`) VALUES ('spark.memory.fraction', '范围:0.4,0.5,0.6,单位:百分比', '执行内存和存储内存的百分比', '0.6', 'OFT', '[\"0.4\",\"0.5\",\"0.6\"]', 'spark', 0, 1, 1, 'spark资源设置', 0, 'Spark resource settings', 'Range: 0.4, 0.5, 0.6, in percentage', 'Percentage of execution memory and storage memory'); +INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `engine_conn_type`, `is_hidden`, `is_advanced`, `level`, `treeName`, `boundary_type`, `en_treeName`, `en_description`, `en_name`) VALUES ('spark.conf', '多个参数使用分号[;]分隔 例如spark.sql.shuffle.partitions=10;', 'spark自定义配置参数',null, 'None', NULL, 'spark',0, 1, 1,'spark资源设置', 0, 'Spark Resource Settings','Multiple parameters are separated by semicolons [;] For example, spark.sql.shuffle.partitions=10;', 'Spark Custom Configuration Parameters'); +INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `engine_conn_type`, `is_hidden`, `is_advanced`, `level`, `treeName`, `boundary_type`, `en_treeName`, `en_description`, `en_name`) VALUES ('spark.locality.wait', '范围:0-3000,单位:毫秒', '任务调度本地等待时间', '3000', 'OFT', '[\"0\",\"1000\",\"2000\",\"3000\"]', 'spark', 0, 1, 1, 'spark资源设置', 0, 'Spark Resource Settings', 'Range: 0-3000, Unit: millisecond', 'Task Scheduling Local Waiting Time'); +INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `engine_conn_type`, `is_hidden`, `is_advanced`, `level`, `treeName`, `boundary_type`, `en_treeName`, `en_description`, `en_name`) VALUES ('spark.memory.fraction', '范围:0.4,0.5,0.6,单位:百分比', '执行内存和存储内存的百分比', '0.6', 'OFT', '[\"0.4\",\"0.5\",\"0.6\"]', 'spark', 0, 1, 1, 'spark资源设置', 0, 'Spark Resource Settings', 'Range: 0.4, 0.5, 0.6, in percentage', 'Percentage Of Execution Memory And Storage Memory'); -- hive INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('wds.linkis.rm.instance', '范围:1-20,单位:个', 'hive引擎最大并发数', '20', 'NumInterval', '[1,20]', '0', '0', '1', '队列资源', 'hive'); diff --git a/linkis-dist/package/db/upgrade/1.4.1_schema/mysql/linkis_dml.sql b/linkis-dist/package/db/upgrade/1.4.1_schema/mysql/linkis_dml.sql index 20d4e7445b..8393773757 100644 --- a/linkis-dist/package/db/upgrade/1.4.1_schema/mysql/linkis_dml.sql +++ b/linkis-dist/package/db/upgrade/1.4.1_schema/mysql/linkis_dml.sql @@ -20,7 +20,6 @@ update linkis_ps_configuration_config_key set engine_conn_type = "" where engin INSERT INTO linkis_ps_error_code (error_code,error_desc,error_regex,error_type) VALUES ('13008','任务产生的序列化结果总大小超过了配置的spark.driver.maxResultSize限制。请检查您的任务,看看是否有可能减小任务产生的结果大小,或则可以考虑压缩或合并结果,以减少传输的数据量','is bigger than spark.driver.maxResultSize',0); update linkis_ps_configuration_config_key set template_required = 1 where `key` in ( -"wds.linkis.rm.instance", "spark.executor.instances", "spark.executor.memory", "spark.driver.memory", @@ -28,6 +27,7 @@ update linkis_ps_configuration_config_key set template_required = 1 where `key` "mapreduce.job.running.map.limit", "mapreduce.job.running.reduce.limit", ) +update linkis_ps_configuration_config_key set template_required = 1 where `key` = "wds.linkis.rm.instance" and engine_conn_type ="spark"; -- spark.conf INSERT INTO linkis_ps_configuration_config_key (`key`, description, name, @@ -39,8 +39,8 @@ VALUES( 'spark.conf', '多个参数使用分号[;]分隔 例如spark.sql.shuffle.partitions=10;', 'spark自定义配置参数', null, 'None', NULL, 'spark', 0, 1, 1, -'spark资源设置', 0, 'Spark resource settings', -'Multiple parameters are separated by semicolons [;] For example, spark.sql.shuffle.partitions=10;', 'Spark custom configuration parameters'); +'spark资源设置', 0, 'Spark Resource Settings', +'Multiple parameters are separated by semicolons [;] For example, spark.sql.shuffle.partitions=10;', 'Spark Custom Configuration Parameters'); INSERT INTO `linkis_ps_configuration_key_engine_relation` (`config_key_id`, `engine_type_label_id`) ( @@ -68,7 +68,7 @@ INSERT INTO `linkis_ps_configuration_config_value` (`config_key_id`, `config_val INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `engine_conn_type`, `is_hidden`, `is_advanced`, `level`, `treeName`, `boundary_type`, `en_treeName`, `en_description`, `en_name`) VALUES -('spark.locality.wait', '范围:0-3000,单位:毫秒', '任务调度本地等待时间', '3000', 'OFT', '[\"0\",\"1000\",\"2000\",\"3000\"]', 'spark', 0, 1, 1, 'spark资源设置', 0, 'Spark resource settings', 'Range: 0-3000, Unit: millisecond', 'Task scheduling local waiting time'); +('spark.locality.wait', '范围:0-3000,单位:毫秒', '任务调度本地等待时间', '3000', 'OFT', '[\"0\",\"1000\",\"2000\",\"3000\"]', 'spark', 0, 1, 1, 'spark资源设置', 0, 'Spark Resource Settings', 'Range: 0-3000, Unit: millisecond', 'Task Scheduling Local Waiting Time'); -- all 默认 @@ -97,7 +97,7 @@ INSERT INTO `linkis_ps_configuration_config_value` (`config_key_id`, `config_val INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `engine_conn_type`, `is_hidden`, `is_advanced`, `level`, `treeName`, `boundary_type`, `en_treeName`, `en_description`, `en_name`) VALUES -('spark.memory.fraction', '范围:0.4,0.5,0.6,单位:百分比', '执行内存和存储内存的百分比', '0.6', 'OFT', '[\"0.4\",\"0.5\",\"0.6\"]', 'spark', 0, 1, 1, 'spark资源设置', 0, 'Spark resource settings', 'Range: 0.4, 0.5, 0.6, in percentage', 'Percentage of execution memory and storage memory'); +('spark.memory.fraction', '范围:0.4,0.5,0.6,单位:百分比', '执行内存和存储内存的百分比', '0.6', 'OFT', '[\"0.4\",\"0.5\",\"0.6\"]', 'spark', 0, 1, 1, 'spark资源设置', 0, 'Spark Resource Settings', 'Range: 0.4, 0.5, 0.6, in percentage', 'Percentage Of Execution Memory And Storage Memory'); -- all 默认 diff --git a/linkis-public-enhancements/linkis-configuration/src/main/java/org/apache/linkis/configuration/restful/api/ConfigurationRestfulApi.java b/linkis-public-enhancements/linkis-configuration/src/main/java/org/apache/linkis/configuration/restful/api/ConfigurationRestfulApi.java index 3d996793f9..17e32cebe4 100644 --- a/linkis-public-enhancements/linkis-configuration/src/main/java/org/apache/linkis/configuration/restful/api/ConfigurationRestfulApi.java +++ b/linkis-public-enhancements/linkis-configuration/src/main/java/org/apache/linkis/configuration/restful/api/ConfigurationRestfulApi.java @@ -297,11 +297,23 @@ public Message saveFullTree(HttpServletRequest req, @RequestBody JsonNode json) String username = ModuleUserUtils.getOperationUser(req, "saveFullTree"); ArrayList createList = new ArrayList<>(); ArrayList updateList = new ArrayList<>(); + ArrayList> chekList = new ArrayList<>(); + String sparkConf = ""; for (Object o : fullTrees) { String s = BDPJettyServerHelper.gson().toJson(o); ConfigTree fullTree = BDPJettyServerHelper.gson().fromJson(s, ConfigTree.class); List settings = fullTree.getSettings(); - sparkConfCheck(settings); + chekList.add(settings); + for (ConfigKeyValue configKeyValue : settings) { + if (configKeyValue.getKey().equals("spark.conf") + && StringUtils.isNotBlank(configKeyValue.getConfigValue())) { + sparkConf = configKeyValue.getConfigValue().trim(); + configKeyValue.setConfigValue(sparkConf); + } + } + } + for (List settings : chekList) { + sparkConfCheck(settings, sparkConf); Integer userLabelId = configurationService.checkAndCreateUserLabel(settings, username, creator); for (ConfigKeyValue setting : settings) { @@ -355,34 +367,26 @@ public Message saveFullTree(HttpServletRequest req, @RequestBody JsonNode json) return Message.ok(); } - private void sparkConfCheck(List settings) throws ConfigurationException { - for (ConfigKeyValue setting : settings) { - if (setting.getKey().equals("spark.conf") - && StringUtils.isNotBlank(setting.getConfigValue())) { - // Check if there are any duplicates in spark. conf - String[] split = setting.getConfigValue().split(";"); - int setSize = - Arrays.stream(split) - .map(s -> s.split("=")[0].trim()) - .collect(Collectors.toSet()) - .size(); - int listSize = - Arrays.stream(split) - .map(s -> s.split("=")[0].trim()) - .collect(Collectors.toList()) - .size(); - if (listSize != setSize) { - throw new ConfigurationException("Spark.conf contains duplicate keys"); - } - // Check if there are any duplicates in the spark.conf configuration and other individual - for (String keyValue : split) { - String key = keyValue.split("=")[0].trim(); - boolean matchResult = - settings.stream().anyMatch(settingKey -> key.equals(settingKey.getKey())); - if (matchResult) { - throw new ConfigurationException( - "Saved key is duplicated with the spark conf key , key :" + key); - } + private void sparkConfCheck(List settings, String sparkConf) + throws ConfigurationException { + if (StringUtils.isNotBlank(sparkConf)) { + // Check if there are any duplicates in spark. conf + String[] split = sparkConf.split(";"); + int setSize = + Arrays.stream(split).map(s -> s.split("=")[0].trim()).collect(Collectors.toSet()).size(); + int listSize = + Arrays.stream(split).map(s -> s.split("=")[0].trim()).collect(Collectors.toList()).size(); + if (listSize != setSize) { + throw new ConfigurationException("Spark.conf contains duplicate keys"); + } + // Check if there are any duplicates in the spark.conf configuration and other individual + for (String keyValue : split) { + String key = keyValue.split("=")[0].trim(); + boolean matchResult = + settings.stream().anyMatch(settingKey -> key.equals(settingKey.getKey())); + if (matchResult) { + throw new ConfigurationException( + "Saved key is duplicated with the spark conf key , key :" + key); } } } From 296b824e1b2c22ad16c163843290907fb4af39c4 Mon Sep 17 00:00:00 2001 From: v-kkhuang <62878639+v-kkhuang@users.noreply.github.com> Date: Tue, 5 Sep 2023 22:10:55 +0800 Subject: [PATCH 04/11] =?UTF-8?q?=E3=80=901.1.15=E3=80=91filesystem=20rena?= =?UTF-8?q?me=20add=20no=20admin=20check=20(#280)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * push code * code optimize * code optimze --------- Co-authored-by: huangKai-2323 <62878639+huangKai-2323@users.noreply.github.com> Co-authored-by: casionone --- .../filesystem/restful/api/FsRestfulApi.java | 14 +++++-- tool/dependencies/known-dependencies.txt | 40 +++++++++---------- 2 files changed, 28 insertions(+), 26 deletions(-) diff --git a/linkis-public-enhancements/linkis-script-dev/linkis-storage-script-dev-server/src/main/java/org/apache/linkis/filesystem/restful/api/FsRestfulApi.java b/linkis-public-enhancements/linkis-script-dev/linkis-storage-script-dev-server/src/main/java/org/apache/linkis/filesystem/restful/api/FsRestfulApi.java index 1e1aacda49..019af3c883 100644 --- a/linkis-public-enhancements/linkis-script-dev/linkis-storage-script-dev-server/src/main/java/org/apache/linkis/filesystem/restful/api/FsRestfulApi.java +++ b/linkis-public-enhancements/linkis-script-dev/linkis-storage-script-dev-server/src/main/java/org/apache/linkis/filesystem/restful/api/FsRestfulApi.java @@ -84,6 +84,8 @@ public class FsRestfulApi { private final Logger LOGGER = LoggerFactory.getLogger(getClass()); + + /** * check 权限 * @@ -91,7 +93,7 @@ public class FsRestfulApi { * @param userName * @return */ - private boolean checkIsUsersDirectory(String requestPath, String userName) { + private boolean checkIsUsersDirectory(String requestPath, String userName, Boolean withAdmin) { boolean ownerCheck = WorkSpaceConfiguration.FILESYSTEM_PATH_CHECK_OWNER.getValue(); if (!ownerCheck) { LOGGER.debug("not check filesystem owner."); @@ -102,11 +104,10 @@ private boolean checkIsUsersDirectory(String requestPath, String userName) { WorkspaceUtil.suffixTuning(HDFS_USER_ROOT_PATH_PREFIX.getValue()); String hdfsUserRootPathSuffix = HDFS_USER_ROOT_PATH_SUFFIX.getValue(); String localUserRootPath = WorkspaceUtil.suffixTuning(LOCAL_USER_ROOT_PATH.getValue()); - String path; String workspacePath = hdfsUserRootPathPrefix + userName + hdfsUserRootPathSuffix; String enginconnPath = localUserRootPath + userName; - if (Configuration.isJobHistoryAdmin(userName)) { + if (withAdmin && Configuration.isJobHistoryAdmin(userName)) { workspacePath = hdfsUserRootPathPrefix; enginconnPath = localUserRootPath; } @@ -117,6 +118,11 @@ private boolean checkIsUsersDirectory(String requestPath, String userName) { return (requestPath.contains(workspacePath)) || (requestPath.contains(enginconnPath)); } + private boolean checkIsUsersDirectory(String requestPath, String userName) { + return checkIsUsersDirectory(requestPath, userName, true); + } + + @ApiOperation(value = "getUserRootPath", notes = "get user root path", response = Message.class) @ApiImplicitParams({ @ApiImplicitParam(name = "pathType", required = false, dataType = "String", value = "path type") @@ -233,7 +239,7 @@ public Message rename(HttpServletRequest req, @RequestBody JsonNode json) PathValidator$.MODULE$.validate(oldDest, userName); PathValidator$.MODULE$.validate(newDest, userName); } - if (!checkIsUsersDirectory(newDest, userName)) { + if (!checkIsUsersDirectory(newDest, userName, false)) { throw WorkspaceExceptionManager.createException(80010, userName, newDest); } if (StringUtils.isEmpty(oldDest)) { diff --git a/tool/dependencies/known-dependencies.txt b/tool/dependencies/known-dependencies.txt index 06ea1b3362..ab85adce4c 100644 --- a/tool/dependencies/known-dependencies.txt +++ b/tool/dependencies/known-dependencies.txt @@ -30,9 +30,7 @@ aspectjweaver-1.9.6.jar audience-annotations-0.5.0.jar avatica-1.8.0.jar avatica-metrics-1.8.0.jar -avro-1.7.4.jar avro-1.7.7.jar -avro-1.11.0.jar bcpkix-jdk15on-1.64.jar bcprov-jdk15on-1.64.jar bonecp-0.8.0.RELEASE.jar @@ -52,6 +50,7 @@ chill-java-0.7.6.jar chill_2.11-0.7.6.jar classgraph-4.1.7.jar classmate-1.5.1.jar +clickhouse-jdbc-0.3.2-patch11.jar commons-beanutils-1.9.4.jar commons-cli-1.2.jar commons-cli-1.3.1.jar @@ -77,6 +76,7 @@ commons-math3-3.6.1.jar commons-net-3.1.jar commons-net-3.9.0.jar commons-pool-1.6.jar +commons-pool2-2.8.1.jar commons-text-1.10.0.jar concurrent-0.191.jar config-1.3.3.jar @@ -89,8 +89,8 @@ datanucleus-api-jdo-4.2.4.jar datanucleus-core-4.1.17.jar datanucleus-rdbms-4.1.19.jar derby-10.14.2.0.jar -disruptor-3.4.0.jar disruptor-3.3.0.jar +disruptor-3.4.0.jar dropwizard-metrics-hadoop-metrics2-reporter-0.1.2.jar druid-1.1.22.jar eigenbase-properties-1.1.5.jar @@ -172,6 +172,10 @@ hadoop-mapreduce-client-shuffle-2.7.2.jar hadoop-yarn-api-2.7.2.jar hadoop-yarn-client-2.7.2.jar hadoop-yarn-common-2.7.2.jar + + +decimal( 10 , 2) + hadoop-yarn-registry-2.7.1.jar hadoop-yarn-server-common-2.7.2.jar hbase-annotations-1.1.1.jar @@ -269,6 +273,7 @@ jcommander-1.30.jar jdbi3-core-3.4.0.jar jdbi3-sqlobject-3.4.0.jar jdo-api-3.0.1.jar +jedis-2.9.2.jar jersey-apache-client4-1.19.4.jar jersey-client-1.19.4.jar jersey-client-2.23.1.jar @@ -303,10 +308,10 @@ jline-2.14.6.jar jmxutils-1.19.jar jna-5.12.1.jar jna-platform-5.12.1.jar +joda-time-2.10.5.jar joda-time-2.3.jar joda-time-2.8.1.jar joda-time-2.9.3.jar -joda-time-2.10.5.jar jol-core-0.2.jar joni-2.1.2.jar jpam-1.1.jar @@ -318,7 +323,6 @@ json4s-core_2.11-3.5.3.jar json4s-jackson_2.11-3.5.3.jar json4s-scalap_2.11-3.5.3.jar jsp-api-2.1.jar -jsqlparser-1.0.jar jsqlparser-4.2.jar jsr305-1.3.9.jar jsr305-3.0.1.jar @@ -326,8 +330,8 @@ jsr305-3.0.2.jar jsr311-api-1.1.1.jar jta-1.1.jar jul-to-slf4j-1.7.30.jar -kafka-clients-2.5.1.jar kafka-clients-2.7.0.jar +kafka-clients-3.4.0.jar knife4j-annotations-2.0.9.jar knife4j-core-2.0.9.jar knife4j-spring-2.0.9.jar @@ -361,6 +365,7 @@ metrics-jvm-4.1.22.jar micrometer-core-1.5.14.jar micrometer-registry-prometheus-1.5.14.jar minlog-1.3.0.jar +mongo-java-driver-3.12.8.jar mybatis-3.5.6.jar mybatis-plus-3.4.1.jar mybatis-plus-annotation-3.4.1.jar @@ -419,13 +424,13 @@ oro-2.0.8.jar osgi-resource-locator-1.0.1.jar oshi-core-6.2.1.jar pagehelper-5.3.1.jar -paranamer-2.3.jar paranamer-2.8.jar parquet-hadoop-bundle-1.8.1.jar poi-5.2.3.jar poi-ooxml-5.2.3.jar poi-ooxml-lite-5.2.3.jar poi-shared-strings-2.5.6.jar +postgresql-42.3.8.jar presto-client-0.234.jar presto-client-1.5.0.jar presto-resource-group-managers-0.234.jar @@ -461,6 +466,9 @@ scala-reflect-2.11.12.jar scala-xml_2.11-1.0.5.jar scalap-2.11.12.jar scopt_2.11-3.5.0.jar +seatunnel-core-flink-2.1.2.jar +seatunnel-core-flink-sql-2.1.2.jar +seatunnel-core-spark-2.1.2.jar security-0.191.jar security-0.193.jar servo-core-0.12.21.jar @@ -469,11 +477,10 @@ simpleclient_common-0.8.1.jar slf4j-api-1.7.30.jar slice-0.38.jar slider-core-0.90.2-incubating.jar -snakeyaml-1.33.jar -snappy-java-1.0.4.1.jar +snakeyaml-1.33.jar>hadoop-mapreduce-client-core snappy-java-1.0.5.jar -snappy-java-1.1.4.jar snappy-java-1.1.7.7.jar +snappy-java-1.1.8.4.jar spring-aop-5.2.22.RELEASE.jar spring-beans-5.2.22.RELEASE.jar spring-boot-2.3.12.RELEASE.jar @@ -485,7 +492,6 @@ spring-boot-starter-actuator-2.3.12.RELEASE.jar spring-boot-starter-aop-2.3.12.RELEASE.jar spring-boot-starter-cache-2.3.12.RELEASE.jar spring-boot-starter-freemarker-2.3.12.RELEASE.jar -spring-boot-starter-jdbc-2.3.12.RELEASE.jar spring-boot-starter-jetty-2.3.12.RELEASE.jar spring-boot-starter-json-2.3.12.RELEASE.jar spring-boot-starter-log4j2-2.3.12.RELEASE.jar @@ -577,15 +583,5 @@ xmlenc-0.52.jar xstream-1.4.20.jar zookeeper-3.5.9.jar zookeeper-jute-3.5.9.jar -zstd-jni-1.4.4-7.jar zstd-jni-1.4.5-6.jar -commons-pool2-2.8.1.jar -jedis-2.9.2.jar -dss-gateway-support-1.1.1.jar -seatunnel-core-flink-2.1.2.jar -seatunnel-core-flink-sql-2.1.2.jar -seatunnel-core-spark-2.1.2.jar -mongo-java-driver-3.12.8.jar -clickhouse-jdbc-0.3.2-patch11.jar -postgresql-42.3.8.jar - +zstd-jni-1.5.2-1.jar \ No newline at end of file From 520cd6fcfa6b971a8df9c382f757d0c4ca07d9f5 Mon Sep 17 00:00:00 2001 From: lemonjuice <86357693+lemonjuicelove@users.noreply.github.com> Date: Tue, 5 Sep 2023 22:13:46 +0800 Subject: [PATCH 05/11] fix null exception (#282) * fix null exception * fix username in test --- .../impl/DriverAndYarnReqResourceService.scala | 4 ++-- .../errorcode/LabelCommonErrorCodeSummary.java | 2 +- .../factory/StdLabelBuilderFactoryTest.java | 16 ++++++++++++++++ 3 files changed, 19 insertions(+), 3 deletions(-) diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/service/impl/DriverAndYarnReqResourceService.scala b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/service/impl/DriverAndYarnReqResourceService.scala index 728f753977..9ca2b11083 100644 --- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/service/impl/DriverAndYarnReqResourceService.scala +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/service/impl/DriverAndYarnReqResourceService.scala @@ -79,7 +79,7 @@ class DriverAndYarnReqResourceService( val user = labelContainer.getUserCreatorLabel.getUser val creator = labelContainer.getUserCreatorLabel.getCreator val properties = engineCreateRequest.getProperties - val acrossClusterTask = properties.get("acrossClusterTask") + val acrossClusterTask = properties.getOrDefault("acrossClusterTask", "false") val CPUThreshold = properties.get("CPUThreshold") val MemoryThreshold = properties.get("MemoryThreshold") val CPUPercentageThreshold = properties.get("CPUPercentageThreshold") @@ -90,7 +90,7 @@ class DriverAndYarnReqResourceService( ) if ( - acrossClusterTask.toBoolean && StringUtils.isNotBlank(CPUThreshold) && StringUtils + StringUtils.isNotBlank(acrossClusterTask) && acrossClusterTask.toBoolean && StringUtils.isNotBlank(CPUThreshold) && StringUtils .isNotBlank(MemoryThreshold) && StringUtils .isNotBlank(CPUPercentageThreshold) && StringUtils.isNotBlank(MemoryPercentageThreshold) diff --git a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/errorcode/LabelCommonErrorCodeSummary.java b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/errorcode/LabelCommonErrorCodeSummary.java index c19ab8eadf..7ce6c6546d 100644 --- a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/errorcode/LabelCommonErrorCodeSummary.java +++ b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/errorcode/LabelCommonErrorCodeSummary.java @@ -23,7 +23,7 @@ public enum LabelCommonErrorCodeSummary implements LinkisErrorCode { UPDATE_LABEL_FAILED(25001, "Update label realtion failed(更新标签属性失败)"), CLUSTER_LABEL_ERROR_CODE( 25002, - "The value of the label is set incorrectly, the setting value is: ClusterType-ClusterName(Yarn-bdp) "), + "The value of the label is set incorrectly, the setting value is: ClusterType-ClusterName "), LABEL_ERROR_CODE( 25002, "The value of the label is set incorrectly, only one value can be set, and the separator symbol '-' cannot be used(标签的值设置错误,只能设置一个值,不能使用分割符符号 '-') "), diff --git a/linkis-computation-governance/linkis-manager/linkis-label-common/src/test/java/org/apache/linkis/manager/label/builder/factory/StdLabelBuilderFactoryTest.java b/linkis-computation-governance/linkis-manager/linkis-label-common/src/test/java/org/apache/linkis/manager/label/builder/factory/StdLabelBuilderFactoryTest.java index 8d0b1e4de9..e421d8528a 100644 --- a/linkis-computation-governance/linkis-manager/linkis-label-common/src/test/java/org/apache/linkis/manager/label/builder/factory/StdLabelBuilderFactoryTest.java +++ b/linkis-computation-governance/linkis-manager/linkis-label-common/src/test/java/org/apache/linkis/manager/label/builder/factory/StdLabelBuilderFactoryTest.java @@ -19,6 +19,7 @@ import org.apache.linkis.manager.label.builder.CombinedLabelBuilder; import org.apache.linkis.manager.label.builder.LabelBuilder; +import org.apache.linkis.manager.label.entity.Label; import org.apache.linkis.manager.label.entity.em.EMInstanceLabel; import org.springframework.beans.factory.annotation.Autowired; @@ -26,6 +27,10 @@ import org.junit.jupiter.api.*; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + /** StdLabelBuilderFactory Tester */ @SpringBootTest(classes = {StdLabelBuilderFactory.class}) public class StdLabelBuilderFactoryTest { @@ -73,4 +78,15 @@ public void testCreateLabelForInLabelKeyInValueStreamOutLabelClassOutValueTypes( stdLabelBuilderFactory.createLabel("testLabelKey", null, EMInstanceLabel.class, null); Assertions.assertTrue(emInstanceLabel1.getLabelKey().equals("emInstance")); } + + @Test + public void test(){ + Map input= new HashMap(); + input.put("userCreator","username-IDE"); + input.put("yarnCluster","bdp-test"); + input.put("executeOnce","true"); + List