From bafa357ce5763f7bd3cba40a723cd2ab4aa69855 Mon Sep 17 00:00:00 2001 From: aiwenmo <32723967+aiwenmo@users.noreply.github.com> Date: Mon, 9 Oct 2023 08:01:53 +0800 Subject: [PATCH] [Optimization][admin] Add job reconnecting status and optimize job monitor (#2360) * [Optimization][admin] Add job reconnecting status and optimize job monitor * spotless apply --------- Co-authored-by: wenmo <32723967+wenmo@users.noreply.github.com> --- .../dinky/job/handler/JobAlertHandler.java | 8 ++++--- .../dinky/job/handler/JobRefreshHandler.java | 21 +++++++++++++++++-- .../java/org/dinky/data/enums/JobStatus.java | 21 +++++++++++++++++++ dinky-flink/dinky-flink-1.16/pom.xml | 4 ++-- .../org/dinky/gateway/yarn/YarnGateway.java | 12 +++++------ .../FlinkDag/component/DagDataNode.tsx | 4 ++-- .../src/components/JobTags/StatusTag.tsx | 6 ++++++ dinky-web/src/pages/DevOps/constants.tsx | 1 + 8 files changed, 61 insertions(+), 16 deletions(-) diff --git a/dinky-admin/src/main/java/org/dinky/job/handler/JobAlertHandler.java b/dinky-admin/src/main/java/org/dinky/job/handler/JobAlertHandler.java index 29f3d5dc0a..4ae9b6ae59 100644 --- a/dinky-admin/src/main/java/org/dinky/job/handler/JobAlertHandler.java +++ b/dinky-admin/src/main/java/org/dinky/job/handler/JobAlertHandler.java @@ -150,9 +150,11 @@ public void check(JobInfoDetail jobInfoDetail) { AlertRuleOptions.JOB_ALERT_RULE_EXCEPTIONS_MSG, jobInfoDetail.getJobDataDto().getErrorMsg()); } else { - ruleFacts.put( - AlertRuleOptions.JOB_ALERT_RULE_EXCEPTIONS_MSG, - jobInfoDetail.getJobDataDto().getExceptions().getRootException()); + if (Asserts.isNotNull(jobInfoDetail.getJobDataDto().getExceptions().getRootException())) { + ruleFacts.put( + AlertRuleOptions.JOB_ALERT_RULE_EXCEPTIONS_MSG, + jobInfoDetail.getJobDataDto().getExceptions().getRootException()); + } } rulesEngine.fire(rules, ruleFacts); diff --git a/dinky-admin/src/main/java/org/dinky/job/handler/JobRefreshHandler.java b/dinky-admin/src/main/java/org/dinky/job/handler/JobRefreshHandler.java index dee8fdd1fb..9d422fd252 100644 --- a/dinky-admin/src/main/java/org/dinky/job/handler/JobRefreshHandler.java +++ b/dinky-admin/src/main/java/org/dinky/job/handler/JobRefreshHandler.java @@ -115,9 +115,26 @@ public static boolean refreshJob(JobInfoDetail jobInfoDetail, boolean needSave) } jobInstance.setUpdateTime(LocalDateTime.now()); - // Set to true if the job status has completed - // If the job status is Unknown and the status fails to be updated for 1 minute, set to true and discard the + // The transition status include failed and reconnecting ( Dinky custom ) + // The done status include failed and canceled and finished and unknown ( Dinky custom ) + // The task status of batch job which network unstable: run -> transition -> run -> transition -> done + // The task status of stream job which automatically restart after failure: run -> transition -> run -> + // transition -> run + // Set to true if the job status which is done has completed + // If the job status is transition and the status fails to be updated for 1 minute, set to true and discard the // update + + boolean isTransition = JobStatus.isTransition(jobInstance.getStatus()) + && (TimeUtil.localDateTimeToLong(jobInstance.getFinishTime()) > 0 + && Duration.between(jobInstance.getFinishTime(), LocalDateTime.now()) + .toMinutes() + < 1); + + if (isTransition) { + log.debug("Job is transition: {}->{}", jobInstance.getId(), jobInstance.getName()); + return false; + } + boolean isDone = (JobStatus.isDone(jobInstance.getStatus())) || (TimeUtil.localDateTimeToLong(jobInstance.getFinishTime()) > 0 && Duration.between(jobInstance.getFinishTime(), LocalDateTime.now()) diff --git a/dinky-common/src/main/java/org/dinky/data/enums/JobStatus.java b/dinky-common/src/main/java/org/dinky/data/enums/JobStatus.java index ea062204c8..7445d7d6c4 100644 --- a/dinky-common/src/main/java/org/dinky/data/enums/JobStatus.java +++ b/dinky-common/src/main/java/org/dinky/data/enums/JobStatus.java @@ -72,6 +72,11 @@ public enum JobStatus { /** The job is currently reconciling and waits for task execution report to recover state. */ RECONCILING("RECONCILING"), + /** + * The job is reconnecting. + */ + RECONNECTING("RECONNECTING"), + /** The job can't get any info. */ UNKNOWN("UNKNOWN"); @@ -97,6 +102,17 @@ public static boolean isDone(String value) { case FAILED: case CANCELED: case FINISHED: + case UNKNOWN: + return true; + default: + return false; + } + } + + public static boolean isTransition(String value) { + switch (get(value)) { + case FAILED: + case RECONNECTING: return true; default: return false; @@ -108,6 +124,7 @@ public boolean isDone() { case FAILED: case CANCELED: case FINISHED: + case UNKNOWN: return true; default: return false; @@ -117,4 +134,8 @@ public boolean isDone() { public static List getAllDoneStatus() { return Lists.newArrayList(FAILED, CANCELED, FINISHED, UNKNOWN); } + + public boolean equalVal(String value) { + return this.equals(get(value)); + } } diff --git a/dinky-flink/dinky-flink-1.16/pom.xml b/dinky-flink/dinky-flink-1.16/pom.xml index 20fa9ef8de..d2d1eb0409 100644 --- a/dinky-flink/dinky-flink-1.16/pom.xml +++ b/dinky-flink/dinky-flink-1.16/pom.xml @@ -22,11 +22,11 @@ - + org.apache.flink diff --git a/dinky-gateway/src/main/java/org/dinky/gateway/yarn/YarnGateway.java b/dinky-gateway/src/main/java/org/dinky/gateway/yarn/YarnGateway.java index 9b7c2db1a3..85ef3575cf 100644 --- a/dinky-gateway/src/main/java/org/dinky/gateway/yarn/YarnGateway.java +++ b/dinky-gateway/src/main/java/org/dinky/gateway/yarn/YarnGateway.java @@ -65,11 +65,6 @@ import cn.hutool.core.io.FileUtil; -/** - * YarnSubmiter - * - * @since 2021/10/29 - */ public abstract class YarnGateway extends AbstractGateway { public static final String HADOOP_CONFIG = "fs.hdfs.hadoopconf"; @@ -254,14 +249,17 @@ public JobStatus getJobStatusById(String id) { case KILLED: return JobStatus.CANCELED; case SUBMITTED: + case ACCEPTED: + case NEW: + case NEW_SAVING: return JobStatus.CREATED; default: - return JobStatus.INITIALIZING; + return JobStatus.UNKNOWN; } } catch (YarnException | IOException e) { logger.error(e.getMessage()); + return JobStatus.UNKNOWN; } - return JobStatus.UNKNOWN; } @Override diff --git a/dinky-web/src/components/FlinkDag/component/DagDataNode.tsx b/dinky-web/src/components/FlinkDag/component/DagDataNode.tsx index 21275807b9..51ed6a7fff 100644 --- a/dinky-web/src/components/FlinkDag/component/DagDataNode.tsx +++ b/dinky-web/src/components/FlinkDag/component/DagDataNode.tsx @@ -98,7 +98,7 @@ const DagDataNode = (props: any) => { {' '} {l('devops.baseinfo.busy')}: - {renderRatio(backpressure?backpressure.subtasks[0]?.busyRatio:0, false)} + {renderRatio((backpressure && backpressure.subtasks)?backpressure.subtasks[0]?.busyRatio:0, false)} @@ -113,7 +113,7 @@ const DagDataNode = (props: any) => { {l('devops.baseinfo.idle')}: - {renderRatio(backpressure?backpressure.subtasks[0]?.idleRatio:0, true)} + {renderRatio((backpressure && backpressure.subtasks)?backpressure.subtasks[0]?.idleRatio:0, true)} diff --git a/dinky-web/src/components/JobTags/StatusTag.tsx b/dinky-web/src/components/JobTags/StatusTag.tsx index 5e56b63597..a5debf5b72 100644 --- a/dinky-web/src/components/JobTags/StatusTag.tsx +++ b/dinky-web/src/components/JobTags/StatusTag.tsx @@ -76,6 +76,12 @@ const StatusTag = (props: StatusTagProps) => { color: 'default', text: 'CREATED' }; + case JOB_STATUS.RECONNECTING: + return { + icon: , + color: 'default', + text: 'RECONNECTING' + }; case JOB_STATUS.UNKNOWN: return { icon: , diff --git a/dinky-web/src/pages/DevOps/constants.tsx b/dinky-web/src/pages/DevOps/constants.tsx index a7475b4d34..fede6d8b6b 100644 --- a/dinky-web/src/pages/DevOps/constants.tsx +++ b/dinky-web/src/pages/DevOps/constants.tsx @@ -36,6 +36,7 @@ export enum JOB_STATUS { FAILING = 'FAILING', SUSPENDED = 'SUSPENDED', CANCELLING = 'CANCELLING', + RECONNECTING = 'RECONNECTING', UNKNOWN = 'UNKNOWN' }