Skip to content

Commit

Permalink
branch-2.1: [Fix](Job)Fix some issues in the Insert job. apache#44543 (
Browse files Browse the repository at this point in the history
…apache#44597)

Cherry-picked from apache#44543

---------

Co-authored-by: Calvin Kirs <[email protected]>
  • Loading branch information
github-actions[bot] and CalvinKirs authored Nov 30, 2024
1 parent 4b15b1f commit f9b3863
Show file tree
Hide file tree
Showing 5 changed files with 24 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ public void cancelAllTasks() throws JobException {
}
for (T task : runningTasks) {
task.cancel();
canceledTaskCount.incrementAndGet();
}
runningTasks = new CopyOnWriteArrayList<>();
logUpdateOperation();
Expand Down Expand Up @@ -185,6 +186,7 @@ public void cancelTaskById(long taskId) throws JobException {
runningTasks.stream().filter(task -> task.getTaskId().equals(taskId)).findFirst()
.orElseThrow(() -> new JobException("Not found task id: " + taskId)).cancel();
runningTasks.removeIf(task -> task.getTaskId().equals(taskId));
canceledTaskCount.incrementAndGet();
if (jobConfig.getExecuteType().equals(JobExecuteType.ONE_TIME)) {
updateJobStatus(JobStatus.FINISHED);
}
Expand Down Expand Up @@ -418,13 +420,13 @@ public TRow getTvfInfo() {
/**
* Generates a common error message when the execution queue is full.
*
* @param taskId The ID of the task.
* @param queueConfigName The name of the queue configuration.
* @param taskId The ID of the task.
* @param queueConfigName The name of the queue configuration.
* @param executeThreadConfigName The name of the execution thread configuration.
* @return A formatted error message.
*/
protected String commonFormatMsgWhenExecuteQueueFull(Long taskId, String queueConfigName,
String executeThreadConfigName) {
String executeThreadConfigName) {
return String.format("Dispatch task failed, jobId: %d, jobName: %s, taskId: %d, the queue size is full, "
+ "you can increase the queue size by setting the property "
+ "%s in the fe.conf file or increase the value of "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,9 @@ public void run() throws JobException {

@Override
public void onFail() throws JobException {
if (isCanceled.get()) {
return;
}
isFinished.set(true);
super.onFail();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,9 @@ private void dropJob(T dropJob, String jobName) throws JobException {
public void alterJobStatus(Long jobId, JobStatus status) throws JobException {
checkJobExist(jobId);
jobMap.get(jobId).updateJobStatus(status);
if (status.equals(JobStatus.RUNNING)) {
jobScheduler.scheduleOneJob(jobMap.get(jobId));
}
jobMap.get(jobId).logUpdateOperation();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,9 @@ public void runTask() throws JobException {
run();
onSuccess();
} catch (Exception e) {
if (TaskStatus.CANCELED.equals(status)) {
return;
}
this.errMsg = e.getMessage();
onFail();
log.warn("execute task error, job id is {}, task id is {}", jobId, taskId, e);
Expand Down
11 changes: 10 additions & 1 deletion regression-test/suites/job_p0/test_base_insert_job.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -221,9 +221,11 @@ suite("test_base_insert_job") {
RESUME JOB where jobname = '${jobName}'
"""
println(tasks.size())
// test resume job success
Awaitility.await("resume-job-test").atMost(60, SECONDS).until({
def afterResumeTasks = sql """ select status from tasks("type"="insert") where JobName= '${jobName}' """
println "resume tasks :" + afterResumeTasks
//resume tasks size should be greater than before pause
afterResumeTasks.size() > tasks.size()
})

Expand All @@ -249,7 +251,6 @@ suite("test_base_insert_job") {
CREATE JOB ${jobName} ON SCHEDULE at '2023-11-13 14:18:07' comment 'test' DO insert into ${tableName} (timestamp, type, user_id) values ('2023-03-18','1','12213');
"""
} catch (Exception e) {
println e.getMessage()
assert e.getMessage().contains("startTimeMs must be greater than current time")
}
// assert end time less than start time
Expand Down Expand Up @@ -283,6 +284,14 @@ suite("test_base_insert_job") {
} catch (Exception e) {
assert e.getMessage().contains("Invalid interval time unit: years")
}
// assert interval time unit is -1
try {
sql """
CREATE JOB test_error_starts ON SCHEDULE every -1 second comment 'test' DO insert into ${tableName} (timestamp, type, user_id) values ('2023-03-18','1','12213');
"""
} catch (Exception e) {
//ignore
}

// test keyword as job name
sql """
Expand Down

0 comments on commit f9b3863

Please sign in to comment.