diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index 3241a318715d56..e91ff7febe7197 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -659,7 +659,6 @@ public Env(boolean isCheckpointCatalog) { this.labelProcessor = new LabelProcessor(); this.transientTaskManager = new TransientTaskManager(); this.exportTaskRegister = new ExportTaskRegister(transientTaskManager); - this.transientTaskManager = new TransientTaskManager(); this.replayedJournalId = new AtomicLong(0L); this.stmtIdCounter = new AtomicLong(0L); @@ -1631,6 +1630,8 @@ protected void startMasterOnlyDaemonThreads() { // Start txn cleaner txnCleaner.start(); jobManager.start(); + // transient task manager + transientTaskManager.start(); // Alter getAlterInstance().start(); // Consistency checker diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchemaInitializer.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchemaInitializer.java index 6aaaf4004fb791..35c6227bbee563 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchemaInitializer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchemaInitializer.java @@ -57,6 +57,10 @@ public class InternalSchemaInitializer extends Thread { private static final Logger LOG = LogManager.getLogger(InternalSchemaInitializer.class); + public InternalSchemaInitializer() { + super("InternalSchemaInitializer"); + } + public void run() { if (!FeConstants.enableInternalSchemaDb) { return; diff --git a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java index 485763fdff24a3..6684fe6e71c494 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java @@ -122,6 +122,7 @@ private enum ReportType { } public ReportHandler() { + super("report-thread"); GaugeMetric gauge = new GaugeMetric( "report_queue_size", MetricUnit.NOUNIT, "report queue size") { @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java index 967efd26e654be..08de0ce338a61f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java @@ -88,15 +88,14 @@ public class WorkloadGroupMgr implements Writable, GsonPostProcessable { public void startUpdateThread() { WorkloadGroupMgr wgMgr = this; - updatePropThread = new Thread(new Runnable() { - public void run() { - while (true) { - try { - wgMgr.resetQueryQueueProp(); - Thread.sleep(Config.query_queue_update_interval_ms); - } catch (Throwable e) { - LOG.warn("reset query queue failed ", e); - } + updatePropThread = new Thread(() -> { + Thread.currentThread().setName("reset-query-queue-prop"); + while (true) { + try { + wgMgr.resetQueryQueueProp(); + Thread.sleep(Config.query_queue_update_interval_ms); + } catch (Throwable e) { + LOG.warn("reset query queue failed ", e); } } }); diff --git a/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskDisruptor.java b/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskDisruptor.java index dc065360d09f62..57df84a0e89146 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskDisruptor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskDisruptor.java @@ -18,8 +18,8 @@ package org.apache.doris.scheduler.disruptor; import org.apache.doris.common.Config; +import org.apache.doris.common.CustomThreadFactory; import org.apache.doris.scheduler.constants.TaskType; -import org.apache.doris.scheduler.manager.TransientTaskManager; import com.lmax.disruptor.BlockingWaitStrategy; import com.lmax.disruptor.EventTranslatorThreeArg; @@ -27,11 +27,9 @@ import com.lmax.disruptor.WorkHandler; import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.dsl.ProducerType; -import com.lmax.disruptor.util.DaemonThreadFactory; -import lombok.extern.slf4j.Slf4j; +import lombok.extern.log4j.Log4j2; import java.io.Closeable; -import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; /** @@ -43,11 +41,10 @@ * *

The work handler also handles system events by scheduling batch scheduler tasks. */ -@Slf4j +@Log4j2 public class TaskDisruptor implements Closeable { private Disruptor disruptor; - private TransientTaskManager transientTaskManager; private static final int DEFAULT_RING_BUFFER_SIZE = Config.async_task_queen_size; private static final int consumerThreadCount = Config.async_task_consumer_thread_num; @@ -74,17 +71,13 @@ public class TaskDisruptor implements Closeable { event.setTaskType(taskType); }; - public TaskDisruptor(TransientTaskManager transientTaskManager) { - this.transientTaskManager = transientTaskManager; - } - public void start() { - ThreadFactory producerThreadFactory = DaemonThreadFactory.INSTANCE; - disruptor = new Disruptor<>(TaskEvent.FACTORY, DEFAULT_RING_BUFFER_SIZE, producerThreadFactory, - ProducerType.MULTI, new BlockingWaitStrategy()); + CustomThreadFactory exportTaskThreadFactory = new CustomThreadFactory("export-task-consumer"); + disruptor = new Disruptor<>(TaskEvent.FACTORY, DEFAULT_RING_BUFFER_SIZE, exportTaskThreadFactory, + ProducerType.SINGLE, new BlockingWaitStrategy()); WorkHandler[] workers = new TaskHandler[consumerThreadCount]; for (int i = 0; i < consumerThreadCount; i++) { - workers[i] = new TaskHandler(transientTaskManager); + workers[i] = new TaskHandler(); } disruptor.handleEventsWithWorkerPool(workers); disruptor.start(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskHandler.java b/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskHandler.java index 6a0b9f92c5fa02..de889c1b2e49d9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskHandler.java @@ -17,12 +17,13 @@ package org.apache.doris.scheduler.disruptor; +import org.apache.doris.catalog.Env; import org.apache.doris.scheduler.exception.JobException; import org.apache.doris.scheduler.executor.TransientTaskExecutor; import org.apache.doris.scheduler.manager.TransientTaskManager; import com.lmax.disruptor.WorkHandler; -import lombok.extern.slf4j.Slf4j; +import lombok.extern.log4j.Log4j2; /** * This class represents a work handler for processing event tasks consumed by a Disruptor. @@ -31,16 +32,10 @@ * If the event job execution fails, the work handler logs an error message and pauses the event job. * The work handler also handles system events by scheduling batch scheduler tasks. */ -@Slf4j +@Log4j2 public class TaskHandler implements WorkHandler { - private TransientTaskManager transientTaskManager; - - public TaskHandler(TransientTaskManager transientTaskManager) { - this.transientTaskManager = transientTaskManager; - } - /** * Processes an event task by retrieving the associated event job and executing it if it is running. * If the event job is not running, it logs an error message. @@ -62,6 +57,7 @@ public void onEvent(TaskEvent event) { public void onTransientTaskHandle(TaskEvent taskEvent) { Long taskId = taskEvent.getId(); + TransientTaskManager transientTaskManager = Env.getCurrentEnv().getTransientTaskManager(); TransientTaskExecutor taskExecutor = transientTaskManager.getMemoryTaskExecutor(taskId); if (taskExecutor == null) { log.info("Memory task executor is null, task id: {}", taskId); diff --git a/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/TransientTaskManager.java b/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/TransientTaskManager.java index 5f94fb5d998cfc..51edd4af318bb0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/TransientTaskManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/TransientTaskManager.java @@ -42,7 +42,10 @@ public class TransientTaskManager { private TaskDisruptor disruptor; public TransientTaskManager() { - disruptor = new TaskDisruptor(this); + disruptor = new TaskDisruptor(); + } + + public void start() { disruptor.start(); }