Skip to content

Commit

Permalink
[Fix](Job)The INSERT execution failed, but the task record status is …
Browse files Browse the repository at this point in the history
…marked as successful. (#44292)

### What problem does this PR solve?

After a job execution is completed, we need to verify whether the
QueryState is normal.
Currently, we rely on exception handling to determine success. However,
in certain cases, such as execution timeouts, exceptions may not be
captured. As a result, the status is incorrectly marked as successful.

### Release note


None

### Check List (For Author)

- Test <!-- At least one of them must be included. -->
    - [x] Manual test (add detailed scripts or steps below)
```
select * from tasks('type'='insert') ;


+----------------+----------------+---------+-------------------------------+---------+----------------------------------------------+---------------------+---------------------+------------+-------------+---------------+------+
| TaskId         | JobId          | JobName | Label                         | Status  | ErrorMsg                                     | CreateTime          | StartTime           | FinishTime | TrackingUrl | LoadStatistic | User |
+----------------+----------------+---------+-------------------------------+---------+----------------------------------------------+---------------------+---------------------+------------+-------------+---------------+------+
| 53767413936871 | 53748267972932 | test    | 53748267972932_53767413936871 | RUNNING |                                              | 2024-11-19 21:39:46 | 2024-11-19 21:39:46 |            |             |               | root |
| 53758617801828 | 53748267972932 | test    | 53748267972932_53758617801828 | FAILED  | errCode = 2, detailMessage = Execute timeout | 2024-11-19 21:39:37 | 2024-11-19 21:39:37 |            |             |               | root |

```
  • Loading branch information
CalvinKirs authored and Your Name committed Nov 22, 2024
1 parent 702abbf commit 855a5b7
Showing 1 changed file with 19 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.common.util.Util;
import org.apache.doris.job.exception.JobException;
import org.apache.doris.job.task.AbstractTask;
import org.apache.doris.load.FailMsg;
Expand All @@ -30,6 +31,7 @@
import org.apache.doris.nereids.parser.NereidsParser;
import org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.QueryState;
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.thrift.TCell;
import org.apache.doris.thrift.TRow;
Expand Down Expand Up @@ -195,10 +197,13 @@ public void run() throws JobException {
return;
}
command.runWithUpdateInfo(ctx, stmtExecutor, loadStatistic);
if (ctx.getState().getStateType() != QueryState.MysqlStateType.OK) {
throw new JobException(ctx.getState().getErrorMessage());
}
} catch (Exception e) {
log.warn("execute insert task error, job id is {}, task id is {},sql is {}", getJobId(),
getTaskId(), sql, e);
throw new JobException(e);
throw new JobException(Util.getRootCauseMessage(e));
}
}

Expand Down Expand Up @@ -237,15 +242,7 @@ public TRow getTvfInfo(String jobName) {
trow.addToColumnValue(new TCell().setStringVal(jobName));
trow.addToColumnValue(new TCell().setStringVal(getJobId() + LABEL_SPLITTER + getTaskId()));
trow.addToColumnValue(new TCell().setStringVal(jobInfo.getState().name()));
// err msg
String errorMsg = "";
if (failMsg != null) {
errorMsg = failMsg.getMsg();
}
if (StringUtils.isNotBlank(getErrMsg())) {
errorMsg = getErrMsg();
}
trow.addToColumnValue(new TCell().setStringVal(errorMsg));
trow.addToColumnValue(new TCell().setStringVal(getErrorMsg()));
// create time
trow.addToColumnValue(new TCell().setStringVal(TimeUtils.longToTimeString(getCreateTimeMs())));
trow.addToColumnValue(new TCell().setStringVal(null == getStartTimeMs() ? ""
Expand Down Expand Up @@ -275,7 +272,7 @@ private TRow getPendingTaskTVFInfo(String jobName) {
trow.addToColumnValue(new TCell().setStringVal(jobName));
trow.addToColumnValue(new TCell().setStringVal(getJobId() + LABEL_SPLITTER + getTaskId()));
trow.addToColumnValue(new TCell().setStringVal(getStatus().name()));
trow.addToColumnValue(new TCell().setStringVal(""));
trow.addToColumnValue(new TCell().setStringVal(getErrorMsg()));
trow.addToColumnValue(new TCell().setStringVal(TimeUtils.longToTimeString(getCreateTimeMs())));
trow.addToColumnValue(new TCell().setStringVal(null == getStartTimeMs() ? ""
: TimeUtils.longToTimeString(getStartTimeMs())));
Expand All @@ -287,4 +284,15 @@ private TRow getPendingTaskTVFInfo(String jobName) {
return trow;
}

private String getErrorMsg() {
// err msg
String errorMsg = "";
if (failMsg != null) {
errorMsg = failMsg.getMsg();
}
if (StringUtils.isNotBlank(getErrMsg())) {
errorMsg = getErrMsg();
}
return errorMsg;
}
}

0 comments on commit 855a5b7

Please sign in to comment.