Skip to content

Commit

Permalink
code optimization
Browse files Browse the repository at this point in the history
  • Loading branch information
aiceflower committed Dec 18, 2024
1 parent 635643f commit fcd3523
Show file tree
Hide file tree
Showing 6 changed files with 152 additions and 100 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,16 @@

public class SchedulerUtils {
private static final String EVENT_ID_SPLIT = "_";
private static final String ALL_CREATORS = "ALL_CREATORS";
private static final String SPACIAL_USER_SPLIT = "_v_";

public static boolean isSupportPriorityUsers(String groupName) {
/**
* support priority queue with config username or creator
*
* @param groupName
* @return
*/
public static boolean isSupportPriority(String groupName) {
String users = SchedulerConfiguration.SUPPORT_PRIORITY_TASK_USERS();
if (StringUtils.isEmpty(users)) {
return false;
Expand All @@ -33,25 +41,42 @@ public static boolean isSupportPriorityUsers(String groupName) {
if (StringUtils.isEmpty(userName)) {
return false;
}
return users.contains(userName);
}

public static String getGroupNameItem(String groupName, int index) {
if (StringUtils.isEmpty(groupName)) {
return "";
String creators = SchedulerConfiguration.SUPPORT_PRIORITY_TASK_CREATORS();
if (ALL_CREATORS.equalsIgnoreCase(creators)) {
return users.contains(userName);
} else {
String creatorName = getCreatorFromGroupName(groupName);
return users.contains(userName) && creators.contains(creatorName);
}
String[] groupItems = groupName.split(EVENT_ID_SPLIT);
if (index < 0 || index >= groupItems.length) {
return "";
}
return groupItems[index];
}

public static String getUserFromGroupName(String groupName) {
return getGroupNameItem(groupName, 2);
if (groupName.contains(SPACIAL_USER_SPLIT)) {
int vIndex = groupName.lastIndexOf(SPACIAL_USER_SPLIT);
int lastIndex = groupName.lastIndexOf(EVENT_ID_SPLIT);
String user = groupName.substring(vIndex + 1, lastIndex);
return user;
}
String[] groupNames = groupName.split(EVENT_ID_SPLIT);
String user = groupNames[groupNames.length - 2];
return user;
}

public static String getEngineTypeFromGroupName(String groupName) {
return getGroupNameItem(groupName, 3);
String[] groupNames = groupName.split(EVENT_ID_SPLIT);
String ecType = groupNames[groupNames.length - 1];
return ecType;
}

public static String getCreatorFromGroupName(String groupName) {
if (groupName.contains(SPACIAL_USER_SPLIT)) {
int vIndex = groupName.lastIndexOf(SPACIAL_USER_SPLIT);
String creatorName = groupName.substring(0, vIndex);
return creatorName;
}
int lastIndex = groupName.lastIndexOf(EVENT_ID_SPLIT);
int secondLastIndex = groupName.lastIndexOf(EVENT_ID_SPLIT, lastIndex - 1);
String creatorName = groupName.substring(0, secondLastIndex);
return creatorName;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,11 @@ object SchedulerConfiguration {
val SUPPORT_PRIORITY_TASK_USERS =
CommonVars("linkis.fifo.queue.support.priority.users", "").getValue

val SUPPORT_PRIORITY_TASK_CREATORS =
CommonVars("linkis.fifo.queue.support.priority.creators", "ALL_CREATORS").getValue

val MAX_PRIORITY_QUEUE_CACHE_SIZE =
CommonVars("linkis.fifo.priority.queue.max.cache.size", 5).getValue
CommonVars("linkis.fifo.priority.queue.max.cache.size", 1000).getValue

val ENGINE_PRIORITY_RUNTIME_KEY = "wds.linkis.engine.runtime.priority"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,14 @@

package org.apache.linkis.scheduler.queue

import org.apache.linkis.common.utils.Logging
import org.apache.linkis.common.utils.{Logging, Utils}
import org.apache.linkis.scheduler.conf.SchedulerConfiguration

import java.util
import java.util.Collections
import java.util.Comparator
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.PriorityBlockingQueue
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.locks.ReentrantReadWriteLock

/**
* 优先级队列元素
Expand Down Expand Up @@ -74,15 +73,15 @@ class PriorityLoopArrayQueue(var group: Group) extends ConsumeQueue with Logging
private val index = new AtomicInteger

/** 记录队列中当前所有元素索引,元素存入优先级队列时添加,从优先级队列移除时删除 */
private val indexSet = ConcurrentHashMap.newKeySet[Int]
val indexMap = new util.HashMap[Int, Any]()

/** 记录已经消费的元素,会有固定缓存大小,默认1000,元素从优先级队列移除时添加 */
val fixedSizeCollection = Collections.synchronizedMap(
val fixedSizeCollection =
new FixedSizeCollection[Integer, Any](SchedulerConfiguration.MAX_PRIORITY_QUEUE_CACHE_SIZE)
)

private val writeLock = new Array[Byte](0)
private val readLock = new Array[Byte](0)
private val rwLock = new ReentrantReadWriteLock

protected[this] var realSize = size
override def isEmpty: Boolean = size <= 0
Expand All @@ -96,7 +95,8 @@ class PriorityLoopArrayQueue(var group: Group) extends ConsumeQueue with Logging
*/
private def addToPriorityQueue(element: PriorityQueueElement): Boolean = {
priorityEventQueue.offer(element)
indexSet.add(element.index)
rwLock.writeLock.lock
Utils.tryFinally(indexMap.put(element.index, element))(rwLock.writeLock.unlock())
true
}

Expand All @@ -106,8 +106,11 @@ class PriorityLoopArrayQueue(var group: Group) extends ConsumeQueue with Logging
*/
private def getAndRemoveTop: PriorityQueueElement = {
val top: PriorityQueueElement = priorityEventQueue.take()
indexSet.remove(top.index)
fixedSizeCollection.put(top.index, top.element)
rwLock.writeLock.lock
Utils.tryFinally {
indexMap.remove(top.index)
fixedSizeCollection.put(top.index, top.element)
}(rwLock.writeLock.unlock())
top
}

Expand All @@ -116,29 +119,25 @@ class PriorityLoopArrayQueue(var group: Group) extends ConsumeQueue with Logging
}

override def getWaitingEvents: Array[SchedulerEvent] = {
priorityEventQueue synchronized {
toIndexedSeq
.filter(x =>
x.getState.equals(SchedulerEventState.Inited) || x.getState
.equals(SchedulerEventState.Scheduled)
)
.toArray
}
toIndexedSeq
.filter(x =>
x.getState.equals(SchedulerEventState.Inited) || x.getState
.equals(SchedulerEventState.Scheduled)
)
.toArray
}

override def clearAll(): Unit = priorityEventQueue synchronized {
realSize = 0
index.set(0)
priorityEventQueue.clear()
fixedSizeCollection.clear()
indexSet.clear()
indexMap.clear()
}

override def get(event: SchedulerEvent): Option[SchedulerEvent] = {
priorityEventQueue synchronized {
val eventSeq = toIndexedSeq.filter(x => x.getId.equals(event.getId)).seq
if (eventSeq.size > 0) Some(eventSeq(0)) else None
}
val eventSeq = toIndexedSeq.filter(x => x.getId.equals(event.getId)).seq
if (eventSeq.size > 0) Some(eventSeq(0)) else None
}

/**
Expand All @@ -147,20 +146,23 @@ class PriorityLoopArrayQueue(var group: Group) extends ConsumeQueue with Logging
* @return
*/
override def get(index: Int): Option[SchedulerEvent] = {
var event: SchedulerEvent = null
priorityEventQueue synchronized {
if (!indexSet.contains(index) && !fixedSizeCollection.containsKey(index)) {
throw new IllegalArgumentException(
"The index " + index + " has already been deleted, now index must be better than " + index
)
}
event = fixedSizeCollection.get(index).asInstanceOf[SchedulerEvent]
if (!indexMap.containsKey(index) && !fixedSizeCollection.containsKey(index)) {
throw new IllegalArgumentException(
"The index " + index + " has already been deleted, now index must be better than " + index
)
}
rwLock.readLock().lock()
Utils.tryFinally {
var event: SchedulerEvent = fixedSizeCollection.get(index).asInstanceOf[SchedulerEvent]
if (event == null) {
val eventSeq = toIndexedSeq.filter(x => x.getIndex.equals(index)).seq
if (eventSeq.size > 0) event = eventSeq(0)
event = indexMap
.get(index)
.asInstanceOf[PriorityQueueElement]
.element
.asInstanceOf[SchedulerEvent]
}
}
Option(event)
Option(event)
}(rwLock.readLock().unlock())
}

override def getGroup: Group = group
Expand All @@ -172,20 +174,16 @@ class PriorityLoopArrayQueue(var group: Group) extends ConsumeQueue with Logging
def toIndexedSeq: IndexedSeq[SchedulerEvent] = if (size == 0) {
IndexedSeq.empty[SchedulerEvent]
} else {
priorityEventQueue synchronized {
priorityEventQueue
.toArray()
.map(_.asInstanceOf[PriorityQueueElement].element.asInstanceOf[SchedulerEvent])
.toIndexedSeq
}
priorityEventQueue
.toArray()
.map(_.asInstanceOf[PriorityQueueElement].element.asInstanceOf[SchedulerEvent])
.toIndexedSeq
}

def add(event: SchedulerEvent): Int = {
priorityEventQueue synchronized {
// 每次添加的时候需要给计数器+1,优先级相同时,控制先进先出
event.setIndex(index.addAndGet(1))
addToPriorityQueue(PriorityQueueElement(event, event.getPriority, event.getIndex))
}
// 每次添加的时候需要给计数器+1,优先级相同时,控制先进先出
event.setIndex(index.addAndGet(1))
addToPriorityQueue(PriorityQueueElement(event, event.getPriority, event.getIndex))
event.getIndex
}

Expand Down Expand Up @@ -232,11 +230,7 @@ class PriorityLoopArrayQueue(var group: Group) extends ConsumeQueue with Logging
* @return
*/
override def take(): SchedulerEvent = {
val t: Option[SchedulerEvent] = readLock synchronized {
Option(getAndRemoveTop.element.asInstanceOf[SchedulerEvent])
}
writeLock synchronized { writeLock.notify() }
t.get
getAndRemoveTop.element.asInstanceOf[SchedulerEvent]
}

/**
Expand Down Expand Up @@ -264,15 +258,8 @@ class PriorityLoopArrayQueue(var group: Group) extends ConsumeQueue with Logging
* @return
*/
override def poll(): Option[SchedulerEvent] = {
val event: Option[SchedulerEvent] = readLock synchronized {
val t: Option[SchedulerEvent] = Option(getAndRemoveTop.element.asInstanceOf[SchedulerEvent])
if (t == null) {
logger.info("null, notice...")
}
t
}
writeLock synchronized { writeLock.notify() }
event
if (waitingSize == 0) None
else Option(getAndRemoveTop.element.asInstanceOf[SchedulerEvent])
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import org.apache.linkis.scheduler.conf.SchedulerConfiguration.{
import org.apache.linkis.scheduler.listener.ConsumerListener
import org.apache.linkis.scheduler.queue._
import org.apache.linkis.scheduler.queue.fifoqueue.FIFOUserConsumer
import org.apache.linkis.scheduler.util.SchedulerUtils.isSupportPriorityUsers
import org.apache.linkis.scheduler.util.SchedulerUtils.isSupportPriority

import java.util.concurrent.{ExecutorService, TimeUnit}

Expand Down Expand Up @@ -121,7 +121,7 @@ class ParallelConsumerManager(maxParallelismUsers: Int, schedulerName: String)
val consumerQueue: ConsumeQueue =
if (
PFIFO_SCHEDULER_STRATEGY
.equals(fifoQueueStrategy) && isSupportPriorityUsers(groupName)
.equals(fifoQueueStrategy) && isSupportPriority(groupName)
) {
new PriorityLoopArrayQueue(group)
} else new LoopArrayQueue(group)
Expand Down
Loading

0 comments on commit fcd3523

Please sign in to comment.