Skip to content

Commit

Permalink
Merge branch 'dev-1.1.15-webank' into dev-1.1.15-engine-conf-update
Browse files Browse the repository at this point in the history
  • Loading branch information
v-kkhuang committed Sep 12, 2023
2 parents 08df665 + 3a47847 commit 89ca2fb
Show file tree
Hide file tree
Showing 62 changed files with 786 additions and 481 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -219,4 +219,8 @@ object StorageUtils extends Logging {
StorageConfiguration.ENABLE_IO_PROXY.getValue
}

def isHDFSPath(fsPath: FsPath): Boolean = {
HDFS.equals(fsPath.getFsType)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -242,4 +242,7 @@ object EntranceConfiguration {
val ENTRANCE_TASK_TIMEOUT_SCAN =
CommonVars("wds.linkis.entrance.task.timeout.scan", new TimeType("12h"))

val ENABLE_HDFS_JVM_USER =
CommonVars[Boolean]("linkis.entrance.enable.hdfs.jvm.user", true).getValue

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,11 @@ package org.apache.linkis.entrance.log

import org.apache.linkis.common.io.{Fs, FsPath}
import org.apache.linkis.common.utils.Utils
import org.apache.linkis.entrance.conf.EntranceConfiguration
import org.apache.linkis.entrance.exception.LogReadFailedException
import org.apache.linkis.storage.FSFactory
import org.apache.linkis.storage.fs.FileSystem
import org.apache.linkis.storage.utils.StorageUtils

import java.io.{InputStream, IOException}
import java.util
Expand All @@ -36,13 +40,26 @@ class CacheLogReader(logPath: String, charset: String, sharedCache: Cache, user:
var closed = false

private def createInputStream: InputStream = {
if (!logPath.contains(user)) {
throw new LogReadFailedException(
s"${user} does not have permission to read the path $logPath"
)
}
val fsPath = new FsPath(logPath)
if (fileSystem == null) lock synchronized {
if (fileSystem == null) {
fileSystem = FSFactory.getFsByProxyUser(new FsPath(logPath), user)

fileSystem =
if (StorageUtils.isHDFSPath(fsPath) && EntranceConfiguration.ENABLE_HDFS_JVM_USER) {
FSFactory.getFs(new FsPath(logPath)).asInstanceOf[FileSystem]
} else {
FSFactory.getFsByProxyUser(new FsPath(logPath), user).asInstanceOf[FileSystem]
}

fileSystem.init(new util.HashMap[String, String]())
}
}
val inputStream: InputStream = fileSystem.read(new FsPath(logPath))
val inputStream: InputStream = fileSystem.read(fsPath)
inputStream
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,15 @@ import java.util
class HDFSCacheLogWriter(logPath: String, charset: String, sharedCache: Cache, user: String)
extends LogWriter(charset) {

if (StringUtils.isBlank(logPath))
if (StringUtils.isBlank(logPath)) {
throw new EntranceErrorException(LOGPATH_NOT_NULL.getErrorCode, LOGPATH_NOT_NULL.getErrorDesc)
}

protected var fileSystem =
protected var fileSystem = if (EntranceConfiguration.ENABLE_HDFS_JVM_USER) {
FSFactory.getFs(new FsPath(logPath)).asInstanceOf[FileSystem]
} else {
FSFactory.getFsByProxyUser(new FsPath(logPath), user).asInstanceOf[FileSystem]
}

override protected var outputStream: OutputStream = null

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ public enum RMErrorCode implements LinkisErrorCode {

CLUSTER_QUEUE_INSTANCES_INSUFFICIENT(12012, "Insufficient cluster queue instance(集群队列实例不足)"),

ACROSS_CLUSTER_RULE_FAILED(12012, "across cluster rule failed(跨集群规则失败)"),

ECM_RESOURCE_INSUFFICIENT(11000, "ECM resources are insufficient(ECM 资源不足)"),

ECM_MEMORY_INSUFFICIENT(11001, "ECM memory resources are insufficient(ECM 内存资源不足)"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,20 @@ import org.apache.linkis.manager.common.entity.enumeration.MaintainType

object AMConfiguration {

val YARN_QUEUE_NAME_CONFIG_KEY = "wds.linkis.rm.yarnqueue"

val ACROSS_CLUSTER_QUEUE_SUFFIX = "queueRuleSuffix"

val ACROSS_CLUSTER_TASK = "acrossClusterTask"

val ACROSS_CLUSTER_CPU_THRESHOLD = "CPUThreshold"

val ACROSS_CLUSTER_MEMORY_THRESHOLD = "MemoryThreshold"

val ACROSS_CLUSTER_CPU_PERCENTAGE_THRESHOLD = "CPUPercentageThreshold"

val ACROSS_CLUSTER_MEMORY_PERCENTAGE_THRESHOLD = "MemoryPercentageThreshold"

val ECM_ADMIN_OPERATIONS = CommonVars("wds.linkis.governance.admin.operations", "")

val ENGINE_START_MAX_TIME =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,16 @@ class DefaultEngineCreateService
})
}

val queueRuleSuffix = props.get(AMConfiguration.ACROSS_CLUSTER_QUEUE_SUFFIX)
if (StringUtils.isNotBlank(queueRuleSuffix)) {
val queueName = props.getOrDefault(AMConfiguration.YARN_QUEUE_NAME_CONFIG_KEY, "default")
val newQueueName = queueName + "_" + queueRuleSuffix
props.put(AMConfiguration.YARN_QUEUE_NAME_CONFIG_KEY, newQueueName)
logger.info(
s"Switch queues according to queueRule with queue name : $queueName to $newQueueName"
)
}

val timeoutEngineResourceRequest = TimeoutEngineResourceRequest(
timeout,
engineCreateRequest.getUser,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,6 @@ import org.apache.linkis.common.conf.CommonVars
object LabelManagerConf {

val LONG_LIVED_LABEL =
CommonVars("wds.linkis.label.node.long.lived.label.keys", "tenant").getValue
CommonVars("wds.linkis.label.node.long.lived.label.keys", "tenant|yarnCluster").getValue

}
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,11 @@ class YarnResourceRequester extends ExternalResourceRequester with Logging {

private val HASTATE_ACTIVE = "active"

private var provider: ExternalResourceProvider = _
private val rmAddressMap: util.Map[String, String] = new ConcurrentHashMap[String, String]()

private def getAuthorizationStr = {
val user = this.provider.getConfigMap.getOrDefault("user", "").asInstanceOf[String]
val pwd = this.provider.getConfigMap.getOrDefault("pwd", "").asInstanceOf[String]
private def getAuthorizationStr(provider: ExternalResourceProvider) = {
val user = provider.getConfigMap.getOrDefault("user", "").asInstanceOf[String]
val pwd = provider.getConfigMap.getOrDefault("pwd", "").asInstanceOf[String]
val authKey = user + ":" + pwd
Base64.getMimeEncoder.encodeToString(authKey.getBytes)
}
Expand All @@ -71,9 +70,7 @@ class YarnResourceRequester extends ExternalResourceRequester with Logging {
identifier: ExternalResourceIdentifier,
provider: ExternalResourceProvider
): NodeResource = {
val rmWebHaAddress = provider.getConfigMap.get("rmWebAddress").asInstanceOf[String]
this.provider = provider
val rmWebAddress = getAndUpdateActiveRmWebAddress(rmWebHaAddress)
val rmWebAddress = getAndUpdateActiveRmWebAddress(provider)
logger.info(s"rmWebAddress: $rmWebAddress")
val queueName = identifier.asInstanceOf[YarnResourceIdentifier].getQueueName

Expand All @@ -87,7 +84,7 @@ class YarnResourceRequester extends ExternalResourceRequester with Logging {
)

def maxEffectiveHandle(queueValue: Option[JValue]): Option[YarnResource] = {
val metrics = getResponseByUrl("metrics", rmWebAddress)
val metrics = getResponseByUrl("metrics", rmWebAddress, provider)
val totalResouceInfoResponse = (
(metrics \ "clusterMetrics" \ "totalMB").asInstanceOf[JInt].values.toLong,
(metrics \ "clusterMetrics" \ "totalVirtualCores").asInstanceOf[JInt].values.toLong
Expand Down Expand Up @@ -180,7 +177,7 @@ class YarnResourceRequester extends ExternalResourceRequester with Logging {
def getChildQueuesOfCapacity(resp: JValue) = resp \ "queues" \ "queue"

def getResources() = {
val resp = getResponseByUrl("scheduler", rmWebAddress)
val resp = getResponseByUrl("scheduler", rmWebAddress, provider)
val schedulerType =
(resp \ "scheduler" \ "schedulerInfo" \ "type").asInstanceOf[JString].values
if ("capacityScheduler".equals(schedulerType)) {
Expand Down Expand Up @@ -255,9 +252,8 @@ class YarnResourceRequester extends ExternalResourceRequester with Logging {
identifier: ExternalResourceIdentifier,
provider: ExternalResourceProvider
): java.util.List[ExternalAppInfo] = {
val rmWebHaAddress = provider.getConfigMap.get("rmWebAddress").asInstanceOf[String]

val rmWebAddress = getAndUpdateActiveRmWebAddress(rmWebHaAddress)
val rmWebAddress = getAndUpdateActiveRmWebAddress(provider)

val queueName = identifier.asInstanceOf[YarnResourceIdentifier].getQueueName

Expand All @@ -273,7 +269,7 @@ class YarnResourceRequester extends ExternalResourceRequester with Logging {
val realQueueName = "root." + queueName

def getAppInfos(): Array[ExternalAppInfo] = {
val resp = getResponseByUrl("apps", rmWebAddress)
val resp = getResponseByUrl("apps", rmWebAddress, provider)
resp \ "apps" \ "app" match {
case JArray(apps) =>
val appInfoBuffer = new ArrayBuffer[YarnAppInfo]()
Expand Down Expand Up @@ -307,25 +303,29 @@ class YarnResourceRequester extends ExternalResourceRequester with Logging {

override def getResourceType: ResourceType = ResourceType.Yarn

private def getResponseByUrl(url: String, rmWebAddress: String) = {
private def getResponseByUrl(
url: String,
rmWebAddress: String,
provider: ExternalResourceProvider
) = {
val httpGet = new HttpGet(rmWebAddress + "/ws/v1/cluster/" + url)
httpGet.addHeader("Accept", "application/json")
val authorEnable: Any = this.provider.getConfigMap.get("authorEnable");
val authorEnable: Any = provider.getConfigMap.get("authorEnable");
var httpResponse: HttpResponse = null
authorEnable match {
case flag: Boolean =>
if (flag) {
httpGet.addHeader(HttpHeaders.AUTHORIZATION, "Basic " + getAuthorizationStr)
httpGet.addHeader(HttpHeaders.AUTHORIZATION, "Basic " + getAuthorizationStr(provider))
}
case _ =>
}
val kerberosEnable: Any = this.provider.getConfigMap.get("kerberosEnable");
val kerberosEnable: Any = provider.getConfigMap.get("kerberosEnable");
kerberosEnable match {
case flag: Boolean =>
if (flag) {
val principalName = this.provider.getConfigMap.get("principalName").asInstanceOf[String]
val keytabPath = this.provider.getConfigMap.get("keytabPath").asInstanceOf[String]
val krb5Path = this.provider.getConfigMap.get("krb5Path").asInstanceOf[String]
val principalName = provider.getConfigMap.get("principalName").asInstanceOf[String]
val keytabPath = provider.getConfigMap.get("keytabPath").asInstanceOf[String]
val krb5Path = provider.getConfigMap.get("krb5Path").asInstanceOf[String]
val requestKuu = new RequestKerberosUrlUtils(principalName, keytabPath, krb5Path, false)
val response =
requestKuu.callRestUrl(rmWebAddress + "/ws/v1/cluster/" + url, principalName)
Expand All @@ -341,8 +341,9 @@ class YarnResourceRequester extends ExternalResourceRequester with Logging {
parse(EntityUtils.toString(httpResponse.getEntity()))
}

def getAndUpdateActiveRmWebAddress(haAddress: String): String = {
def getAndUpdateActiveRmWebAddress(provider: ExternalResourceProvider): String = {
// todo check if it will stuck for many requests
val haAddress = provider.getConfigMap.get("rmWebAddress").asInstanceOf[String]
var activeAddress = rmAddressMap.get(haAddress)
if (StringUtils.isBlank(activeAddress)) haAddress.intern().synchronized {
if (StringUtils.isBlank(activeAddress)) {
Expand All @@ -356,7 +357,7 @@ class YarnResourceRequester extends ExternalResourceRequester with Logging {
.split(RMConfiguration.DEFAULT_YARN_RM_WEB_ADDRESS_DELIMITER.getValue)
.foreach(address => {
Utils.tryCatch {
val response = getResponseByUrl("info", address)
val response = getResponseByUrl("info", address, provider)
response \ "clusterInfo" \ "haState" match {
case state: JString =>
if (HASTATE_ACTIVE.equalsIgnoreCase(state.s)) {
Expand Down Expand Up @@ -393,12 +394,10 @@ class YarnResourceRequester extends ExternalResourceRequester with Logging {
override def reloadExternalResourceAddress(
provider: ExternalResourceProvider
): java.lang.Boolean = {
if (null == provider) {
rmAddressMap.clear()
} else {
if (null != provider) {
val rmWebHaAddress = provider.getConfigMap.get("rmWebAddress").asInstanceOf[String]
rmAddressMap.remove(rmWebHaAddress)
getAndUpdateActiveRmWebAddress(rmWebHaAddress)
getAndUpdateActiveRmWebAddress(provider)
}
true
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@

package org.apache.linkis.manager.rm.service.impl

import org.apache.linkis.manager.am.conf.AMConfiguration
import org.apache.linkis.manager.common.constant.RMConstant
import org.apache.linkis.manager.common.entity.resource._
import org.apache.linkis.manager.common.entity.resource.ResourceType.DriverAndYarn
import org.apache.linkis.manager.common.exception.RMWarnException
import org.apache.linkis.manager.common.protocol.engine.{EngineAskRequest, EngineCreateRequest}
import org.apache.linkis.manager.label.entity.cluster.ClusterLabel
import org.apache.linkis.manager.common.protocol.engine.EngineCreateRequest
import org.apache.linkis.manager.rm.domain.RMLabelContainer
import org.apache.linkis.manager.rm.exception.RMErrorCode
import org.apache.linkis.manager.rm.external.service.ExternalResourceService
Expand Down Expand Up @@ -76,6 +76,55 @@ class DriverAndYarnReqResourceService(
throw new RMWarnException(notEnoughMessage._1, notEnoughMessage._2)
}

if (engineCreateRequest.getProperties != null) {
val user = labelContainer.getUserCreatorLabel.getUser
val creator = labelContainer.getUserCreatorLabel.getCreator
val properties = engineCreateRequest.getProperties
val acrossClusterTask = properties.getOrDefault(AMConfiguration.ACROSS_CLUSTER_TASK, "false")
val CPUThreshold = properties.get(AMConfiguration.ACROSS_CLUSTER_CPU_THRESHOLD)
val MemoryThreshold = properties.get(AMConfiguration.ACROSS_CLUSTER_MEMORY_THRESHOLD)
val CPUPercentageThreshold =
properties.get(AMConfiguration.ACROSS_CLUSTER_CPU_PERCENTAGE_THRESHOLD)
val MemoryPercentageThreshold =
properties.get(AMConfiguration.ACROSS_CLUSTER_MEMORY_PERCENTAGE_THRESHOLD)

if (
StringUtils.isNotBlank(acrossClusterTask) && acrossClusterTask.toBoolean && StringUtils
.isNotBlank(CPUThreshold) && StringUtils
.isNotBlank(MemoryThreshold)
&& StringUtils
.isNotBlank(CPUPercentageThreshold) && StringUtils.isNotBlank(MemoryPercentageThreshold)
) {

logger.info(
s"user: $user, creator: $creator task enter cross cluster resource judgment, " +
s"CPUThreshold: $CPUThreshold, MemoryThreshold: $MemoryThreshold," +
s"CPUPercentageThreshold: $CPUPercentageThreshold, MemoryPercentageThreshold: $MemoryPercentageThreshold"
)
try {
AcrossClusterRulesJudgeUtils.acrossClusterRuleCheck(
queueLeftResource.asInstanceOf[YarnResource],
usedCapacity.asInstanceOf[YarnResource],
maxCapacity.asInstanceOf[YarnResource],
CPUThreshold.toInt,
MemoryThreshold.toInt,
CPUPercentageThreshold.toDouble,
MemoryPercentageThreshold.toDouble
)
} catch {
case ex: Exception =>
throw new RMWarnException(
RMErrorCode.ACROSS_CLUSTER_RULE_FAILED.getErrorCode,
ex.getMessage
)
}

logger.info(s"user: $user, creator: $creator task meet the threshold rule")
} else {
logger.info(s"user: $user, creator: $creator task skip cross cluster resource judgment")
}
}

true
}

Expand Down
Loading

0 comments on commit 89ca2fb

Please sign in to comment.