Skip to content

Commit

Permalink
[Improve](Job) Create task adds concurrency control (apache#29144)
Browse files Browse the repository at this point in the history
  • Loading branch information
CalvinKirs authored Dec 28, 2023
1 parent 8b225c6 commit 1284975
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@
import java.util.Comparator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;

@Data
Expand Down Expand Up @@ -105,7 +107,7 @@ public AbstractJob(Long jobId, String jobName, JobStatus jobStatus,
UserIdentity createUser,
JobExecutionConfiguration jobConfig) {
this(jobId, jobName, jobStatus, currentDbName, comment,
createUser, jobConfig, System.currentTimeMillis(), null, null);
createUser, jobConfig, System.currentTimeMillis(), null);
}

public AbstractJob(Long jobId, String jobName, JobStatus jobStatus,
Expand All @@ -114,8 +116,7 @@ public AbstractJob(Long jobId, String jobName, JobStatus jobStatus,
UserIdentity createUser,
JobExecutionConfiguration jobConfig,
Long createTimeMs,
String executeSql,
List<T> runningTasks) {
String executeSql) {
this.jobId = jobId;
this.jobName = jobName;
this.jobStatus = jobStatus;
Expand All @@ -125,11 +126,12 @@ public AbstractJob(Long jobId, String jobName, JobStatus jobStatus,
this.jobConfig = jobConfig;
this.createTimeMs = createTimeMs;
this.executeSql = executeSql;
this.runningTasks = runningTasks;
}

private List<T> runningTasks = new ArrayList<>();

private Lock createTaskLock = new ReentrantLock();

@Override
public void cancelAllTasks() throws JobException {
if (CollectionUtils.isEmpty(runningTasks)) {
Expand Down Expand Up @@ -201,13 +203,22 @@ public List<T> commonCreateTasks(TaskType taskType, C taskContext) {
log.info("job is not ready for scheduling, job id is {}", jobId);
return new ArrayList<>();
}
return createTasks(taskType, taskContext);
try {
//it's better to use tryLock and add timeout limit
createTaskLock.lock();
if (!isReadyForScheduling(taskContext)) {
log.info("job is not ready for scheduling, job id is {}", jobId);
return new ArrayList<>();
}
List<T> tasks = createTasks(taskType, taskContext);
tasks.forEach(task -> log.info("common create task, job id is {}, task id is {}", jobId, task.getTaskId()));
return tasks;
} finally {
createTaskLock.unlock();
}
}

public void initTasks(Collection<? extends T> tasks, TaskType taskType) {
if (CollectionUtils.isEmpty(getRunningTasks())) {
runningTasks = new ArrayList<>();
}
tasks.forEach(task -> {
task.setTaskType(taskType);
task.setJobId(getJobId());
Expand Down Expand Up @@ -260,6 +271,7 @@ public static AbstractJob readFields(DataInput in) throws IOException {
String jsonJob = Text.readString(in);
AbstractJob job = GsonUtils.GSON.fromJson(jsonJob, AbstractJob.class);
job.runningTasks = new ArrayList<>();
job.createTaskLock = new ReentrantLock();
return job;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ public InsertJob(String jobName,
Long createTimeMs,
String executeSql) {
super(getNextJobId(), jobName, jobStatus, dbName, comment, createUser,
jobConfig, createTimeMs, executeSql, null);
jobConfig, createTimeMs, executeSql);
this.dbId = ConnectContext.get().getCurrentDbId();
}

Expand Down

0 comments on commit 1284975

Please sign in to comment.