Skip to content

Commit

Permalink
[Feature][Web] Add Flink Job Sql Dag (DataLinkDC#2340)
Browse files Browse the repository at this point in the history
* delete unuse log

* refactor tag

* add job Graph Dag

* change getJob to refeshJob
  • Loading branch information
gaoyan1998 authored Sep 25, 2023
1 parent 6575066 commit 845e4c7
Show file tree
Hide file tree
Showing 20 changed files with 271 additions and 352 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,25 +18,30 @@
*/

import StatusTag from '@/components/JobTags/StatusTag';
import { Jobs } from '@/types/DevOps/data';
import { Card, Col, Row, Tag, Typography } from 'antd';

const { Text } = Typography;
const { Text, Paragraph } = Typography;

const DagNode = (props: any) => {
const DagDataNode = (props: any) => {
const { node } = props;
const vertices: Jobs.JobVertices = node?.getData();
const data: any = node?.getData();

return (
<Card
style={{ width: '250px' }}
style={{ width: '250px', padding: 0 }}
bordered={false}
size={'small'}
type={'inner'}
hoverable={true}
title={vertices.name}
extra={<Text keyboard>{vertices.parallelism}</Text>}
title={data.name}
extra={<Text keyboard>{data.parallelism}</Text>}
>
<Paragraph ellipsis={{ tooltip: data.description }}>
<blockquote style={{ margin: 0 }}>
<Text type='secondary'>{data.description}</Text>
</blockquote>
</Paragraph>

<Row>
<Col span={15}>
<Text type='secondary'>BackPressure:</Text>
Expand All @@ -51,7 +56,7 @@ const DagNode = (props: any) => {
<Row>
<Col span={15}>
<Text type='secondary'>Status:</Text>
<StatusTag status={vertices.status} bordered={false} animation={false} />
<StatusTag status={data.status} bordered={false} animation={false} />
</Col>
<Col span={8}>
<Text type='secondary'>Idle:50%</Text>
Expand All @@ -60,4 +65,4 @@ const DagNode = (props: any) => {
</Card>
);
};
export default DagNode;
export default DagDataNode;
46 changes: 46 additions & 0 deletions dinky-web/src/components/FlinkDag/component/DagPlanNode.tsx
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

import { Card, Typography } from 'antd';

const { Text, Paragraph } = Typography;

const DagPlanNode = (props: any) => {
const { node } = props;
const data: any = node?.getData();

return (
<Card
style={{ width: '250px', padding: 0 }}
bordered={false}
size={'small'}
type={'inner'}
hoverable={true}
title={data.description}
extra={<Text keyboard>{data.parallelism}</Text>}
>
<Paragraph ellipsis={{ tooltip: data.description, rows: 3 }}>
<blockquote>
<Text>{data.description}</Text>
</blockquote>
</Paragraph>
</Card>
);
};
export default DagPlanNode;
111 changes: 43 additions & 68 deletions dinky-web/src/components/FlinkDag/functions.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -17,94 +17,69 @@
*
*/

import { JOB_STATUS } from '@/pages/DevOps/constants';
import { Jobs } from '@/types/DevOps/data';
import { Graph, Path } from '@antv/x6';

export const buildEdge = (job: Jobs.Job) => {
export const buildDag = (job: Jobs.JobPlan) => {
const edges: any = [];
const nodes: any = [];

for (let node of job.plan.nodes) {
if (node.inputs) {
const sources = node.inputs;
for (let i = 0; i < sources.length; i++) {
const plan_node = sources[i];
edges.push({
attrs: {
line: {
strokeDasharray: '5',
stroke: '#3471F9',
style: {
animation: 'running-line 30s infinite linear'
}
},
label: {
text: plan_node.ship_strategy
}
},
label: plan_node.ship_strategy,
id: `${plan_node.id}-${i}`,
shape: 'data-processing-curve',
zIndex: -1,
source: {
cell: `${plan_node.id}`,
port: `${plan_node.id}-out`
},
target: {
cell: `${node.id}`,
port: `${node.id}-in`
},
data: node
});
}
}
}
return edges;
};
if (!job) return { nodes: nodes, edges: edges };

export const buildNode = (job: Jobs.Job) => {
const nodes: any = {};

for (let vertice of job.vertices) {
nodes[vertice.id] = {
id: vertice.id,
job.nodes.forEach((node) => {
nodes.push({
id: node.id,
shape: 'data-processing-dag-node',
ports: [
{
id: `${vertice.id}-in`,
id: `${node.id}-in`,
group: 'in'
},
{
id: `${vertice.id}-out`,
id: `${node.id}-out`,
group: 'out'
}
],
data: vertice
};
}
return nodes;
};
data: node
});

node.inputs?.forEach((plan_node) => {
edges.push({
label: plan_node.ship_strategy,
id: `${node.id}-${plan_node.num}`,
shape: 'data-processing-curve',
zIndex: -1,
source: {
cell: `${plan_node.id}`,
port: `${plan_node.id}-out`
},
target: {
cell: `${node.id}`,
port: `${node.id}-in`
},
data: node
});
});
});

export const buildData = (job: Jobs.Job) => {
const nodes = Object.values(buildNode(job));
return { nodes: nodes, edges: buildEdge(job) };
return { nodes: nodes, edges: edges };
};

export const updateEdge = (job: Jobs.Job, graph?: Graph) => {
if (graph) {
const nodes = buildNode(job);
export const updateDag = (job: Jobs.JobVertices[], graph?: Graph) => {
if (!job || !graph) return;

graph.getCells().forEach((node) => {
const data = node.getData();
if (nodes[data.id]) {
console.log(nodes[data.id]);
node.setData(nodes[data.id].data);
if (job && graph) {
job.forEach((vertice) => {
const node = graph.getCellById(vertice.id);
if (node) {
node.setData({ ...node.getData(), ...vertice });
}
});

graph.getEdges().forEach((edge) => {
const node = edge.getSourceNode()?.getData();

if (node.status == 'RUNNING') {
const nodeData = edge.getSourceNode()?.getData();
if (nodeData.status == JOB_STATUS.RUNNING) {
edge.attr({ line: { stroke: '#3471F9' } });
edge.attr('line/strokeDasharray', 5);
edge.attr('line/strokeWidth', 2);
Expand All @@ -113,11 +88,11 @@ export const updateEdge = (job: Jobs.Job, graph?: Graph) => {
edge.attr('line/strokeDasharray', 0);
edge.attr('line/style/animation', '');
edge.attr('line/strokeWidth', 1);
if (node.status == 'FINISHED') {
if (nodeData.status == JOB_STATUS.FINISHED) {
edge.attr('line/stroke', '#52c41a');
} else if (node.status == 'CANCELED') {
} else if (nodeData.status == JOB_STATUS.CANCELED) {
edge.attr('line/stroke', '#ffe7ba');
} else if (node.status == 'FAILED') {
} else if (nodeData.status == JOB_STATUS.FAILED) {
edge.attr('line/stroke', '#ff4d4f');
} else {
edge.attr('line/stroke', '#bfbfbf');
Expand Down
23 changes: 16 additions & 7 deletions dinky-web/src/components/FlinkDag/index.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,27 @@
*
*/

import DagNode from '@/components/FlinkDag/component/DagNode';
import DagDataNode from '@/components/FlinkDag/component/DagDataNode';
import DagPlanNode from '@/components/FlinkDag/component/DagPlanNode';
import { edgeConfig, graphConfig, layoutConfig, portConfig } from '@/components/FlinkDag/config';
import { buildData, regConnect, updateEdge } from '@/components/FlinkDag/functions';
import { buildDag, regConnect, updateDag } from '@/components/FlinkDag/functions';
import { Jobs } from '@/types/DevOps/data';
import { DagreLayout } from '@antv/layout';
import { Edge, Graph } from '@antv/x6';
import { register } from '@antv/x6-react-shape';
import { useEffect, useRef, useState } from 'react';
import './index.css';

const FlinkDag = (props: { jobDetail: Jobs.JobInfoDetail }) => {
export type DagProps = {
job: Jobs.Job;
onlyPlan?: boolean;
};

const FlinkDag = (props: DagProps) => {
const container = useRef(null);
const job = props.jobDetail.jobDataDto.job;

const { job, onlyPlan = false } = props;

const [graph, setGraph] = useState<Graph>();
const [curentJob, setCurentJob] = useState<string>();

Expand All @@ -38,7 +46,7 @@ const FlinkDag = (props: { jobDetail: Jobs.JobInfoDetail }) => {
shape: 'data-processing-dag-node',
width: 212,
height: 48,
component: DagNode,
component: onlyPlan ? DagPlanNode : DagDataNode,
ports: portConfig
});

Expand Down Expand Up @@ -67,11 +75,12 @@ const FlinkDag = (props: { jobDetail: Jobs.JobInfoDetail }) => {
};
graph.zoomToFit(zoomOptions);
graph.centerContent();
updateDag(job.vertices, graph);
return graph;
};

useEffect(() => {
const flinkData = buildData(job);
const flinkData = buildDag(job.plan);
// Clean up old data
if (graph) {
graph.clearCells();
Expand All @@ -80,7 +89,7 @@ const FlinkDag = (props: { jobDetail: Jobs.JobInfoDetail }) => {
}, [curentJob]);

useEffect(() => {
updateEdge(job, graph);
updateDag(job.vertices, graph);
if (curentJob != job.jid) {
setCurentJob(job.jid);
}
Expand Down
77 changes: 77 additions & 0 deletions dinky-web/src/components/JobTags/JobLifeCycleTag.tsx
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

import { StatusTagProps } from '@/components/JobTags/data';
import { JOB_LIFE_CYCLE } from '@/pages/DevOps/constants';
import { l } from '@/utils/intl';
import {
CameraOutlined,
CarryOutOutlined,
CloseCircleOutlined,
EditOutlined
} from '@ant-design/icons';
import { Tag } from 'antd';

/**
* Renders a tag for the job life cycle based on the provided step.
*
* @returns {JSX.Element} - The tag representing the job life cycle.
* @param props
*/
const JobLifeCycleTag = (props: StatusTagProps) => {
const { status, animation = true, bordered = true } = props;

const buildParam = () => {
switch (status) {
case JOB_LIFE_CYCLE.DEVELOP:
return {
icon: <EditOutlined />,
color: 'default',
text: l('global.table.lifecycle.dev')
};
case JOB_LIFE_CYCLE.RELEASE:
return {
icon: <CameraOutlined />,
color: 'green',
text: l('global.table.lifecycle.publish')
};

case JOB_LIFE_CYCLE.ONLINE:
return {
icon: <CarryOutOutlined />,
color: 'blue',
text: l('global.table.lifecycle.online')
};
default:
return {
icon: <CloseCircleOutlined />,
color: 'default',
text: status
};
}
};

const param = buildParam();
return (
<Tag icon={animation ? param.icon : undefined} color={param.color} bordered={bordered}>
{param.text}
</Tag>
);
};

export default JobLifeCycleTag;
Loading

0 comments on commit 845e4c7

Please sign in to comment.