Skip to content

Commit

Permalink
add creator task running number limit
Browse files Browse the repository at this point in the history
  • Loading branch information
peacewong committed Nov 26, 2023
1 parent fe4f1f6 commit dfb7faf
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 151 deletions.

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import org.apache.linkis.entrance.conf.EntranceConfiguration
import org.apache.linkis.entrance.job.EntranceExecutionJob
import org.apache.linkis.entrance.utils.JobHistoryHelper
import org.apache.linkis.scheduler.SchedulerContext
import org.apache.linkis.scheduler.queue.Group
import org.apache.linkis.scheduler.queue.{Consumer, Group}
import org.apache.linkis.scheduler.queue.fifoqueue.FIFOUserConsumer

import java.util
Expand Down Expand Up @@ -67,4 +67,45 @@ class EntranceFIFOUserConsumer(

}

override def runScheduleIntercept: Boolean = {
val consumers = getSchedulerContext.getOrCreateConsumerManager.listConsumers
var creatorRunningJobNum = 0
// APP_TEST_hadoop_hive or IDE_hadoop_hive
val groupNameStr = getGroup.getGroupName
val groupNames = groupNameStr.split("_")
val length = groupNames.length
if (length < 3) return true
// APP_TEST
val lastIndex = groupNameStr.lastIndexOf("_")
val secondLastIndex = groupNameStr.lastIndexOf("_", lastIndex - 1)
val creatorName = groupNameStr.substring(0, secondLastIndex)
// hive
val ecType = groupNames(length - 1)
for (consumer <- consumers) {
val groupName = consumer.getGroup.getGroupName
if (groupName.startsWith(creatorName) && groupName.endsWith(ecType))
creatorRunningJobNum += consumer.getRunningEvents.length
}
val creatorECTypeMaxRunningJobs =
CreatorECTypeDefaultConf.getCreatorECTypeMaxRunningJobs(creatorName, ecType)
if (logger.isDebugEnabled)
logger.debug(
"Creator: {} EC: {} there are currently:{} jobs running and maximum limit: {}",
creatorName,
ecType,
creatorRunningJobNum,
creatorECTypeMaxRunningJobs
)
if (creatorRunningJobNum > creatorECTypeMaxRunningJobs) {
logger.error(
"Creator: {} EC: {} there are currently:{} jobs running that exceed the maximum limit: {}",
creatorName,
ecType,
creatorRunningJobNum,
creatorECTypeMaxRunningJobs
)
false
} else true
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package org.apache.linkis.entrance.scheduler
import org.apache.linkis.common.ServiceInstance
import org.apache.linkis.common.utils.Utils
import org.apache.linkis.entrance.conf.EntranceConfiguration
import org.apache.linkis.entrance.utils.EntranceUtils
import org.apache.linkis.instance.label.client.InstanceLabelClient
import org.apache.linkis.manager.label.builder.factory.LabelBuilderFactoryContext
import org.apache.linkis.manager.label.constant.{LabelKeyConstant, LabelValueConstant}
Expand All @@ -44,30 +45,11 @@ class EntranceParallelConsumerManager(maxParallelismUsers: Int, schedulerName: S

if (EntranceConfiguration.ENTRANCE_GROUP_SCAN_ENABLED.getValue) {
Utils.defaultScheduler.scheduleAtFixedRate(
new Runnable {
override def run(): Unit = {
Utils.tryAndError {
logger.info("start refresh consumer group maxAllowRunningJobs")
// get all entrance server from eureka
val serviceInstances =
Sender.getInstances(Sender.getThisServiceInstance.getApplicationName)
if (null == serviceInstances || serviceInstances.isEmpty) return

// get all offline label server
val routeLabel = LabelBuilderFactoryContext.getLabelBuilderFactory
.createLabel[RouteLabel](LabelKeyConstant.ROUTE_KEY, LabelValueConstant.OFFLINE_VALUE)
val labels = new util.ArrayList[Label[_]]
labels.add(routeLabel)
val labelInstances = InstanceLabelClient.getInstance.getInstanceFromLabel(labels)

// get active entrance server
val allInstances = new util.ArrayList[ServiceInstance]()
allInstances.addAll(serviceInstances.toList.asJava)
allInstances.removeAll(labelInstances)
// refresh all group maxAllowRunningJobs
refreshAllGroupMaxAllowRunningJobs(allInstances.size())
logger.info("Finished to refresh consumer group maxAllowRunningJobs")
}
() => {
Utils.tryAndError {
// refresh all group maxAllowRunningJobs
refreshAllGroupMaxAllowRunningJobs(EntranceUtils.getRunningEntranceNumber())
logger.info("Finished to refresh consumer group maxAllowRunningJobs")
}
},
EntranceConfiguration.ENTRANCE_GROUP_SCAN_INIT_TIME.getValue,
Expand Down

0 comments on commit dfb7faf

Please sign in to comment.