From 855a5b7d8f2e1bee4e5aa0a0993d65aa271801a2 Mon Sep 17 00:00:00 2001 From: Calvin Kirs Date: Fri, 22 Nov 2024 11:00:48 +0800 Subject: [PATCH] [Fix](Job)The INSERT execution failed, but the task record status is marked as successful. (#44292) ### What problem does this PR solve? After a job's execution completes, we need to verify that the resulting QueryState is OK. Currently, a task is treated as successful whenever execution finishes without throwing an exception. However, in certain cases, such as execution timeouts, the failure is recorded only in the QueryState and no exception reaches the task. As a result, the task status is incorrectly marked as successful. This change checks the QueryState after execution and fails the task with the state's error message when it is not OK. ### Release note None ### Check List (For Author) - Test - [x] Manual test (add detailed scripts or steps below) ``` select * from tasks('type'='insert') ; +----------------+----------------+---------+-------------------------------+---------+----------------------------------------------+---------------------+---------------------+------------+-------------+---------------+------+ | TaskId | JobId | JobName | Label | Status | ErrorMsg | CreateTime | StartTime | FinishTime | TrackingUrl | LoadStatistic | User | +----------------+----------------+---------+-------------------------------+---------+----------------------------------------------+---------------------+---------------------+------------+-------------+---------------+------+ | 53767413936871 | 53748267972932 | test | 53748267972932_53767413936871 | RUNNING | | 2024-11-19 21:39:46 | 2024-11-19 21:39:46 | | | | root | | 53758617801828 | 53748267972932 | test | 53748267972932_53758617801828 | FAILED | errCode = 2, detailMessage = Execute timeout | 2024-11-19 21:39:37 | 2024-11-19 21:39:37 | | | | root | ``` --- .../job/extensions/insert/InsertTask.java | 30 ++++++++++++------- 1 file changed, 19 insertions(+), 11 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java index b4f52808f4b04b..e7d5b8b1d54eb6 
100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java @@ -22,6 +22,7 @@ import org.apache.doris.catalog.Env; import org.apache.doris.catalog.ScalarType; import org.apache.doris.common.util.TimeUtils; +import org.apache.doris.common.util.Util; import org.apache.doris.job.exception.JobException; import org.apache.doris.job.task.AbstractTask; import org.apache.doris.load.FailMsg; @@ -30,6 +31,7 @@ import org.apache.doris.nereids.parser.NereidsParser; import org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand; import org.apache.doris.qe.ConnectContext; +import org.apache.doris.qe.QueryState; import org.apache.doris.qe.StmtExecutor; import org.apache.doris.thrift.TCell; import org.apache.doris.thrift.TRow; @@ -195,10 +197,13 @@ public void run() throws JobException { return; } command.runWithUpdateInfo(ctx, stmtExecutor, loadStatistic); + if (ctx.getState().getStateType() != QueryState.MysqlStateType.OK) { + throw new JobException(ctx.getState().getErrorMessage()); + } } catch (Exception e) { log.warn("execute insert task error, job id is {}, task id is {},sql is {}", getJobId(), getTaskId(), sql, e); - throw new JobException(e); + throw new JobException(Util.getRootCauseMessage(e)); } } @@ -237,15 +242,7 @@ public TRow getTvfInfo(String jobName) { trow.addToColumnValue(new TCell().setStringVal(jobName)); trow.addToColumnValue(new TCell().setStringVal(getJobId() + LABEL_SPLITTER + getTaskId())); trow.addToColumnValue(new TCell().setStringVal(jobInfo.getState().name())); - // err msg - String errorMsg = ""; - if (failMsg != null) { - errorMsg = failMsg.getMsg(); - } - if (StringUtils.isNotBlank(getErrMsg())) { - errorMsg = getErrMsg(); - } - trow.addToColumnValue(new TCell().setStringVal(errorMsg)); + trow.addToColumnValue(new TCell().setStringVal(getErrorMsg())); // create time trow.addToColumnValue(new 
TCell().setStringVal(TimeUtils.longToTimeString(getCreateTimeMs()))); trow.addToColumnValue(new TCell().setStringVal(null == getStartTimeMs() ? "" @@ -275,7 +272,7 @@ private TRow getPendingTaskTVFInfo(String jobName) { trow.addToColumnValue(new TCell().setStringVal(jobName)); trow.addToColumnValue(new TCell().setStringVal(getJobId() + LABEL_SPLITTER + getTaskId())); trow.addToColumnValue(new TCell().setStringVal(getStatus().name())); - trow.addToColumnValue(new TCell().setStringVal("")); + trow.addToColumnValue(new TCell().setStringVal(getErrorMsg())); trow.addToColumnValue(new TCell().setStringVal(TimeUtils.longToTimeString(getCreateTimeMs()))); trow.addToColumnValue(new TCell().setStringVal(null == getStartTimeMs() ? "" : TimeUtils.longToTimeString(getStartTimeMs()))); @@ -287,4 +284,15 @@ private TRow getPendingTaskTVFInfo(String jobName) { return trow; } + private String getErrorMsg() { + // err msg + String errorMsg = ""; + if (failMsg != null) { + errorMsg = failMsg.getMsg(); + } + if (StringUtils.isNotBlank(getErrMsg())) { + errorMsg = getErrMsg(); + } + return errorMsg; + } }