diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/disruptor/ExecuteTaskEvent.java b/fe/fe-core/src/main/java/org/apache/doris/job/disruptor/ExecuteTaskEvent.java index 3d8f9ed15349ae..ac67715d2ca18e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/disruptor/ExecuteTaskEvent.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/disruptor/ExecuteTaskEvent.java @@ -34,4 +34,9 @@ public static EventFactory> factory return ExecuteTaskEvent::new; } + public void clear() { + this.task = null; + this.jobConfig = null; + } + } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/disruptor/TaskDisruptor.java b/fe/fe-core/src/main/java/org/apache/doris/job/disruptor/TaskDisruptor.java index 2b2e3df0418dd3..9fb9d94e8df70f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/disruptor/TaskDisruptor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/disruptor/TaskDisruptor.java @@ -19,7 +19,6 @@ import com.lmax.disruptor.EventFactory; import com.lmax.disruptor.EventTranslatorVararg; -import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.WaitStrategy; import com.lmax.disruptor.WorkHandler; import com.lmax.disruptor.dsl.Disruptor; @@ -28,6 +27,7 @@ import org.apache.logging.log4j.Logger; import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; /** * Utility class for creating and managing a Disruptor instance. @@ -73,20 +73,42 @@ public void start() { */ public boolean publishEvent(Object... args) { try { - RingBuffer ringBuffer = disruptor.getRingBuffer(); - // Check if the RingBuffer has enough capacity to reserve 10 slots for tasks - // If there is insufficient capacity (less than 10 slots available) - // log a warning and drop the current task - if (!ringBuffer.hasAvailableCapacity(10)) { - LOG.warn("ring buffer has no available capacity,task will be dropped," - + "please check the task queue size."); - return false; + // Set the timeout to 1 second, converted to nanoseconds for precision + long timeoutInNanos = TimeUnit.SECONDS.toNanos(1); // Timeout set to 1 second + long startTime = System.nanoTime(); // Record the start time + + // Loop until the timeout is reached + while (System.nanoTime() - startTime < timeoutInNanos) { + // Check if there is enough remaining capacity in the ring buffer + // Adjusting to check if the required capacity is available (instead of hardcoding 1) + if (disruptor.getRingBuffer().remainingCapacity() > 1) { + // Publish the event if there is enough capacity + disruptor.getRingBuffer().publishEvent(eventTranslator, args); + if (LOG.isDebugEnabled()) { + LOG.debug("publishEvent success,the remaining buffer size is {}", + disruptor.getRingBuffer().remainingCapacity()); + } + return true; + } + + // Wait for a short period before retrying + try { + Thread.sleep(10); // Adjust the wait time as needed (maybe increase if not high-frequency) + } catch (InterruptedException e) { + // Log the exception and return false if interrupted + Thread.currentThread().interrupt(); // Restore interrupt status + LOG.warn("Thread interrupted while waiting to publish event", e); + return false; + } } - ringBuffer.publishEvent(eventTranslator, args); - return true; + + // Timeout reached without publishing the event + LOG.warn("Failed to publish event within the specified timeout (1 second)." + + "Queue may be full. the remaining buffer size is {}", + disruptor.getRingBuffer().remainingCapacity()); } catch (Exception e) { - LOG.warn("Failed to publish event", e); - // Handle the exception, e.g., retry or alert + // Catching general exceptions to handle unexpected errors + LOG.warn("Failed to publish event due to an unexpected error", e); } return false; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/executor/DefaultTaskExecutorHandler.java b/fe/fe-core/src/main/java/org/apache/doris/job/executor/DefaultTaskExecutorHandler.java index befa8cc35fcbcc..cdfe7c0fe08f63 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/executor/DefaultTaskExecutorHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/executor/DefaultTaskExecutorHandler.java @@ -36,35 +36,23 @@ public class DefaultTaskExecutorHandler implements WorkH @Override public void onEvent(ExecuteTaskEvent executeTaskEvent) { - T task = executeTaskEvent.getTask(); - if (null == task) { - log.warn("task is null, ignore,maybe task has been canceled"); - return; - } - if (task.isCancelled()) { - log.info("task is canceled, ignore. task id is {}", task.getTaskId()); - return; - } - log.info("start to execute task, task id is {}", task.getTaskId()); - try { - task.runTask(); - } catch (Exception e) { - //if task.onFail() throw exception, we will catch it here - log.warn("task before error, task id is {}", task.getTaskId(), e); - } - //todo we need discuss whether we need to use semaphore to control the concurrent task num - /* Semaphore semaphore = null; - // get token try { - int maxConcurrentTaskNum = executeTaskEvent.getJobConfig().getMaxConcurrentTaskNum(); - semaphore = TaskTokenManager.tryAcquire(task.getJobId(), maxConcurrentTaskNum); + T task = executeTaskEvent.getTask(); + if (null == task) { + log.warn("task is null, ignore,maybe task has been canceled"); + return; + } + if (task.isCancelled()) { + log.info("task is canceled, ignore. task id is {}", task.getTaskId()); + return; + } + log.info("start to execute task, task id is {}", task.getTaskId()); task.runTask(); } catch (Exception e) { - task.onFail(); - log.error("execute task error, task id is {}", task.getTaskId(), e); + log.error("execute task error, task id is {}", executeTaskEvent.getTask().getTaskId(), e); } finally { - if (null != semaphore) { - semaphore.release(); - }*/ + executeTaskEvent.clear(); + } + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/executor/DispatchTaskHandler.java b/fe/fe-core/src/main/java/org/apache/doris/job/executor/DispatchTaskHandler.java index d93393aa0ef89f..b8f726c4a0c76f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/executor/DispatchTaskHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/executor/DispatchTaskHandler.java @@ -21,7 +21,6 @@ import org.apache.doris.job.common.JobStatus; import org.apache.doris.job.common.JobType; import org.apache.doris.job.common.TaskType; -import org.apache.doris.job.disruptor.TaskDisruptor; import org.apache.doris.job.disruptor.TimerJobEvent; import org.apache.doris.job.task.AbstractTask; @@ -40,9 +39,9 @@ @Log4j2 public class DispatchTaskHandler implements WorkHandler> { - private final Map> disruptorMap; + private final Map disruptorMap; - public DispatchTaskHandler(Map> disruptorMap) { + public DispatchTaskHandler(Map disruptorMap) { this.disruptorMap = disruptorMap; } @@ -66,7 +65,7 @@ public void onEvent(TimerJobEvent event) { } JobType jobType = event.getJob().getJobType(); for (AbstractTask task : tasks) { - if (!disruptorMap.get(jobType).publishEvent(task, event.getJob().getJobConfig())) { + if (!disruptorMap.get(jobType).addTask(task)) { task.cancel(); continue; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/executor/TaskProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/job/executor/TaskProcessor.java new file mode 100644 index 00000000000000..d9d3f25dcd8a80 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/job/executor/TaskProcessor.java @@ -0,0 +1,87 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.job.executor; + +import org.apache.doris.job.task.AbstractTask; + +import lombok.extern.log4j.Log4j2; + +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +@Log4j2 +public class TaskProcessor { + private ExecutorService executor; + + public TaskProcessor(int numberOfThreads, int queueSize, ThreadFactory threadFactory) { + this.executor = new ThreadPoolExecutor( + numberOfThreads, + numberOfThreads, + 0L, TimeUnit.MILLISECONDS, + new ArrayBlockingQueue<>(queueSize), + threadFactory, + new ThreadPoolExecutor.AbortPolicy() + ); + } + + public boolean addTask(AbstractTask task) { + try { + executor.execute(() -> runTask(task)); + log.info("Add task to executor, task id: {}", task.getTaskId()); + return true; + } catch (RejectedExecutionException e) { + log.warn("Failed to add task to executor, task id: {}", task.getTaskId(), e); + return false; + } + } + + public void shutdown() { + log.info("Shutting down executor service..."); + executor.shutdown(); + try { + if (!executor.awaitTermination(60, TimeUnit.SECONDS)) { + executor.shutdownNow(); + } + } catch (InterruptedException e) { + executor.shutdownNow(); + Thread.currentThread().interrupt(); + } + log.info("Executor service shut down successfully."); + } + + private void runTask(AbstractTask task) { + try { + if (task == null) { + log.warn("Task is null, ignore. Maybe it has been canceled."); + return; + } + if (task.isCancelled()) { + log.info("Task is canceled, ignore. Task id: {}", task.getTaskId()); + return; + } + log.info("Start to execute task, task id: {}", task.getTaskId()); + task.runTask(); + } catch (Exception e) { + log.warn("Execute task error, task id: {}", task.getTaskId(), e); + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/manager/TaskDisruptorGroupManager.java b/fe/fe-core/src/main/java/org/apache/doris/job/manager/TaskDisruptorGroupManager.java index cc82b59a36a36f..e77dfbadcb3742 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/manager/TaskDisruptorGroupManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/manager/TaskDisruptorGroupManager.java @@ -22,13 +22,10 @@ import org.apache.doris.job.base.AbstractJob; import org.apache.doris.job.base.JobExecutionConfiguration; import org.apache.doris.job.common.JobType; -import org.apache.doris.job.disruptor.ExecuteTaskEvent; import org.apache.doris.job.disruptor.TaskDisruptor; import org.apache.doris.job.disruptor.TimerJobEvent; -import org.apache.doris.job.executor.DefaultTaskExecutorHandler; import org.apache.doris.job.executor.DispatchTaskHandler; -import org.apache.doris.job.extensions.insert.InsertTask; -import org.apache.doris.job.extensions.mtmv.MTMVTask; +import org.apache.doris.job.executor.TaskProcessor; import org.apache.doris.job.task.AbstractTask; import com.lmax.disruptor.EventFactory; @@ -44,7 +41,7 @@ public class TaskDisruptorGroupManager { - private final Map> disruptorMap = new EnumMap<>(JobType.class); + private final Map disruptorMap = new EnumMap<>(JobType.class); @Getter private TaskDisruptor> dispatchDisruptor; @@ -92,44 +89,27 @@ private void registerDispatchDisruptor() { } private void registerInsertDisruptor() { - EventFactory> insertEventFactory = ExecuteTaskEvent.factory(); ThreadFactory insertTaskThreadFactory = new CustomThreadFactory("insert-task-execute"); - WorkHandler[] insertTaskExecutorHandlers = new WorkHandler[DISPATCH_INSERT_THREAD_NUM]; - for (int i = 0; i < DISPATCH_INSERT_THREAD_NUM; i++) { - insertTaskExecutorHandlers[i] = new DefaultTaskExecutorHandler(); - } - EventTranslatorVararg> eventTranslator = - (event, sequence, args) -> { - event.setTask((InsertTask) args[0]); - event.setJobConfig((JobExecutionConfiguration) args[1]); - }; - TaskDisruptor insertDisruptor = new TaskDisruptor<>(insertEventFactory, DISPATCH_INSERT_TASK_QUEUE_SIZE, - insertTaskThreadFactory, new LiteTimeoutBlockingWaitStrategy(10, TimeUnit.MILLISECONDS), - insertTaskExecutorHandlers, eventTranslator); - disruptorMap.put(JobType.INSERT, insertDisruptor); + + + TaskProcessor insertTaskProcessor = new TaskProcessor(DISPATCH_INSERT_THREAD_NUM, + DISPATCH_INSERT_TASK_QUEUE_SIZE, insertTaskThreadFactory); + disruptorMap.put(JobType.INSERT, insertTaskProcessor); } private void registerMTMVDisruptor() { - EventFactory> mtmvEventFactory = ExecuteTaskEvent.factory(); + ThreadFactory mtmvTaskThreadFactory = new CustomThreadFactory("mtmv-task-execute"); - WorkHandler[] insertTaskExecutorHandlers = new WorkHandler[DISPATCH_MTMV_THREAD_NUM]; - for (int i = 0; i < DISPATCH_MTMV_THREAD_NUM; i++) { - insertTaskExecutorHandlers[i] = new DefaultTaskExecutorHandler(); - } - EventTranslatorVararg> eventTranslator = - (event, sequence, args) -> { - event.setTask((MTMVTask) args[0]); - event.setJobConfig((JobExecutionConfiguration) args[1]); - }; - TaskDisruptor mtmvDisruptor = new TaskDisruptor<>(mtmvEventFactory, DISPATCH_MTMV_TASK_QUEUE_SIZE, - mtmvTaskThreadFactory, new LiteTimeoutBlockingWaitStrategy(10, TimeUnit.MILLISECONDS), - insertTaskExecutorHandlers, eventTranslator); - disruptorMap.put(JobType.MV, mtmvDisruptor); + TaskProcessor mtmvTaskProcessor = new TaskProcessor(DISPATCH_MTMV_THREAD_NUM, + DISPATCH_MTMV_TASK_QUEUE_SIZE, mtmvTaskThreadFactory); + disruptorMap.put(JobType.MV, mtmvTaskProcessor); } public boolean dispatchInstantTask(AbstractTask task, JobType jobType, JobExecutionConfiguration jobExecutionConfiguration) { - return disruptorMap.get(jobType).publishEvent(task, jobExecutionConfiguration); + + + return disruptorMap.get(jobType).addTask(task); }