From 8ab0e6deec814c50cda20219d802dc83747d4884 Mon Sep 17 00:00:00 2001
From: aiwenmo <32723967+aiwenmo@users.noreply.github.com>
Date: Sat, 14 Jan 2023 21:22:44 +0800
Subject: [PATCH] [Fix-1556] [core] Fix job submission succeeded but failed
(#1557)
Co-authored-by: wenmo <32723967+wenmo@users.noreply.github.com>
---
.../src/main/java/com/dlink/model/Table.java | 18 ++++----
.../src/main/java/com/dlink/job/Job.java | 20 +++++----
.../main/java/com/dlink/job/JobManager.java | 27 ++++++++----
pom.xml | 44 -------------------
4 files changed, 41 insertions(+), 68 deletions(-)
diff --git a/dlink-common/src/main/java/com/dlink/model/Table.java b/dlink-common/src/main/java/com/dlink/model/Table.java
index 0e55e2edfb..ab4f28bcd9 100644
--- a/dlink-common/src/main/java/com/dlink/model/Table.java
+++ b/dlink-common/src/main/java/com/dlink/model/Table.java
@@ -21,8 +21,6 @@
import com.dlink.assertion.Asserts;
import com.dlink.utils.SqlUtil;
-import lombok.Getter;
-import lombok.Setter;
import java.beans.Transient;
import java.io.Serializable;
@@ -30,7 +28,8 @@
import java.util.Date;
import java.util.List;
-
+import lombok.Getter;
+import lombok.Setter;
/**
* Table
@@ -63,7 +62,6 @@ public class Table implements Serializable, Comparable
, Cloneable {
*/
private List schemaTableNameList;
-
private List columns;
public Table() {
@@ -77,12 +75,16 @@ public Table(String name, String schema, List columns) {
@Transient
public String getSchemaTableName() {
- return Asserts.isNullString(replaceSchemaMiddleLine(schema)) ? name : replaceSchemaMiddleLine(schema) + "." + name;
+ return Asserts.isNullString(replaceSchemaMiddleLine(schema))
+ ? name
+ : replaceSchemaMiddleLine(schema) + "." + name;
}
@Transient
public String getSchemaTableNameWithUnderline() {
- return Asserts.isNullString(replaceSchemaMiddleLine(schema)) ? name : replaceSchemaMiddleLine(schema) + "_" + name;
+ return Asserts.isNullString(replaceSchemaMiddleLine(schema))
+ ? name
+ : replaceSchemaMiddleLine(schema) + "_" + name;
}
@Override
@@ -274,8 +276,8 @@ public Object clone() {
}
private String replaceSchemaMiddleLine(String schema) {
- if (schema.contains("-")){
- return schema.replaceAll("-","_");
+ if (schema.contains("-")) {
+ return schema.replaceAll("-", "_");
}
return schema;
}
diff --git a/dlink-core/src/main/java/com/dlink/job/Job.java b/dlink-core/src/main/java/com/dlink/job/Job.java
index c6a8d8d296..70af0435a0 100644
--- a/dlink-core/src/main/java/com/dlink/job/Job.java
+++ b/dlink-core/src/main/java/com/dlink/job/Job.java
@@ -39,6 +39,7 @@
@Getter
@Setter
public class Job {
+
private Integer id;
private Integer jobInstanceId;
private JobConfig jobConfig;
@@ -57,14 +58,11 @@ public class Job {
private List jids;
public enum JobStatus {
- INITIALIZE,
- RUNNING,
- SUCCESS,
- FAILED,
- CANCEL
+ INITIALIZE, RUNNING, SUCCESS, FAILED, CANCEL
}
- public Job(JobConfig jobConfig, GatewayType type, JobStatus status, String statement, ExecutorSetting executorSetting, Executor executor, boolean useGateway) {
+ public Job(JobConfig jobConfig, GatewayType type, JobStatus status, String statement,
+ ExecutorSetting executorSetting, Executor executor, boolean useGateway) {
this.jobConfig = jobConfig;
this.type = type;
this.status = status;
@@ -75,11 +73,17 @@ public Job(JobConfig jobConfig, GatewayType type, JobStatus status, String state
this.useGateway = useGateway;
}
- public static Job init(GatewayType type, JobConfig jobConfig, ExecutorSetting executorSetting, Executor executor, String statement, boolean useGateway) {
+ public static Job init(GatewayType type, JobConfig jobConfig, ExecutorSetting executorSetting, Executor executor,
+ String statement, boolean useGateway) {
return new Job(jobConfig, type, JobStatus.INITIALIZE, statement, executorSetting, executor, useGateway);
}
public JobResult getJobResult() {
- return new JobResult(id, jobInstanceId, jobConfig, jobManagerAddress, status, statement, jobId, error, result, startTime, endTime);
+ return new JobResult(id, jobInstanceId, jobConfig, jobManagerAddress, status, statement, jobId, error, result,
+ startTime, endTime);
+ }
+
+ public boolean isFailed() {
+ return status.equals(JobStatus.FAILED);
}
}
diff --git a/dlink-core/src/main/java/com/dlink/job/JobManager.java b/dlink-core/src/main/java/com/dlink/job/JobManager.java
index ab777cedec..e1568fa6c8 100644
--- a/dlink-core/src/main/java/com/dlink/job/JobManager.java
+++ b/dlink-core/src/main/java/com/dlink/job/JobManager.java
@@ -436,6 +436,12 @@ public JobResult executeSql(String statement) {
job.setJobId(gatewayResult.getAppId());
job.setJids(gatewayResult.getJids());
job.setJobManagerAddress(formatAddress(gatewayResult.getWebURL()));
+ if (gatewayResult.isSucess()) {
+ job.setStatus(Job.JobStatus.SUCCESS);
+ } else {
+ job.setStatus(Job.JobStatus.FAILED);
+ job.setError(gatewayResult.getError());
+ }
} else if (useStatementSet && !useGateway) {
List inserts = new ArrayList<>();
for (StatementParam item : jobParam.getTrans()) {
@@ -478,6 +484,12 @@ public JobResult executeSql(String statement) {
job.setJobId(gatewayResult.getAppId());
job.setJids(gatewayResult.getJids());
job.setJobManagerAddress(formatAddress(gatewayResult.getWebURL()));
+ if (gatewayResult.isSucess()) {
+ job.setStatus(Job.JobStatus.SUCCESS);
+ } else {
+ job.setStatus(Job.JobStatus.FAILED);
+ job.setError(gatewayResult.getError());
+ }
} else {
for (StatementParam item : jobParam.getTrans()) {
currentSql = item.getValue();
@@ -542,10 +554,9 @@ public JobResult executeSql(String statement) {
job.setJobId(gatewayResult.getAppId());
job.setJids(gatewayResult.getJids());
job.setJobManagerAddress(formatAddress(gatewayResult.getWebURL()));
-
- if (gatewayResult.isSucess()){
+ if (gatewayResult.isSucess()) {
job.setStatus(Job.JobStatus.SUCCESS);
- }else {
+ } else {
job.setStatus(Job.JobStatus.FAILED);
job.setError(gatewayResult.getError());
}
@@ -573,13 +584,13 @@ public JobResult executeSql(String statement) {
.getResult(null);
job.setResult(result);
}
- job.setStatus(Job.JobStatus.SUCCESS);
}
}
job.setEndTime(LocalDateTime.now());
- if (job.getStatus().name().equals(Job.JobStatus.FAILED.name())){
+ if (job.isFailed()) {
failed();
- }else {
+ } else {
+ job.setStatus(Job.JobStatus.SUCCESS);
success();
}
} catch (Exception e) {
@@ -740,10 +751,10 @@ public JobResult executeJar() {
job.setJobManagerAddress(formatAddress(gatewayResult.getWebURL()));
job.setEndTime(LocalDateTime.now());
- if (gatewayResult.isSucess()){
+ if (gatewayResult.isSucess()) {
job.setStatus(Job.JobStatus.SUCCESS);
success();
- }else {
+ } else {
job.setError(gatewayResult.getError());
job.setStatus(Job.JobStatus.FAILED);
failed();
diff --git a/pom.xml b/pom.xml
index 5d389acb59..88acc44a10 100644
--- a/pom.xml
+++ b/pom.xml
@@ -56,7 +56,6 @@
UTF-8
3.8.1
3.0.0-M5
- 3.1.2
2.12
2.12.10
@@ -583,38 +582,6 @@
maven-surefire-plugin
${maven-surefire-plugin.version}
-
-
- org.apache.maven.plugins
- maven-checkstyle-plugin
- ${maven-checkstyle-plugin.version}
-
- true
- UTF-8
- style/checkstyle.xml
- true
- true
-
- ${project.build.sourceDirectory}
-
- **\/generated-sources\/
-
-
-
- com.puppycrawl.tools
- checkstyle
- 8.45
-
-
-
-
-
- check
-
- compile
-
-
-
@@ -649,10 +616,6 @@
org.apache.maven.plugins
maven-surefire-plugin
-
- org.apache.maven.plugins
- maven-checkstyle-plugin
-
com.diffplug.spotless
spotless-maven-plugin
@@ -996,13 +959,6 @@
-
- org.apache.maven.plugins
- maven-checkstyle-plugin
-
- true
-
-
com.diffplug.spotless
spotless-maven-plugin