diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java index 62ac0c4d59d743..906b86494fb748 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java @@ -155,6 +155,7 @@ public void cancelAllTasks() throws JobException { } for (T task : runningTasks) { task.cancel(); + canceledTaskCount.incrementAndGet(); } runningTasks = new CopyOnWriteArrayList<>(); logUpdateOperation(); @@ -185,6 +186,7 @@ public void cancelTaskById(long taskId) throws JobException { runningTasks.stream().filter(task -> task.getTaskId().equals(taskId)).findFirst() .orElseThrow(() -> new JobException("Not found task id: " + taskId)).cancel(); runningTasks.removeIf(task -> task.getTaskId().equals(taskId)); + canceledTaskCount.incrementAndGet(); if (jobConfig.getExecuteType().equals(JobExecuteType.ONE_TIME)) { updateJobStatus(JobStatus.FINISHED); } @@ -418,13 +420,13 @@ public TRow getTvfInfo() { /** * Generates a common error message when the execution queue is full. * - * @param taskId The ID of the task. - * @param queueConfigName The name of the queue configuration. + * @param taskId The ID of the task. + * @param queueConfigName The name of the queue configuration. * @param executeThreadConfigName The name of the execution thread configuration. * @return A formatted error message. */ protected String commonFormatMsgWhenExecuteQueueFull(Long taskId, String queueConfigName, - String executeThreadConfigName) { + String executeThreadConfigName) { return String.format("Dispatch task failed, jobId: %d, jobName: %s, taskId: %d, the queue size is full, " + "you can increase the queue size by setting the property " + "%s in the fe.conf file or increase the value of " diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java index e7d5b8b1d54eb6..fcf0a6b33f998f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java @@ -209,6 +209,9 @@ public void run() throws JobException { @Override public void onFail() throws JobException { + if (isCanceled.get()) { + return; + } isFinished.set(true); super.onFail(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java b/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java index 39646bab18f785..47a3a0c5c19d5c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java @@ -189,6 +189,9 @@ private void dropJob(T dropJob, String jobName) throws JobException { public void alterJobStatus(Long jobId, JobStatus status) throws JobException { checkJobExist(jobId); jobMap.get(jobId).updateJobStatus(status); + if (status.equals(JobStatus.RUNNING)) { + jobScheduler.scheduleOneJob(jobMap.get(jobId)); + } jobMap.get(jobId).logUpdateOperation(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java index f78446aaf85cbf..8a230c0bd385f7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java @@ -167,6 +167,9 @@ public void runTask() throws JobException { run(); onSuccess(); } catch (Exception e) { + if (TaskStatus.CANCELED.equals(status)) { + return; + } this.errMsg = e.getMessage(); onFail(); log.warn("execute task error, job id is {}, task id is {}", jobId, taskId, e); diff --git a/regression-test/suites/job_p0/test_base_insert_job.groovy b/regression-test/suites/job_p0/test_base_insert_job.groovy index 76264d8ae94436..97fda38bf2c027 100644 --- a/regression-test/suites/job_p0/test_base_insert_job.groovy +++ b/regression-test/suites/job_p0/test_base_insert_job.groovy @@ -221,9 +221,11 @@ suite("test_base_insert_job") { RESUME JOB where jobname = '${jobName}' """ println(tasks.size()) + // test resume job success Awaitility.await("resume-job-test").atMost(60, SECONDS).until({ def afterResumeTasks = sql """ select status from tasks("type"="insert") where JobName= '${jobName}' """ println "resume tasks :" + afterResumeTasks + //resume tasks size should be greater than before pause afterResumeTasks.size() > tasks.size() }) @@ -249,7 +251,6 @@ suite("test_base_insert_job") { CREATE JOB ${jobName} ON SCHEDULE at '2023-11-13 14:18:07' comment 'test' DO insert into ${tableName} (timestamp, type, user_id) values ('2023-03-18','1','12213'); """ } catch (Exception e) { - println e.getMessage() assert e.getMessage().contains("startTimeMs must be greater than current time") } // assert end time less than start time @@ -283,6 +284,14 @@ suite("test_base_insert_job") { } catch (Exception e) { assert e.getMessage().contains("Invalid interval time unit: years") } + // assert interval time unit is -1 + try { + sql """ + CREATE JOB test_error_starts ON SCHEDULE every -1 second comment 'test' DO insert into ${tableName} (timestamp, type, user_id) values ('2023-03-18','1','12213'); + """ + } catch (Exception e) { + //ignore + } // test keyword as job name sql """