Skip to content

Commit

Permalink
[Optimization][admin] Add job reconnecting status and optimize job mo…
Browse files Browse the repository at this point in the history
…nitor (DataLinkDC#2360)

* [Optimization][admin] Add job reconnecting status and optimize job monitor

* spotless apply

---------

Co-authored-by: wenmo <[email protected]>
  • Loading branch information
aiwenmo and aiwenmo authored Oct 9, 2023
1 parent 5a8a568 commit bafa357
Show file tree
Hide file tree
Showing 8 changed files with 61 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -150,9 +150,11 @@ public void check(JobInfoDetail jobInfoDetail) {
AlertRuleOptions.JOB_ALERT_RULE_EXCEPTIONS_MSG,
jobInfoDetail.getJobDataDto().getErrorMsg());
} else {
ruleFacts.put(
AlertRuleOptions.JOB_ALERT_RULE_EXCEPTIONS_MSG,
jobInfoDetail.getJobDataDto().getExceptions().getRootException());
if (Asserts.isNotNull(jobInfoDetail.getJobDataDto().getExceptions().getRootException())) {
ruleFacts.put(
AlertRuleOptions.JOB_ALERT_RULE_EXCEPTIONS_MSG,
jobInfoDetail.getJobDataDto().getExceptions().getRootException());
}
}

rulesEngine.fire(rules, ruleFacts);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,9 +115,26 @@ public static boolean refreshJob(JobInfoDetail jobInfoDetail, boolean needSave)
}
jobInstance.setUpdateTime(LocalDateTime.now());

// Set to true if the job status has completed
// If the job status is Unknown and the status fails to be updated for 1 minute, set to true and discard the
// Transition statuses are FAILED and RECONNECTING (Dinky custom).
// Done statuses are FAILED, CANCELED, FINISHED and UNKNOWN (Dinky custom).
// Status flow of a batch job on an unstable network: run -> transition -> run -> transition -> done
// Status flow of a stream job that restarts automatically after a failure: run -> transition -> run ->
// transition -> run
// Set to true if the job has reached a done status.
// If the job is in a transition status and the status has not been updated for 1 minute, set to true and
// discard the update

boolean isTransition = JobStatus.isTransition(jobInstance.getStatus())
&& (TimeUtil.localDateTimeToLong(jobInstance.getFinishTime()) > 0
&& Duration.between(jobInstance.getFinishTime(), LocalDateTime.now())
.toMinutes()
< 1);

if (isTransition) {
log.debug("Job is transition: {}->{}", jobInstance.getId(), jobInstance.getName());
return false;
}

boolean isDone = (JobStatus.isDone(jobInstance.getStatus()))
|| (TimeUtil.localDateTimeToLong(jobInstance.getFinishTime()) > 0
&& Duration.between(jobInstance.getFinishTime(), LocalDateTime.now())
Expand Down
21 changes: 21 additions & 0 deletions dinky-common/src/main/java/org/dinky/data/enums/JobStatus.java
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,11 @@ public enum JobStatus {
/** The job is currently reconciling and waits for task execution report to recover state. */
RECONCILING("RECONCILING"),

/**
* The job is reconnecting.
*/
RECONNECTING("RECONNECTING"),

/** The job can't get any info. */
UNKNOWN("UNKNOWN");

Expand All @@ -97,6 +102,17 @@ public static boolean isDone(String value) {
case FAILED:
case CANCELED:
case FINISHED:
case UNKNOWN:
return true;
default:
return false;
}
}

public static boolean isTransition(String value) {
switch (get(value)) {
case FAILED:
case RECONNECTING:
return true;
default:
return false;
Expand All @@ -108,6 +124,7 @@ public boolean isDone() {
case FAILED:
case CANCELED:
case FINISHED:
case UNKNOWN:
return true;
default:
return false;
Expand All @@ -117,4 +134,8 @@ public boolean isDone() {
/**
 * Lists the statuses this enum treats as done: FAILED, CANCELED, FINISHED and UNKNOWN.
 *
 * @return a newly created list holding those four statuses, in that order
 */
public static List<JobStatus> getAllDoneStatus() {
    List<JobStatus> doneStatuses = Lists.newArrayList(FAILED, CANCELED, FINISHED, UNKNOWN);
    return doneStatuses;
}

/**
 * Checks whether the given status name resolves to this status.
 *
 * @param value the status name, looked up through {@code get(String)}
 * @return {@code true} if {@code get(value)} yields this constant, {@code false} otherwise
 */
public boolean equalVal(String value) {
    // Enum constants are singletons, so an identity check matches Enum#equals.
    final JobStatus resolved = get(value);
    return this == resolved;
}
}
4 changes: 2 additions & 2 deletions dinky-flink/dinky-flink-1.16/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@

<dependencies>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-shaded-hadoop-3-uber -->
<dependency>
<!--<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-hadoop-3-uber</artifactId>
<version>3.1.1.7.2.9.0-173-9.0</version>
</dependency>
</dependency>-->

<dependency>
<groupId>org.apache.flink</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,6 @@

import cn.hutool.core.io.FileUtil;

/**
* YarnSubmiter
*
* @since 2021/10/29
*/
public abstract class YarnGateway extends AbstractGateway {

public static final String HADOOP_CONFIG = "fs.hdfs.hadoopconf";
Expand Down Expand Up @@ -254,14 +249,17 @@ public JobStatus getJobStatusById(String id) {
case KILLED:
return JobStatus.CANCELED;
case SUBMITTED:
case ACCEPTED:
case NEW:
case NEW_SAVING:
return JobStatus.CREATED;
default:
return JobStatus.INITIALIZING;
return JobStatus.UNKNOWN;
}
} catch (YarnException | IOException e) {
logger.error(e.getMessage());
return JobStatus.UNKNOWN;
}
return JobStatus.UNKNOWN;
}

@Override
Expand Down
4 changes: 2 additions & 2 deletions dinky-web/src/components/FlinkDag/component/DagDataNode.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ const DagDataNode = (props: any) => {
<Text style={{ display: 'inline-flex', alignItems: 'center' }} type='secondary'>
{' '}
{l('devops.baseinfo.busy')}:
{renderRatio(backpressure?backpressure.subtasks[0]?.busyRatio:0, false)}
{renderRatio((backpressure && backpressure.subtasks)?backpressure.subtasks[0]?.busyRatio:0, false)}
</Text>
</Col>
<Col flex='auto'>
Expand All @@ -113,7 +113,7 @@ const DagDataNode = (props: any) => {
<Col flex='35%'>
<Text style={{ display: 'inline-flex', alignItems: 'center' }} type='secondary'>
{l('devops.baseinfo.idle')}:
{renderRatio(backpressure?backpressure.subtasks[0]?.idleRatio:0, true)}
{renderRatio((backpressure && backpressure.subtasks)?backpressure.subtasks[0]?.idleRatio:0, true)}
</Text>
</Col>
<Col flex='auto'>
Expand Down
6 changes: 6 additions & 0 deletions dinky-web/src/components/JobTags/StatusTag.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,12 @@ const StatusTag = (props: StatusTagProps) => {
color: 'default',
text: 'CREATED'
};
case JOB_STATUS.RECONNECTING:
return {
icon: <ClockCircleOutlined />,
color: 'default',
text: 'RECONNECTING'
};
case JOB_STATUS.UNKNOWN:
return {
icon: <QuestionCircleOutlined />,
Expand Down
1 change: 1 addition & 0 deletions dinky-web/src/pages/DevOps/constants.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ export enum JOB_STATUS {
FAILING = 'FAILING',
SUSPENDED = 'SUSPENDED',
CANCELLING = 'CANCELLING',
RECONNECTING = 'RECONNECTING',
UNKNOWN = 'UNKNOWN'
}

Expand Down

0 comments on commit bafa357

Please sign in to comment.