From 16835aa399ff43a44445154e3b871c249465bae2 Mon Sep 17 00:00:00 2001
From: zhu-mingye <934230207@qq.com>
Date: Tue, 10 Oct 2023 09:26:15 +0800
Subject: [PATCH] refactor lineage (#2369)
* support_lineage
* fix dep
---
.../java/org/dinky/data/dto/JobDataDto.java | 6 +-
.../dinky/job/handler/JobRefreshHandler.java | 10 +-
dinky-admin/src/main/resources/db/db-h2.sql | 2 +-
.../org/dinky/alert/Rules/ExceptionRule.java | 13 +-
dinky-common/pom.xml | 5 +-
.../FlinkJobNodeBackPressure.java | 2 +-
.../data}/flink/config/ExecutionConfig.java | 2 +-
.../flink/config/FlinkJobConfigInfo.java | 2 +-
.../exceptions/FlinkJobExceptionsDetail.java | 2 +-
.../data}/flink/job/FlinkJobDetailInfo.java | 2 +-
.../dinky/data}/flink/job/FlinkJobPlan.java | 2 +-
.../data}/flink/job/FlinkJobPlanNode.java | 6 +-
.../flink/job/FlinkJobPlanNodeInput.java | 2 +-
.../dinky/data}/flink/job/FlinkJobVertex.java | 2 +-
.../watermark/FlinkJobNodeWaterMark.java | 2 +-
dinky-web/package.json | 2 +
.../components/CallBackButton/CircleBtn.tsx | 4 +-
.../FlinkDag/component/DagDataNode.tsx | 16 +-
dinky-web/src/components/FlinkDag/index.tsx | 435 ++++++++-------
.../src/components/LineageGraph/index.tsx | 353 ++++++++++++
dinky-web/src/global.less | 155 +++++
dinky-web/src/locales/en-US/pages.ts | 13 +-
dinky-web/src/locales/zh-CN/pages.ts | 13 +-
.../BottomContainer/Lineage/index.tsx | 79 +++
.../DataStudio/HeaderContainer/index.tsx | 21 +-
.../LeftContainer/Project/JobTree/index.tsx | 34 +-
.../LeftContainer/Project/index.tsx | 1 -
.../pages/DataStudio/LeftContainer/index.tsx | 18 +-
dinky-web/src/pages/DataStudio/route.tsx | 13 +-
.../CheckPointsTab/components/CkDesc.tsx | 51 +-
.../DevOps/JobDetail/JobLineage/index.tsx | 46 ++
.../JobDetail/JobOverview/JobOverview.tsx | 6 +-
.../src/pages/DevOps/JobDetail/index.tsx | 3 +-
dinky-web/src/pages/Metrics/Job/index.tsx | 528 +++++++++---------
dinky-web/src/services/endpoints.tsx | 4 +
dinky-web/src/types/DevOps/data.d.ts | 25 +
script/sql/dinky-mysql.sql | 2 +-
script/sql/dinky-pg.sql | 2 +-
.../1.0.0-SNAPSHOT_schema/mysql/dinky_dml.sql | 2 +
39 files changed, 1316 insertions(+), 570 deletions(-)
rename {dinky-admin/src/main/java/org/dinky/data/model => dinky-common/src/main/java/org/dinky/data}/flink/backpressure/FlinkJobNodeBackPressure.java (98%)
rename {dinky-admin/src/main/java/org/dinky/data/model => dinky-common/src/main/java/org/dinky/data}/flink/config/ExecutionConfig.java (98%)
rename {dinky-admin/src/main/java/org/dinky/data/model => dinky-common/src/main/java/org/dinky/data}/flink/config/FlinkJobConfigInfo.java (98%)
rename {dinky-admin/src/main/java/org/dinky/data/model => dinky-common/src/main/java/org/dinky/data}/flink/exceptions/FlinkJobExceptionsDetail.java (98%)
rename {dinky-admin/src/main/java/org/dinky/data/model => dinky-common/src/main/java/org/dinky/data}/flink/job/FlinkJobDetailInfo.java (99%)
rename {dinky-admin/src/main/java/org/dinky/data/model => dinky-common/src/main/java/org/dinky/data}/flink/job/FlinkJobPlan.java (97%)
rename {dinky-admin/src/main/java/org/dinky/data/model => dinky-common/src/main/java/org/dinky/data}/flink/job/FlinkJobPlanNode.java (95%)
rename {dinky-admin/src/main/java/org/dinky/data/model => dinky-common/src/main/java/org/dinky/data}/flink/job/FlinkJobPlanNodeInput.java (98%)
rename {dinky-admin/src/main/java/org/dinky/data/model => dinky-common/src/main/java/org/dinky/data}/flink/job/FlinkJobVertex.java (98%)
rename {dinky-admin/src/main/java/org/dinky/data/model => dinky-common/src/main/java/org/dinky/data}/flink/watermark/FlinkJobNodeWaterMark.java (97%)
create mode 100644 dinky-web/src/components/LineageGraph/index.tsx
create mode 100644 dinky-web/src/pages/DataStudio/BottomContainer/Lineage/index.tsx
create mode 100644 dinky-web/src/pages/DevOps/JobDetail/JobLineage/index.tsx
diff --git a/dinky-admin/src/main/java/org/dinky/data/dto/JobDataDto.java b/dinky-admin/src/main/java/org/dinky/data/dto/JobDataDto.java
index 83fa2b8636..c8781d61ca 100644
--- a/dinky-admin/src/main/java/org/dinky/data/dto/JobDataDto.java
+++ b/dinky-admin/src/main/java/org/dinky/data/dto/JobDataDto.java
@@ -19,10 +19,10 @@
package org.dinky.data.dto;
+import org.dinky.data.flink.config.FlinkJobConfigInfo;
+import org.dinky.data.flink.exceptions.FlinkJobExceptionsDetail;
+import org.dinky.data.flink.job.FlinkJobDetailInfo;
import org.dinky.data.model.JobHistory;
-import org.dinky.data.model.flink.config.FlinkJobConfigInfo;
-import org.dinky.data.model.flink.exceptions.FlinkJobExceptionsDetail;
-import org.dinky.data.model.flink.job.FlinkJobDetailInfo;
import org.dinky.utils.JsonUtils;
import java.time.LocalDateTime;
diff --git a/dinky-admin/src/main/java/org/dinky/job/handler/JobRefreshHandler.java b/dinky-admin/src/main/java/org/dinky/job/handler/JobRefreshHandler.java
index 9d422fd252..ea29e096c8 100644
--- a/dinky-admin/src/main/java/org/dinky/job/handler/JobRefreshHandler.java
+++ b/dinky-admin/src/main/java/org/dinky/job/handler/JobRefreshHandler.java
@@ -26,13 +26,13 @@
import org.dinky.data.dto.ClusterConfigurationDTO;
import org.dinky.data.dto.JobDataDto;
import org.dinky.data.enums.JobStatus;
+import org.dinky.data.flink.backpressure.FlinkJobNodeBackPressure;
+import org.dinky.data.flink.config.FlinkJobConfigInfo;
+import org.dinky.data.flink.exceptions.FlinkJobExceptionsDetail;
+import org.dinky.data.flink.job.FlinkJobDetailInfo;
+import org.dinky.data.flink.watermark.FlinkJobNodeWaterMark;
import org.dinky.data.model.JobInfoDetail;
import org.dinky.data.model.JobInstance;
-import org.dinky.data.model.flink.backpressure.FlinkJobNodeBackPressure;
-import org.dinky.data.model.flink.config.FlinkJobConfigInfo;
-import org.dinky.data.model.flink.exceptions.FlinkJobExceptionsDetail;
-import org.dinky.data.model.flink.job.FlinkJobDetailInfo;
-import org.dinky.data.model.flink.watermark.FlinkJobNodeWaterMark;
import org.dinky.gateway.Gateway;
import org.dinky.gateway.config.GatewayConfig;
import org.dinky.gateway.enums.GatewayType;
diff --git a/dinky-admin/src/main/resources/db/db-h2.sql b/dinky-admin/src/main/resources/db/db-h2.sql
index 03811fa533..7685d64b98 100644
--- a/dinky-admin/src/main/resources/db/db-h2.sql
+++ b/dinky-admin/src/main/resources/db/db-h2.sql
@@ -1338,7 +1338,7 @@ CREATE TABLE `dinky_task` (
`alert_group_id` bigint(20) null DEFAULT null COMMENT 'alert group id',
`config_json` text null COMMENT 'configuration json',
`note` varchar(255) null DEFAULT null COMMENT 'Job Note',
- `step` int(11) null DEFAULT null COMMENT 'Job lifecycle',
+ `step` int(11) null DEFAULT 1 COMMENT 'Job lifecycle',
`job_instance_id` bigint(20) null DEFAULT null COMMENT 'job instance id',
`enabled` tinyint(1) NOT null DEFAULT 1 COMMENT 'is enable',
`create_time` datetime(0) null DEFAULT null COMMENT 'create time',
diff --git a/dinky-alert/dinky-alert-base/src/main/java/org/dinky/alert/Rules/ExceptionRule.java b/dinky-alert/dinky-alert-base/src/main/java/org/dinky/alert/Rules/ExceptionRule.java
index c778f7123a..01ccc0606f 100644
--- a/dinky-alert/dinky-alert-base/src/main/java/org/dinky/alert/Rules/ExceptionRule.java
+++ b/dinky-alert/dinky-alert-base/src/main/java/org/dinky/alert/Rules/ExceptionRule.java
@@ -19,9 +19,10 @@
package org.dinky.alert.Rules;
+import org.dinky.data.flink.exceptions.FlinkJobExceptionsDetail;
+
import java.util.concurrent.TimeUnit;
-import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
@@ -43,20 +44,20 @@ public ExceptionRule() {
* @param exceptions The exceptions object containing relevant data.
* @return True if the operation should be executed, false otherwise.
*/
- public Boolean isException(Integer key, ObjectNode exceptions) {
+ public Boolean isException(Integer key, FlinkJobExceptionsDetail exceptions) {
// If the exception is the same as the previous one, it will not be reported again
- if (exceptions.get("timestamp") == null) {
+ if (exceptions.getTimestamp() == null) {
return false;
}
- long timestamp = exceptions.get("timestamp").asLong(0);
+ long timestamp = exceptions.getTimestamp();
Long hisTimeIfPresent = hisTime.getIfPresent(key);
if (hisTimeIfPresent != null && hisTimeIfPresent == timestamp) {
return false;
}
hisTime.put(key, timestamp);
- if (exceptions.has("root-exception")) {
- return !exceptions.get("root-exception").isNull();
+ if (exceptions.getRootException() != null) {
+ return !exceptions.getRootException().isEmpty();
} else {
return false;
}
diff --git a/dinky-common/pom.xml b/dinky-common/pom.xml
index 828ba1bf25..7630e97744 100644
--- a/dinky-common/pom.xml
+++ b/dinky-common/pom.xml
@@ -51,7 +51,10 @@
cn.hutool
hutool-all
-
+
+ com.alibaba.fastjson2
+ fastjson2
+
com.github.docker-java
docker-java-core
diff --git a/dinky-admin/src/main/java/org/dinky/data/model/flink/backpressure/FlinkJobNodeBackPressure.java b/dinky-common/src/main/java/org/dinky/data/flink/backpressure/FlinkJobNodeBackPressure.java
similarity index 98%
rename from dinky-admin/src/main/java/org/dinky/data/model/flink/backpressure/FlinkJobNodeBackPressure.java
rename to dinky-common/src/main/java/org/dinky/data/flink/backpressure/FlinkJobNodeBackPressure.java
index de8c631700..14b8db13c0 100644
--- a/dinky-admin/src/main/java/org/dinky/data/model/flink/backpressure/FlinkJobNodeBackPressure.java
+++ b/dinky-common/src/main/java/org/dinky/data/flink/backpressure/FlinkJobNodeBackPressure.java
@@ -17,7 +17,7 @@
*
*/
-package org.dinky.data.model.flink.backpressure;
+package org.dinky.data.flink.backpressure;
import java.io.Serializable;
import java.util.List;
diff --git a/dinky-admin/src/main/java/org/dinky/data/model/flink/config/ExecutionConfig.java b/dinky-common/src/main/java/org/dinky/data/flink/config/ExecutionConfig.java
similarity index 98%
rename from dinky-admin/src/main/java/org/dinky/data/model/flink/config/ExecutionConfig.java
rename to dinky-common/src/main/java/org/dinky/data/flink/config/ExecutionConfig.java
index 3de8314956..9a10677855 100644
--- a/dinky-admin/src/main/java/org/dinky/data/model/flink/config/ExecutionConfig.java
+++ b/dinky-common/src/main/java/org/dinky/data/flink/config/ExecutionConfig.java
@@ -17,7 +17,7 @@
*
*/
-package org.dinky.data.model.flink.config;
+package org.dinky.data.flink.config;
import java.io.Serializable;
diff --git a/dinky-admin/src/main/java/org/dinky/data/model/flink/config/FlinkJobConfigInfo.java b/dinky-common/src/main/java/org/dinky/data/flink/config/FlinkJobConfigInfo.java
similarity index 98%
rename from dinky-admin/src/main/java/org/dinky/data/model/flink/config/FlinkJobConfigInfo.java
rename to dinky-common/src/main/java/org/dinky/data/flink/config/FlinkJobConfigInfo.java
index 0c4c0452e2..2f06f9841e 100644
--- a/dinky-admin/src/main/java/org/dinky/data/model/flink/config/FlinkJobConfigInfo.java
+++ b/dinky-common/src/main/java/org/dinky/data/flink/config/FlinkJobConfigInfo.java
@@ -17,7 +17,7 @@
*
*/
-package org.dinky.data.model.flink.config;
+package org.dinky.data.flink.config;
import java.io.Serializable;
diff --git a/dinky-admin/src/main/java/org/dinky/data/model/flink/exceptions/FlinkJobExceptionsDetail.java b/dinky-common/src/main/java/org/dinky/data/flink/exceptions/FlinkJobExceptionsDetail.java
similarity index 98%
rename from dinky-admin/src/main/java/org/dinky/data/model/flink/exceptions/FlinkJobExceptionsDetail.java
rename to dinky-common/src/main/java/org/dinky/data/flink/exceptions/FlinkJobExceptionsDetail.java
index bd06ae2eee..8cacb145f1 100644
--- a/dinky-admin/src/main/java/org/dinky/data/model/flink/exceptions/FlinkJobExceptionsDetail.java
+++ b/dinky-common/src/main/java/org/dinky/data/flink/exceptions/FlinkJobExceptionsDetail.java
@@ -17,7 +17,7 @@
*
*/
-package org.dinky.data.model.flink.exceptions;
+package org.dinky.data.flink.exceptions;
import java.io.Serializable;
import java.util.List;
diff --git a/dinky-admin/src/main/java/org/dinky/data/model/flink/job/FlinkJobDetailInfo.java b/dinky-common/src/main/java/org/dinky/data/flink/job/FlinkJobDetailInfo.java
similarity index 99%
rename from dinky-admin/src/main/java/org/dinky/data/model/flink/job/FlinkJobDetailInfo.java
rename to dinky-common/src/main/java/org/dinky/data/flink/job/FlinkJobDetailInfo.java
index 24299a6fd5..a4e80e9f89 100644
--- a/dinky-admin/src/main/java/org/dinky/data/model/flink/job/FlinkJobDetailInfo.java
+++ b/dinky-common/src/main/java/org/dinky/data/flink/job/FlinkJobDetailInfo.java
@@ -17,7 +17,7 @@
*
*/
-package org.dinky.data.model.flink.job;
+package org.dinky.data.flink.job;
import java.io.Serializable;
import java.util.List;
diff --git a/dinky-admin/src/main/java/org/dinky/data/model/flink/job/FlinkJobPlan.java b/dinky-common/src/main/java/org/dinky/data/flink/job/FlinkJobPlan.java
similarity index 97%
rename from dinky-admin/src/main/java/org/dinky/data/model/flink/job/FlinkJobPlan.java
rename to dinky-common/src/main/java/org/dinky/data/flink/job/FlinkJobPlan.java
index 14dced6534..3d4bcd0828 100644
--- a/dinky-admin/src/main/java/org/dinky/data/model/flink/job/FlinkJobPlan.java
+++ b/dinky-common/src/main/java/org/dinky/data/flink/job/FlinkJobPlan.java
@@ -17,7 +17,7 @@
*
*/
-package org.dinky.data.model.flink.job;
+package org.dinky.data.flink.job;
import java.io.Serializable;
import java.util.List;
diff --git a/dinky-admin/src/main/java/org/dinky/data/model/flink/job/FlinkJobPlanNode.java b/dinky-common/src/main/java/org/dinky/data/flink/job/FlinkJobPlanNode.java
similarity index 95%
rename from dinky-admin/src/main/java/org/dinky/data/model/flink/job/FlinkJobPlanNode.java
rename to dinky-common/src/main/java/org/dinky/data/flink/job/FlinkJobPlanNode.java
index e64ea860f9..994e06c7ea 100644
--- a/dinky-admin/src/main/java/org/dinky/data/model/flink/job/FlinkJobPlanNode.java
+++ b/dinky-common/src/main/java/org/dinky/data/flink/job/FlinkJobPlanNode.java
@@ -17,10 +17,10 @@
*
*/
-package org.dinky.data.model.flink.job;
+package org.dinky.data.flink.job;
-import org.dinky.data.model.flink.backpressure.FlinkJobNodeBackPressure;
-import org.dinky.data.model.flink.watermark.FlinkJobNodeWaterMark;
+import org.dinky.data.flink.backpressure.FlinkJobNodeBackPressure;
+import org.dinky.data.flink.watermark.FlinkJobNodeWaterMark;
import java.io.Serializable;
import java.util.List;
diff --git a/dinky-admin/src/main/java/org/dinky/data/model/flink/job/FlinkJobPlanNodeInput.java b/dinky-common/src/main/java/org/dinky/data/flink/job/FlinkJobPlanNodeInput.java
similarity index 98%
rename from dinky-admin/src/main/java/org/dinky/data/model/flink/job/FlinkJobPlanNodeInput.java
rename to dinky-common/src/main/java/org/dinky/data/flink/job/FlinkJobPlanNodeInput.java
index 4d65550c9c..71244a0202 100644
--- a/dinky-admin/src/main/java/org/dinky/data/model/flink/job/FlinkJobPlanNodeInput.java
+++ b/dinky-common/src/main/java/org/dinky/data/flink/job/FlinkJobPlanNodeInput.java
@@ -17,7 +17,7 @@
*
*/
-package org.dinky.data.model.flink.job;
+package org.dinky.data.flink.job;
import java.io.Serializable;
diff --git a/dinky-admin/src/main/java/org/dinky/data/model/flink/job/FlinkJobVertex.java b/dinky-common/src/main/java/org/dinky/data/flink/job/FlinkJobVertex.java
similarity index 98%
rename from dinky-admin/src/main/java/org/dinky/data/model/flink/job/FlinkJobVertex.java
rename to dinky-common/src/main/java/org/dinky/data/flink/job/FlinkJobVertex.java
index 4115059eb8..c9469674a2 100644
--- a/dinky-admin/src/main/java/org/dinky/data/model/flink/job/FlinkJobVertex.java
+++ b/dinky-common/src/main/java/org/dinky/data/flink/job/FlinkJobVertex.java
@@ -17,7 +17,7 @@
*
*/
-package org.dinky.data.model.flink.job;
+package org.dinky.data.flink.job;
import java.io.Serializable;
import java.util.Map;
diff --git a/dinky-admin/src/main/java/org/dinky/data/model/flink/watermark/FlinkJobNodeWaterMark.java b/dinky-common/src/main/java/org/dinky/data/flink/watermark/FlinkJobNodeWaterMark.java
similarity index 97%
rename from dinky-admin/src/main/java/org/dinky/data/model/flink/watermark/FlinkJobNodeWaterMark.java
rename to dinky-common/src/main/java/org/dinky/data/flink/watermark/FlinkJobNodeWaterMark.java
index a9a7b6bf37..be03e7c263 100644
--- a/dinky-admin/src/main/java/org/dinky/data/model/flink/watermark/FlinkJobNodeWaterMark.java
+++ b/dinky-common/src/main/java/org/dinky/data/flink/watermark/FlinkJobNodeWaterMark.java
@@ -17,7 +17,7 @@
*
*/
-package org.dinky.data.model.flink.watermark;
+package org.dinky.data.flink.watermark;
import java.io.Serializable;
diff --git a/dinky-web/package.json b/dinky-web/package.json
index 9dcf39ecba..4115635c05 100644
--- a/dinky-web/package.json
+++ b/dinky-web/package.json
@@ -47,6 +47,7 @@
"@monaco-editor/react": "^4.5.2",
"@umijs/route-utils": "^4.0.1",
"antd": "^5.9.3",
+ "butterfly-dag": "^4.3.28",
"classnames": "^2.3.2",
"dayjs": "^1.11.10",
"js-cookie": "^3.0.5",
@@ -62,6 +63,7 @@
"react-countup": "^6.4.2",
"react-dom": "^18.0.0",
"react-helmet-async": "^1.3.0",
+ "react-lineage-dag": "^2.0.36",
"react-markdown": "^8.0.7",
"react-spring": "^9.7.3",
"react-use-cookie": "^1.4.0",
diff --git a/dinky-web/src/components/CallBackButton/CircleBtn.tsx b/dinky-web/src/components/CallBackButton/CircleBtn.tsx
index 3435ddca66..ab383b6a55 100644
--- a/dinky-web/src/components/CallBackButton/CircleBtn.tsx
+++ b/dinky-web/src/components/CallBackButton/CircleBtn.tsx
@@ -16,9 +16,9 @@
*
*/
+import { TabsItemType } from '@/pages/DataStudio/model';
import { Button } from 'antd';
import React from 'react';
-import {TabsItemType} from "@/pages/DataStudio/model";
export type CircleButtonProps = {
icon: React.ReactNode;
@@ -30,7 +30,7 @@ export type CircleButtonProps = {
export type CircleDataStudioButtonProps = {
icon: React.ReactNode;
loading?: boolean;
- onClick?: (panes:TabsItemType[],activeKey:string) => void;
+ onClick?: (panes: TabsItemType[], activeKey: string) => void;
title?: string;
key?: string;
};
diff --git a/dinky-web/src/components/FlinkDag/component/DagDataNode.tsx b/dinky-web/src/components/FlinkDag/component/DagDataNode.tsx
index 51ed6a7fff..3f181ec9da 100644
--- a/dinky-web/src/components/FlinkDag/component/DagDataNode.tsx
+++ b/dinky-web/src/components/FlinkDag/component/DagDataNode.tsx
@@ -98,13 +98,20 @@ const DagDataNode = (props: any) => {
{' '}
{l('devops.baseinfo.busy')}:
- {renderRatio((backpressure && backpressure.subtasks)?backpressure.subtasks[0]?.busyRatio:0, false)}
+ {renderRatio(
+ backpressure && backpressure.subtasks ? backpressure.subtasks[0]?.busyRatio : 0,
+ false
+ )}
{l('devops.baseinfo.backpressure')}:
-
+
@@ -113,7 +120,10 @@ const DagDataNode = (props: any) => {
{l('devops.baseinfo.idle')}:
- {renderRatio((backpressure && backpressure.subtasks)?backpressure.subtasks[0]?.idleRatio:0, true)}
+ {renderRatio(
+ backpressure && backpressure.subtasks ? backpressure.subtasks[0]?.idleRatio : 0,
+ true
+ )}
diff --git a/dinky-web/src/components/FlinkDag/index.tsx b/dinky-web/src/components/FlinkDag/index.tsx
index 0e48b2b92d..19e134834f 100644
--- a/dinky-web/src/components/FlinkDag/index.tsx
+++ b/dinky-web/src/components/FlinkDag/index.tsx
@@ -20,74 +20,72 @@
import DagDataNode from '@/components/FlinkDag/component/DagDataNode';
import DagPlanNode from '@/components/FlinkDag/component/DagPlanNode';
import {
- edgeConfig,
- graphConfig,
- layoutConfig,
- portConfig,
- zoomOptions
+ edgeConfig,
+ graphConfig,
+ layoutConfig,
+ portConfig,
+ zoomOptions
} from '@/components/FlinkDag/config';
-import {buildDag, regConnect, updateDag} from '@/components/FlinkDag/functions';
-import {Jobs} from '@/types/DevOps/data';
-import {DagreLayout} from '@antv/layout';
-import {Edge, Graph} from '@antv/x6';
-import {Selection} from '@antv/x6-plugin-selection';
-import {register} from '@antv/x6-react-shape';
-import {Drawer, Select, Slider, Table, Tabs, TabsProps, Typography} from 'antd';
-import {useEffect, useRef, useState} from 'react';
+import { buildDag, regConnect, updateDag } from '@/components/FlinkDag/functions';
+import { getData } from '@/services/api';
+import { API_CONSTANTS } from '@/services/endpoints';
+import { Jobs } from '@/types/DevOps/data';
+import { DagreLayout } from '@antv/layout';
+import { Edge, Graph } from '@antv/x6';
+import { Rectangle } from '@antv/x6-geometry';
+import { Selection } from '@antv/x6-plugin-selection';
+import { register } from '@antv/x6-react-shape';
+import { Drawer, Select, Slider, Table, Tabs, TabsProps, Typography } from 'antd';
+import { useEffect, useRef, useState } from 'react';
import './index.css';
-import {getData} from "@/services/api";
-import {API_CONSTANTS} from "@/services/endpoints";
-import {Rectangle} from "@antv/x6-geometry";
-import {Options as GraphOptions} from "@antv/x6/src/graph/options";
-import path from "path";
export type DagProps = {
- job: Jobs.Job;
- onlyPlan?: boolean;
- checkPoints?: any;
+ job: Jobs.Job;
+ onlyPlan?: boolean;
+ checkPoints?: any;
};
-const {Paragraph} = Typography;
+const { Paragraph } = Typography;
const FlinkDag = (props: DagProps) => {
- const container = useRef(null);
-
- const {job, onlyPlan = false, checkPoints = {}} = props;
-
- const [graph, setGraph] = useState();
- const [currentJob, setCurrentJob] = useState();
- const [currentSelect, setCurrentSelect] = useState();
- const [open, setOpen] = useState(false);
- const [zoom, setZoom] = useState(1);
- let originPosition = {
- zoom: 1
- };
-
- const handleClose = () => {
- setOpen(false);
- setCurrentSelect(undefined);
- graph?.zoomToFit(zoomOptions);
- graph?.centerContent();
- };
-
- const initListen = (graph: Graph) => {
- graph.on('node:selected', ({cell}) => {
- if (!onlyPlan) {
- setOpen(true);
- setZoom(oldValue => {
- originPosition = {zoom: oldValue}
- return 1;
- })
- graph.zoomTo(1)
- setCurrentSelect(cell);
- graph.positionPoint(Rectangle.create(cell.getBBox()).getLeftMiddle(), '10%', '50%');
- }
- });
+ const container = useRef(null);
+
+ const { job, onlyPlan = false, checkPoints = {} } = props;
+
+ const [graph, setGraph] = useState();
+ const [currentJob, setCurrentJob] = useState();
+ const [currentSelect, setCurrentSelect] = useState();
+ const [open, setOpen] = useState(false);
+ const [zoom, setZoom] = useState(1);
+ let originPosition = {
+ zoom: 1
+ };
- graph.on('node:unselected', ({cell}) => {
- setZoom(originPosition.zoom)
- handleClose();
+ const handleClose = () => {
+ setOpen(false);
+ setCurrentSelect(undefined);
+ graph?.zoomToFit(zoomOptions);
+ graph?.centerContent();
+ };
+
+ const initListen = (graph: Graph) => {
+ graph.on('node:selected', ({ cell }) => {
+ if (!onlyPlan) {
+ setOpen(true);
+ setZoom((oldValue) => {
+ originPosition = { zoom: oldValue };
+ return 1;
});
- };
+ graph.zoomTo(1);
+ setCurrentSelect(cell);
+ graph.positionPoint(Rectangle.create(cell.getBBox()).getLeftMiddle(), '10%', '50%');
+ }
+ });
+
+ graph.on('node:unselected', ({ cell }) => {
+ setZoom(originPosition.zoom);
+ handleClose();
+ });
+ };
const initGraph = (flinkData: any) => {
register({
@@ -98,156 +96,193 @@ const FlinkDag = (props: DagProps) => {
ports: portConfig
});
- Edge.config(edgeConfig);
- Graph.registerConnector('curveConnector', regConnect, true);
- Graph.registerEdge('data-processing-curve', Edge, true);
+ Edge.config(edgeConfig);
+ Graph.registerConnector('curveConnector', regConnect, true);
+ Graph.registerEdge('data-processing-curve', Edge, true);
- const graph: Graph = new Graph({
- // @ts-ignore
- container: container.current,
- ...graphConfig
- });
+ const graph: Graph = new Graph({
+ // @ts-ignore
+ container: container.current,
+ ...graphConfig
+ });
- graph.use(
- new Selection({
- enabled: true,
- multiple: false,
- rubberband: false,
- showNodeSelectionBox: true
- })
- );
-
- // Adaptive layout
- const model = new DagreLayout(layoutConfig).layout(flinkData);
- graph.fromJSON(model);
-
- // Automatically zoom to fit
- graph.zoomToFit(zoomOptions);
- graph.on('scale', ({sx}) => setZoom(sx));
- graph.centerContent();
- graph?.zoomTo(zoom)
- updateDag(job?.vertices, graph);
- initListen(graph);
- return graph;
- };
+ graph.use(
+ new Selection({
+ enabled: true,
+ multiple: false,
+ rubberband: false,
+ showNodeSelectionBox: true
+ })
+ );
- useEffect(() => {
- const flinkData = buildDag(job?.plan);
- // Clean up old data
- if (graph) {
- graph.clearCells();
- }
- setGraph(initGraph(flinkData));
- setZoom(1 / flinkData.nodes.length + 0.5)
- }, [currentJob]);
+ // Adaptive layout
+ const model = new DagreLayout(layoutConfig).layout(flinkData);
+ graph.fromJSON(model);
- useEffect(() => {
- updateDag(job?.vertices, graph);
- if (currentJob != job?.jid) {
- setCurrentJob(job?.jid);
- }
- }, [job]);
+ // Automatically zoom to fit
+ graph.zoomToFit(zoomOptions);
+ graph.on('scale', ({ sx }) => setZoom(sx));
+ graph.centerContent();
+ graph?.zoomTo(zoom);
+ updateDag(job?.vertices, graph);
+ initListen(graph);
+ return graph;
+ };
+
+ useEffect(() => {
+ const flinkData = buildDag(job?.plan);
+ // Clean up old data
+ if (graph) {
+ graph.clearCells();
+ }
+ setGraph(initGraph(flinkData));
+ setZoom(1 / flinkData.nodes.length + 0.5);
+ }, [currentJob]);
+
+ useEffect(() => {
+ updateDag(job?.vertices, graph);
+ if (currentJob != job?.jid) {
+ setCurrentJob(job?.jid);
+ }
+ }, [job]);
+ useEffect(() => {
+ graph?.zoomTo(zoom);
+ }, [zoom]);
+
+ const renderCheckpoint = (id: string) => {
+ const [selectPath, setSelectPath] = useState('');
+ const key = id + selectPath;
+ const [itemChildren, setItemChildren] = useState({ [key]: [] as TabsProps['items'] });
+ const checkpointArray = ((checkPoints.history ?? []) as any[])
+ .filter((x) => x.status === 'COMPLETED')
+ .map((x) => {
+ return { checkpointType: x.checkpoint_type, path: x.external_path, id: x.id };
+ });
useEffect(() => {
- graph?.zoomTo(zoom)
- }, [zoom]);
-
- const renderCheckpoint = (id: string) => {
- const [selectPath, setSelectPath] = useState('');
- const key = id + selectPath;
- const [itemChildren, setItemChildren] = useState({[key]: [] as TabsProps['items']});
- const checkpointArray = ((checkPoints.history?? []) as any[]).filter(x => x.status === "COMPLETED").map(x => {
- return {checkpointType: x.checkpoint_type, path: x.external_path, id: x.id}
- });
- useEffect(() => {
- if (selectPath && id) {
- if (!itemChildren[key]) {
- getData(API_CONSTANTS.READ_CHECKPOINT, {path: selectPath, operatorId: id}).then(res => {
- const genData = Object.keys(res.datas).map(x => {
- const datum = res.datas[x];
- return {
- key: x,
- label: x,
- children:
- {
- return {
- key: y,
- label: y,
- children: {
- return {
- title: z,
- dataIndex: z,
- key: z,
- render: (text) => {text},
- }
- })}/>
- }
- })
- } tabBarStyle={{marginBlock: 0}} tabBarGutter={10}/>
- }
- })
- setItemChildren({...itemChildren, [key]: genData})
- })
- }
+ if (selectPath && id) {
+ if (!itemChildren[key]) {
+ getData(API_CONSTANTS.READ_CHECKPOINT, { path: selectPath, operatorId: id }).then(
+ (res) => {
+ const genData = Object.keys(res.datas).map((x) => {
+ const datum = res.datas[x];
+ return {
+ key: x,
+ label: x,
+ children: (
+ {
+ return {
+ key: y,
+ label: y,
+ children: (
+ {
+ return {
+ title: z,
+ dataIndex: z,
+ key: z,
+ render: (text) => (
+
+ {text}
+
+ )
+ };
+ })}
+ />
+ )
+ };
+ })}
+ tabBarStyle={{ marginBlock: 0 }}
+ tabBarGutter={10}
+ />
+ )
+ };
+ });
+ setItemChildren({ ...itemChildren, [key]: genData });
}
- }, [selectPath, id])
-
- return <>
-