From 1284975b9bc24ef0515f4e7af80169c212851348 Mon Sep 17 00:00:00 2001 From: Calvin Kirs Date: Thu, 28 Dec 2023 10:24:39 +0800 Subject: [PATCH] [Improve](Job) Create task adds concurrency control (#29144) --- .../apache/doris/job/base/AbstractJob.java | 28 +++++++++++++------ .../job/extensions/insert/InsertJob.java | 2 +- 2 files changed, 21 insertions(+), 9 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java index bc8f921d841dbe..9d7b817f73222c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java @@ -50,6 +50,8 @@ import java.util.Comparator; import java.util.List; import java.util.Set; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import java.util.stream.Collectors; @Data @@ -105,7 +107,7 @@ public AbstractJob(Long jobId, String jobName, JobStatus jobStatus, UserIdentity createUser, JobExecutionConfiguration jobConfig) { this(jobId, jobName, jobStatus, currentDbName, comment, - createUser, jobConfig, System.currentTimeMillis(), null, null); + createUser, jobConfig, System.currentTimeMillis(), null); } public AbstractJob(Long jobId, String jobName, JobStatus jobStatus, @@ -114,8 +116,7 @@ public AbstractJob(Long jobId, String jobName, JobStatus jobStatus, UserIdentity createUser, JobExecutionConfiguration jobConfig, Long createTimeMs, - String executeSql, - List runningTasks) { + String executeSql) { this.jobId = jobId; this.jobName = jobName; this.jobStatus = jobStatus; @@ -125,11 +126,12 @@ public AbstractJob(Long jobId, String jobName, JobStatus jobStatus, this.jobConfig = jobConfig; this.createTimeMs = createTimeMs; this.executeSql = executeSql; - this.runningTasks = runningTasks; } private List runningTasks = new ArrayList<>(); + private Lock createTaskLock = new ReentrantLock(); + @Override public void cancelAllTasks() throws JobException { if (CollectionUtils.isEmpty(runningTasks)) { @@ -201,13 +203,22 @@ public List commonCreateTasks(TaskType taskType, C taskContext) { log.info("job is not ready for scheduling, job id is {}", jobId); return new ArrayList<>(); } - return createTasks(taskType, taskContext); + try { + //it's better to use tryLock and add timeout limit + createTaskLock.lock(); + if (!isReadyForScheduling(taskContext)) { + log.info("job is not ready for scheduling, job id is {}", jobId); + return new ArrayList<>(); + } + List tasks = createTasks(taskType, taskContext); + tasks.forEach(task -> log.info("common create task, job id is {}, task id is {}", jobId, task.getTaskId())); + return tasks; + } finally { + createTaskLock.unlock(); + } } public void initTasks(Collection tasks, TaskType taskType) { - if (CollectionUtils.isEmpty(getRunningTasks())) { - runningTasks = new ArrayList<>(); - } tasks.forEach(task -> { task.setTaskType(taskType); task.setJobId(getJobId()); @@ -260,6 +271,7 @@ public static AbstractJob readFields(DataInput in) throws IOException { String jsonJob = Text.readString(in); AbstractJob job = GsonUtils.GSON.fromJson(jsonJob, AbstractJob.class); job.runningTasks = new ArrayList<>(); + job.createTaskLock = new ReentrantLock(); return job; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java index 90fc126b723c27..44cc63ee9992d8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java @@ -217,7 +217,7 @@ public InsertJob(String jobName, Long createTimeMs, String executeSql) { super(getNextJobId(), jobName, jobStatus, dbName, comment, createUser, - jobConfig, createTimeMs, executeSql, null); + jobConfig, createTimeMs, executeSql); this.dbId = ConnectContext.get().getCurrentDbId(); }