From fcd35237c0aa38e64e80e243d0c44f700d9f6b77 Mon Sep 17 00:00:00 2001 From: aiceflower Date: Wed, 18 Dec 2024 15:29:46 +0800 Subject: [PATCH] code optimization --- .../linkis/scheduler/util/SchedulerUtils.java | 53 ++++++--- .../conf/SchedulerConfiguration.scala | 5 +- .../queue/PriorityLoopArrayQueue.scala | 105 ++++++++---------- .../ParallelConsumerManager.scala | 4 +- .../queue/PriorityLoopArrayQueueTest.java | 43 +++---- .../scheduler/util/TestSchedulerUtils.scala | 42 ++++++- 6 files changed, 152 insertions(+), 100 deletions(-) diff --git a/linkis-commons/linkis-scheduler/src/main/java/org/apache/linkis/scheduler/util/SchedulerUtils.java b/linkis-commons/linkis-scheduler/src/main/java/org/apache/linkis/scheduler/util/SchedulerUtils.java index 90244dc073..9d37f0f639 100644 --- a/linkis-commons/linkis-scheduler/src/main/java/org/apache/linkis/scheduler/util/SchedulerUtils.java +++ b/linkis-commons/linkis-scheduler/src/main/java/org/apache/linkis/scheduler/util/SchedulerUtils.java @@ -23,8 +23,16 @@ public class SchedulerUtils { private static final String EVENT_ID_SPLIT = "_"; + private static final String ALL_CREATORS = "ALL_CREATORS"; + private static final String SPACIAL_USER_SPLIT = "_v_"; - public static boolean isSupportPriorityUsers(String groupName) { + /** + * support priority queue with config username or creator + * + * @param groupName + * @return + */ + public static boolean isSupportPriority(String groupName) { String users = SchedulerConfiguration.SUPPORT_PRIORITY_TASK_USERS(); if (StringUtils.isEmpty(users)) { return false; @@ -33,25 +41,42 @@ public static boolean isSupportPriorityUsers(String groupName) { if (StringUtils.isEmpty(userName)) { return false; } - return users.contains(userName); - } - - public static String getGroupNameItem(String groupName, int index) { - if (StringUtils.isEmpty(groupName)) { - return ""; + String creators = SchedulerConfiguration.SUPPORT_PRIORITY_TASK_CREATORS(); + if (ALL_CREATORS.equalsIgnoreCase(creators)) { + return users.contains(userName); + } else { + String creatorName = getCreatorFromGroupName(groupName); + return users.contains(userName) && creators.contains(creatorName); } - String[] groupItems = groupName.split(EVENT_ID_SPLIT); - if (index < 0 || index >= groupItems.length) { - return ""; - } - return groupItems[index]; } public static String getUserFromGroupName(String groupName) { - return getGroupNameItem(groupName, 2); + if (groupName.contains(SPACIAL_USER_SPLIT)) { + int vIndex = groupName.lastIndexOf(SPACIAL_USER_SPLIT); + int lastIndex = groupName.lastIndexOf(EVENT_ID_SPLIT); + String user = groupName.substring(vIndex + 1, lastIndex); + return user; + } + String[] groupNames = groupName.split(EVENT_ID_SPLIT); + String user = groupNames[groupNames.length - 2]; + return user; } public static String getEngineTypeFromGroupName(String groupName) { - return getGroupNameItem(groupName, 3); + String[] groupNames = groupName.split(EVENT_ID_SPLIT); + String ecType = groupNames[groupNames.length - 1]; + return ecType; + } + + public static String getCreatorFromGroupName(String groupName) { + if (groupName.contains(SPACIAL_USER_SPLIT)) { + int vIndex = groupName.lastIndexOf(SPACIAL_USER_SPLIT); + String creatorName = groupName.substring(0, vIndex); + return creatorName; + } + int lastIndex = groupName.lastIndexOf(EVENT_ID_SPLIT); + int secondLastIndex = groupName.lastIndexOf(EVENT_ID_SPLIT, lastIndex - 1); + String creatorName = groupName.substring(0, secondLastIndex); + return creatorName; } } diff --git a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/conf/SchedulerConfiguration.scala b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/conf/SchedulerConfiguration.scala index cf2d09d19b..24e7d3c4f0 100644 --- a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/conf/SchedulerConfiguration.scala +++ b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/conf/SchedulerConfiguration.scala @@ -43,8 +43,11 @@ object SchedulerConfiguration { val SUPPORT_PRIORITY_TASK_USERS = CommonVars("linkis.fifo.queue.support.priority.users", "").getValue + val SUPPORT_PRIORITY_TASK_CREATORS = + CommonVars("linkis.fifo.queue.support.priority.creators", "ALL_CREATORS").getValue + val MAX_PRIORITY_QUEUE_CACHE_SIZE = - CommonVars("linkis.fifo.priority.queue.max.cache.size", 5).getValue + CommonVars("linkis.fifo.priority.queue.max.cache.size", 1000).getValue val ENGINE_PRIORITY_RUNTIME_KEY = "wds.linkis.engine.runtime.priority" diff --git a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/PriorityLoopArrayQueue.scala b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/PriorityLoopArrayQueue.scala index 66675b036f..23bd7be45c 100644 --- a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/PriorityLoopArrayQueue.scala +++ b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/PriorityLoopArrayQueue.scala @@ -17,15 +17,14 @@ package org.apache.linkis.scheduler.queue -import org.apache.linkis.common.utils.Logging +import org.apache.linkis.common.utils.{Logging, Utils} import org.apache.linkis.scheduler.conf.SchedulerConfiguration import java.util -import java.util.Collections import java.util.Comparator -import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.PriorityBlockingQueue import java.util.concurrent.atomic.AtomicInteger +import java.util.concurrent.locks.ReentrantReadWriteLock /** * 优先级队列元素 @@ -74,15 +73,15 @@ class PriorityLoopArrayQueue(var group: Group) extends ConsumeQueue with Logging private val index = new AtomicInteger /** 记录队列中当前所有元素索引,元素存入优先级队列时添加,从优先级队列移除时删除 */ - private val indexSet = ConcurrentHashMap.newKeySet[Int] + val indexMap = new util.HashMap[Int, Any]() /** 记录已经消费的元素,会有固定缓存大小,默认1000,元素从优先级队列移除时添加 */ - val fixedSizeCollection = Collections.synchronizedMap( + val fixedSizeCollection = new FixedSizeCollection[Integer, Any](SchedulerConfiguration.MAX_PRIORITY_QUEUE_CACHE_SIZE) - ) private val writeLock = new Array[Byte](0) private val readLock = new Array[Byte](0) + private val rwLock = new ReentrantReadWriteLock protected[this] var realSize = size override def isEmpty: Boolean = size <= 0 @@ -96,7 +95,8 @@ class PriorityLoopArrayQueue(var group: Group) extends ConsumeQueue with Logging */ private def addToPriorityQueue(element: PriorityQueueElement): Boolean = { priorityEventQueue.offer(element) - indexSet.add(element.index) + rwLock.writeLock.lock + Utils.tryFinally(indexMap.put(element.index, element))(rwLock.writeLock.unlock()) true } @@ -106,8 +106,11 @@ class PriorityLoopArrayQueue(var group: Group) extends ConsumeQueue with Logging */ private def getAndRemoveTop: PriorityQueueElement = { val top: PriorityQueueElement = priorityEventQueue.take() - indexSet.remove(top.index) - fixedSizeCollection.put(top.index, top.element) + rwLock.writeLock.lock + Utils.tryFinally { + indexMap.remove(top.index) + fixedSizeCollection.put(top.index, top.element) + }(rwLock.writeLock.unlock()) top } @@ -116,14 +119,12 @@ class PriorityLoopArrayQueue(var group: Group) extends ConsumeQueue with Logging } override def getWaitingEvents: Array[SchedulerEvent] = { - priorityEventQueue synchronized { - toIndexedSeq - .filter(x => - x.getState.equals(SchedulerEventState.Inited) || x.getState - .equals(SchedulerEventState.Scheduled) - ) - .toArray - } + toIndexedSeq + .filter(x => + x.getState.equals(SchedulerEventState.Inited) || x.getState + .equals(SchedulerEventState.Scheduled) + ) + .toArray } override def clearAll(): Unit = priorityEventQueue synchronized { @@ -131,14 +132,12 @@ class PriorityLoopArrayQueue(var group: Group) extends ConsumeQueue with Logging index.set(0) priorityEventQueue.clear() fixedSizeCollection.clear() - indexSet.clear() + indexMap.clear() } override def get(event: SchedulerEvent): Option[SchedulerEvent] = { - priorityEventQueue synchronized { - val eventSeq = toIndexedSeq.filter(x => x.getId.equals(event.getId)).seq - if (eventSeq.size > 0) Some(eventSeq(0)) else None - } + val eventSeq = toIndexedSeq.filter(x => x.getId.equals(event.getId)).seq + if (eventSeq.size > 0) Some(eventSeq(0)) else None } /** @@ -147,20 +146,23 @@ class PriorityLoopArrayQueue(var group: Group) extends ConsumeQueue with Logging * @return */ override def get(index: Int): Option[SchedulerEvent] = { - var event: SchedulerEvent = null - priorityEventQueue synchronized { - if (!indexSet.contains(index) && !fixedSizeCollection.containsKey(index)) { - throw new IllegalArgumentException( - "The index " + index + " has already been deleted, now index must be better than " + index - ) - } - event = fixedSizeCollection.get(index).asInstanceOf[SchedulerEvent] + if (!indexMap.containsKey(index) && !fixedSizeCollection.containsKey(index)) { + throw new IllegalArgumentException( + "The index " + index + " has already been deleted, now index must be better than " + index + ) + } + rwLock.readLock().lock() + Utils.tryFinally { + var event: SchedulerEvent = fixedSizeCollection.get(index).asInstanceOf[SchedulerEvent] if (event == null) { - val eventSeq = toIndexedSeq.filter(x => x.getIndex.equals(index)).seq - if (eventSeq.size > 0) event = eventSeq(0) + event = indexMap + .get(index) + .asInstanceOf[PriorityQueueElement] + .element + .asInstanceOf[SchedulerEvent] } - } - Option(event) + Option(event) + }(rwLock.readLock().unlock()) } override def getGroup: Group = group @@ -172,20 +174,16 @@ class PriorityLoopArrayQueue(var group: Group) extends ConsumeQueue with Logging def toIndexedSeq: IndexedSeq[SchedulerEvent] = if (size == 0) { IndexedSeq.empty[SchedulerEvent] } else { - priorityEventQueue synchronized { - priorityEventQueue - .toArray() - .map(_.asInstanceOf[PriorityQueueElement].element.asInstanceOf[SchedulerEvent]) - .toIndexedSeq - } + priorityEventQueue + .toArray() + .map(_.asInstanceOf[PriorityQueueElement].element.asInstanceOf[SchedulerEvent]) + .toIndexedSeq } def add(event: SchedulerEvent): Int = { - priorityEventQueue synchronized { - // 每次添加的时候需要给计数器+1,优先级相同时,控制先进先出 - event.setIndex(index.addAndGet(1)) - addToPriorityQueue(PriorityQueueElement(event, event.getPriority, event.getIndex)) - } + // 每次添加的时候需要给计数器+1,优先级相同时,控制先进先出 + event.setIndex(index.addAndGet(1)) + addToPriorityQueue(PriorityQueueElement(event, event.getPriority, event.getIndex)) event.getIndex } @@ -232,11 +230,7 @@ class PriorityLoopArrayQueue(var group: Group) extends ConsumeQueue with Logging * @return */ override def take(): SchedulerEvent = { - val t: Option[SchedulerEvent] = readLock synchronized { - Option(getAndRemoveTop.element.asInstanceOf[SchedulerEvent]) - } - writeLock synchronized { writeLock.notify() } - t.get + getAndRemoveTop.element.asInstanceOf[SchedulerEvent] } /** @@ -264,15 +258,8 @@ class PriorityLoopArrayQueue(var group: Group) extends ConsumeQueue with Logging * @return */ override def poll(): Option[SchedulerEvent] = { - val event: Option[SchedulerEvent] = readLock synchronized { - val t: Option[SchedulerEvent] = Option(getAndRemoveTop.element.asInstanceOf[SchedulerEvent]) - if (t == null) { - logger.info("null, notice...") - } - t - } - writeLock synchronized { writeLock.notify() } - event + if (waitingSize == 0) None + else Option(getAndRemoveTop.element.asInstanceOf[SchedulerEvent]) } /** diff --git a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/parallelqueue/ParallelConsumerManager.scala b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/parallelqueue/ParallelConsumerManager.scala index f0de8e5cba..b079f12006 100644 --- a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/parallelqueue/ParallelConsumerManager.scala +++ b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/parallelqueue/ParallelConsumerManager.scala @@ -26,7 +26,7 @@ import org.apache.linkis.scheduler.conf.SchedulerConfiguration.{ import org.apache.linkis.scheduler.listener.ConsumerListener import org.apache.linkis.scheduler.queue._ import org.apache.linkis.scheduler.queue.fifoqueue.FIFOUserConsumer -import org.apache.linkis.scheduler.util.SchedulerUtils.isSupportPriorityUsers +import org.apache.linkis.scheduler.util.SchedulerUtils.isSupportPriority import java.util.concurrent.{ExecutorService, TimeUnit} @@ -121,7 +121,7 @@ class ParallelConsumerManager(maxParallelismUsers: Int, schedulerName: String) val consumerQueue: ConsumeQueue = if ( PFIFO_SCHEDULER_STRATEGY - .equals(fifoQueueStrategy) && isSupportPriorityUsers(groupName) + .equals(fifoQueueStrategy) && isSupportPriority(groupName) ) { new PriorityLoopArrayQueue(group) } else new LoopArrayQueue(group) diff --git a/linkis-commons/linkis-scheduler/src/test/scala/org/apache/linkis/scheduler/queue/PriorityLoopArrayQueueTest.java b/linkis-commons/linkis-scheduler/src/test/scala/org/apache/linkis/scheduler/queue/PriorityLoopArrayQueueTest.java index 4f78576456..147484f1b3 100644 --- a/linkis-commons/linkis-scheduler/src/test/scala/org/apache/linkis/scheduler/queue/PriorityLoopArrayQueueTest.java +++ b/linkis-commons/linkis-scheduler/src/test/scala/org/apache/linkis/scheduler/queue/PriorityLoopArrayQueueTest.java @@ -28,23 +28,21 @@ import java.util.concurrent.atomic.AtomicInteger; class PriorityLoopArrayQueueTest { - private volatile AtomicInteger ac = new AtomicInteger(); + AtomicInteger productCounter = new AtomicInteger(); + AtomicInteger consumerCounter = new AtomicInteger(); @Test public void testConcurrentPutAndTake() throws Exception { AtomicInteger counter = new AtomicInteger(); - FIFOGroup group = new FIFOGroup("test", 100, 100); + FIFOGroup group = new FIFOGroup("test", 5000, 5000); PriorityLoopArrayQueue queue = new PriorityLoopArrayQueue(group); - final long time = System.currentTimeMillis(); // 获取开始时间的毫秒数 long startTime = System.currentTimeMillis(); // 三分钟的毫秒数 - long threeMinutesInMillis = 1 * 60 * 1000; - - - int genLen = 2; - int getLen = 1; + long threeMinutesInMillis = 1 * 30 * 1000; + int genLen = 5; + int getLen = 7; final CountDownLatch latch = new CountDownLatch(genLen + getLen + 1); // 5 个生产者 for (int i = 0; i < genLen; i++) { @@ -61,7 +59,7 @@ public void testConcurrentPutAndTake() throws Exception { while ((System.currentTimeMillis() - startTime) < threeMinutesInMillis) { //生产 try { - Thread.sleep(1000); + Thread.sleep(getRandom(200)); product(counter, queue); product(counter, queue); } catch (InterruptedException e) { @@ -69,8 +67,8 @@ public void testConcurrentPutAndTake() throws Exception { } //消费 //consume(queue); - } + System.out.println(Thread.currentThread().getName() + "结束生产:"); }, "生产t-" + i).start(); } // 5 个消费者 @@ -78,7 +76,7 @@ public void testConcurrentPutAndTake() throws Exception { final int id = i; new Thread(() -> { try{ - Thread.sleep(500 * id); + Thread.sleep(getRandom(200)); latch.countDown(); latch.await(); } catch (InterruptedException e){ @@ -86,15 +84,13 @@ public void testConcurrentPutAndTake() throws Exception { } System.out.println(Thread.currentThread().getName() + "开始消费:"); while (true) { - try { - Thread.sleep(1000); + Thread.sleep(getRandom(200)); //消费 consume(queue); } catch (InterruptedException e) { throw new RuntimeException(e); } - } }, "消费t-" + i).start(); @@ -116,7 +112,11 @@ public void testConcurrentPutAndTake() throws Exception { } IndexedSeq schedulerEventIndexedSeq = queue.toIndexedSeq(); //Object[] objects = queue.toArray(); + System.out.println("生产大小:" + productCounter.get()); + System.out.println("消费大小:" + consumerCounter.get()); System.out.println("队列当前大小:" + queue.size()); + System.out.println("index size: " + queue.indexMap().size()); + System.out.println("cache size: " + queue.fixedSizeCollection().size()); //Iterator it = schedulerEventIndexedSeq.iterator(); // while (it.hasNext()) { // SchedulerEvent event = it.next(); @@ -124,7 +124,9 @@ public void testConcurrentPutAndTake() throws Exception { // } } }).start(); - Thread.sleep(threeMinutesInMillis * 3); + Thread.sleep(threeMinutesInMillis * 2); + System.out.println("product:" + productCounter.get() + ", consumer: " + consumerCounter.get()); + Assertions.assertEquals(productCounter.get(), consumerCounter.get()); } //消费 @@ -132,6 +134,7 @@ private void consume(PriorityLoopArrayQueue queue) { SchedulerEvent take = null; try { take = queue.take(); + consumerCounter.addAndGet(1); } catch (Exception e) { throw new RuntimeException(e); } @@ -146,15 +149,17 @@ private void product(AtomicInteger counter, PriorityLoopArrayQueue queue) { System.out.println("生产:" + name); Option offer = queue.offer(getJob(name, priority)); if (offer.nonEmpty()) { - System.out.println(offer); + productCounter.addAndGet(1); + Option schedulerEventOption = queue.get((int) offer.get()); + printEvent("get:", schedulerEventOption.get()); } else { System.out.println("当前队列已满,大小:" + queue.size()); } - - //Option schedulerEventOption = queue.get((int) offer.get()); - //printEvent("生产-get:" + offer.get(), schedulerEventOption.get()); } + @Test + void testFinally() { + } @Test void enqueue() { // 压测 offer take get diff --git a/linkis-commons/linkis-scheduler/src/test/scala/org/apache/linkis/scheduler/util/TestSchedulerUtils.scala b/linkis-commons/linkis-scheduler/src/test/scala/org/apache/linkis/scheduler/util/TestSchedulerUtils.scala index 568c59b3f2..6443c645ef 100644 --- a/linkis-commons/linkis-scheduler/src/test/scala/org/apache/linkis/scheduler/util/TestSchedulerUtils.scala +++ b/linkis-commons/linkis-scheduler/src/test/scala/org/apache/linkis/scheduler/util/TestSchedulerUtils.scala @@ -17,16 +17,48 @@ package org.apache.linkis.scheduler.util -import org.apache.linkis.scheduler.util.SchedulerUtils.{getEngineTypeFromGroupName, getUserFromGroupName} +import org.apache.linkis.scheduler.util.SchedulerUtils.{ + getCreatorFromGroupName, + getEngineTypeFromGroupName, + getUserFromGroupName +} + import org.junit.jupiter.api.{Assertions, Test} class TestSchedulerUtils { + @Test def testShellDangerCode: Unit = { - val groupName = "exec_id018033linkis-cg-entrancegz.xg.bdplinkis110002.webank:9104APPName_hadoop_spark_0, taskId: 2392611" - val username: String = getUserFromGroupName(groupName) - val engineType: String = getEngineTypeFromGroupName(groupName) + var groupName = "IDE_hadoop_hive" + var username: String = getUserFromGroupName(groupName) + var engineType: String = getEngineTypeFromGroupName(groupName) + var creator: String = getCreatorFromGroupName(groupName) Assertions.assertEquals("hadoop", username) - Assertions.assertEquals("spark", engineType) + Assertions.assertEquals("hive", engineType) + Assertions.assertEquals("IDE", creator) + groupName = "APP_TEST_v_hadoop_hive" + username = getUserFromGroupName(groupName) + engineType = getEngineTypeFromGroupName(groupName) + creator = getCreatorFromGroupName(groupName) + Assertions.assertEquals("v_hadoop", username) + Assertions.assertEquals("hive", engineType) + Assertions.assertEquals("APP_TEST", creator) + + groupName = "TEST_v_hadoop_hive" + username = getUserFromGroupName(groupName) + engineType = getEngineTypeFromGroupName(groupName) + creator = getCreatorFromGroupName(groupName) + Assertions.assertEquals("v_hadoop", username) + Assertions.assertEquals("hive", engineType) + Assertions.assertEquals("TEST", creator) + + groupName = "APP_TEST_hadoop_hive" + username = getUserFromGroupName(groupName) + engineType = getEngineTypeFromGroupName(groupName) + creator = getCreatorFromGroupName(groupName) + Assertions.assertEquals("hadoop", username) + Assertions.assertEquals("hive", engineType) + Assertions.assertEquals("APP_TEST", creator) } + }