Skip to content

Commit

Permalink
[Fix](Job)After a job execution is completed, we need to verify wheth…
Browse files Browse the repository at this point in the history
…er the QueryState is normal.

### backend
Currently, we rely on exception handling to determine success. However, in certain cases, such as execution timeouts, exceptions may not be captured. As a result, the status is incorrectly marked as successful.
  • Loading branch information
CalvinKirs committed Nov 19, 2024
1 parent 34383ad commit 4a858d8
Showing 1 changed file with 18 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.doris.nereids.parser.NereidsParser;
import org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.QueryState;
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.thrift.TCell;
import org.apache.doris.thrift.TRow;
Expand Down Expand Up @@ -195,10 +196,13 @@ public void run() throws JobException {
return;
}
command.runWithUpdateInfo(ctx, stmtExecutor, loadStatistic);
if (ctx.getState().getStateType() != QueryState.MysqlStateType.OK) {
throw new JobException(ctx.getState().getErrorMessage());
}
} catch (Exception e) {
log.warn("execute insert task error, job id is {}, task id is {},sql is {}", getJobId(),
getTaskId(), sql, e);
throw new JobException(e);
throw new JobException(e.getMessage());
}
}

Expand Down Expand Up @@ -237,15 +241,7 @@ public TRow getTvfInfo(String jobName) {
trow.addToColumnValue(new TCell().setStringVal(jobName));
trow.addToColumnValue(new TCell().setStringVal(getJobId() + LABEL_SPLITTER + getTaskId()));
trow.addToColumnValue(new TCell().setStringVal(jobInfo.getState().name()));
// err msg
String errorMsg = "";
if (failMsg != null) {
errorMsg = failMsg.getMsg();
}
if (StringUtils.isNotBlank(getErrMsg())) {
errorMsg = getErrMsg();
}
trow.addToColumnValue(new TCell().setStringVal(errorMsg));
trow.addToColumnValue(new TCell().setStringVal(getErrorMsg()));
// create time
trow.addToColumnValue(new TCell().setStringVal(TimeUtils.longToTimeString(getCreateTimeMs())));
trow.addToColumnValue(new TCell().setStringVal(null == getStartTimeMs() ? ""
Expand Down Expand Up @@ -275,7 +271,7 @@ private TRow getPendingTaskTVFInfo(String jobName) {
trow.addToColumnValue(new TCell().setStringVal(jobName));
trow.addToColumnValue(new TCell().setStringVal(getJobId() + LABEL_SPLITTER + getTaskId()));
trow.addToColumnValue(new TCell().setStringVal(getStatus().name()));
trow.addToColumnValue(new TCell().setStringVal(""));
trow.addToColumnValue(new TCell().setStringVal(getErrorMsg()));
trow.addToColumnValue(new TCell().setStringVal(TimeUtils.longToTimeString(getCreateTimeMs())));
trow.addToColumnValue(new TCell().setStringVal(null == getStartTimeMs() ? ""
: TimeUtils.longToTimeString(getStartTimeMs())));
Expand All @@ -287,4 +283,15 @@ private TRow getPendingTaskTVFInfo(String jobName) {
return trow;
}

private String getErrorMsg() {
// err msg
String errorMsg = "";
if (failMsg != null) {
errorMsg = failMsg.getMsg();
}
if (StringUtils.isNotBlank(getErrMsg())) {
errorMsg = getErrMsg();
}
return errorMsg;
}
}

0 comments on commit 4a858d8

Please sign in to comment.