From 91be50bb08cb811bd69af3f9f1c10ab621388555 Mon Sep 17 00:00:00 2001 From: Calvin Kirs Date: Mon, 25 Mar 2024 07:29:38 +0800 Subject: [PATCH] [Fix](TransientTask)Export tasks should only be run on the master node (#32700) * [Fix](TransientTask)Export tasks should only be run on the master node Add thread name Export Task runs only on the master node, so it is necessary to explicitly start the corresponding resources. At the same time, refactor some code to avoid circular dependencies. * TransientTaskManager is initialized twice. Therefore, the second initialization needs to be deleted. --- .../java/org/apache/doris/catalog/Env.java | 3 ++- .../catalog/InternalSchemaInitializer.java | 4 ++++ .../apache/doris/master/ReportHandler.java | 1 + .../workloadgroup/WorkloadGroupMgr.java | 17 +++++++-------- .../scheduler/disruptor/TaskDisruptor.java | 21 +++++++------------ .../scheduler/disruptor/TaskHandler.java | 12 ++++------- .../manager/TransientTaskManager.java | 5 ++++- 7 files changed, 30 insertions(+), 33 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index 3241a318715d56..e91ff7febe7197 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -659,7 +659,6 @@ public Env(boolean isCheckpointCatalog) { this.labelProcessor = new LabelProcessor(); this.transientTaskManager = new TransientTaskManager(); this.exportTaskRegister = new ExportTaskRegister(transientTaskManager); - this.transientTaskManager = new TransientTaskManager(); this.replayedJournalId = new AtomicLong(0L); this.stmtIdCounter = new AtomicLong(0L); @@ -1631,6 +1630,8 @@ protected void startMasterOnlyDaemonThreads() { // Start txn cleaner txnCleaner.start(); jobManager.start(); + // transient task manager + transientTaskManager.start(); // Alter getAlterInstance().start(); // Consistency checker diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchemaInitializer.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchemaInitializer.java index 6aaaf4004fb791..35c6227bbee563 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchemaInitializer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchemaInitializer.java @@ -57,6 +57,10 @@ public class InternalSchemaInitializer extends Thread { private static final Logger LOG = LogManager.getLogger(InternalSchemaInitializer.class); + public InternalSchemaInitializer() { + super("InternalSchemaInitializer"); + } + public void run() { if (!FeConstants.enableInternalSchemaDb) { return; diff --git a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java index 485763fdff24a3..6684fe6e71c494 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java @@ -122,6 +122,7 @@ private enum ReportType { } public ReportHandler() { + super("report-thread"); GaugeMetric gauge = new GaugeMetric( "report_queue_size", MetricUnit.NOUNIT, "report queue size") { @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java index 967efd26e654be..08de0ce338a61f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java @@ -88,15 +88,14 @@ public class WorkloadGroupMgr implements Writable, GsonPostProcessable { public void startUpdateThread() { WorkloadGroupMgr wgMgr = this; - updatePropThread = new Thread(new Runnable() { - public void run() { - while (true) { - try { - wgMgr.resetQueryQueueProp(); - Thread.sleep(Config.query_queue_update_interval_ms); - } catch (Throwable e) { - LOG.warn("reset query queue failed ", e); - } + updatePropThread = new Thread(() -> { + Thread.currentThread().setName("reset-query-queue-prop"); + while (true) { + try { + wgMgr.resetQueryQueueProp(); + Thread.sleep(Config.query_queue_update_interval_ms); + } catch (Throwable e) { + LOG.warn("reset query queue failed ", e); } } }); diff --git a/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskDisruptor.java b/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskDisruptor.java index dc065360d09f62..57df84a0e89146 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskDisruptor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskDisruptor.java @@ -18,8 +18,8 @@ package org.apache.doris.scheduler.disruptor; import org.apache.doris.common.Config; +import org.apache.doris.common.CustomThreadFactory; import org.apache.doris.scheduler.constants.TaskType; -import org.apache.doris.scheduler.manager.TransientTaskManager; import com.lmax.disruptor.BlockingWaitStrategy; import com.lmax.disruptor.EventTranslatorThreeArg; @@ -27,11 +27,9 @@ import com.lmax.disruptor.WorkHandler; import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.dsl.ProducerType; -import com.lmax.disruptor.util.DaemonThreadFactory; -import lombok.extern.slf4j.Slf4j; +import lombok.extern.log4j.Log4j2; import java.io.Closeable; -import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; /** @@ -43,11 +41,10 @@ * *

The work handler also handles system events by scheduling batch scheduler tasks. */ -@Slf4j +@Log4j2 public class TaskDisruptor implements Closeable { private Disruptor disruptor; - private TransientTaskManager transientTaskManager; private static final int DEFAULT_RING_BUFFER_SIZE = Config.async_task_queen_size; private static final int consumerThreadCount = Config.async_task_consumer_thread_num; @@ -74,17 +71,13 @@ public class TaskDisruptor implements Closeable { event.setTaskType(taskType); }; - public TaskDisruptor(TransientTaskManager transientTaskManager) { - this.transientTaskManager = transientTaskManager; - } - public void start() { - ThreadFactory producerThreadFactory = DaemonThreadFactory.INSTANCE; - disruptor = new Disruptor<>(TaskEvent.FACTORY, DEFAULT_RING_BUFFER_SIZE, producerThreadFactory, - ProducerType.MULTI, new BlockingWaitStrategy()); + CustomThreadFactory exportTaskThreadFactory = new CustomThreadFactory("export-task-consumer"); + disruptor = new Disruptor<>(TaskEvent.FACTORY, DEFAULT_RING_BUFFER_SIZE, exportTaskThreadFactory, + ProducerType.SINGLE, new BlockingWaitStrategy()); WorkHandler[] workers = new TaskHandler[consumerThreadCount]; for (int i = 0; i < consumerThreadCount; i++) { - workers[i] = new TaskHandler(transientTaskManager); + workers[i] = new TaskHandler(); } disruptor.handleEventsWithWorkerPool(workers); disruptor.start(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskHandler.java b/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskHandler.java index 6a0b9f92c5fa02..de889c1b2e49d9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskHandler.java @@ -17,12 +17,13 @@ package org.apache.doris.scheduler.disruptor; +import org.apache.doris.catalog.Env; import org.apache.doris.scheduler.exception.JobException; import org.apache.doris.scheduler.executor.TransientTaskExecutor; import org.apache.doris.scheduler.manager.TransientTaskManager; import com.lmax.disruptor.WorkHandler; -import lombok.extern.slf4j.Slf4j; +import lombok.extern.log4j.Log4j2; /** * This class represents a work handler for processing event tasks consumed by a Disruptor. @@ -31,16 +32,10 @@ * If the event job execution fails, the work handler logs an error message and pauses the event job. * The work handler also handles system events by scheduling batch scheduler tasks. */ -@Slf4j +@Log4j2 public class TaskHandler implements WorkHandler { - private TransientTaskManager transientTaskManager; - - public TaskHandler(TransientTaskManager transientTaskManager) { - this.transientTaskManager = transientTaskManager; - } - /** * Processes an event task by retrieving the associated event job and executing it if it is running. * If the event job is not running, it logs an error message. @@ -62,6 +57,7 @@ public void onEvent(TaskEvent event) { public void onTransientTaskHandle(TaskEvent taskEvent) { Long taskId = taskEvent.getId(); + TransientTaskManager transientTaskManager = Env.getCurrentEnv().getTransientTaskManager(); TransientTaskExecutor taskExecutor = transientTaskManager.getMemoryTaskExecutor(taskId); if (taskExecutor == null) { log.info("Memory task executor is null, task id: {}", taskId); diff --git a/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/TransientTaskManager.java b/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/TransientTaskManager.java index 5f94fb5d998cfc..51edd4af318bb0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/TransientTaskManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/TransientTaskManager.java @@ -42,7 +42,10 @@ public class TransientTaskManager { private TaskDisruptor disruptor; public TransientTaskManager() { - disruptor = new TaskDisruptor(this); + disruptor = new TaskDisruptor(); + } + + public void start() { disruptor.start(); }