Skip to content

Commit

Permalink
[Optimize][Web]Optimize the display of Flink operator diagram in the …
Browse files Browse the repository at this point in the history
…operation and maintenance center (DataLinkDC#3813)

Co-authored-by: zackyoungh <[email protected]>
  • Loading branch information
zackyoungh and zackyoungh authored Sep 16, 2024
1 parent a981a59 commit 89b75c6
Show file tree
Hide file tree
Showing 3 changed files with 118 additions and 104 deletions.
52 changes: 32 additions & 20 deletions dinky-web/src/components/Flink/FlinkDag/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,9 @@
*/

import { DagreLayoutOptions } from '@antv/layout/lib/layout/types';
import { Platform } from '@antv/x6';
import { Options } from '@antv/x6/lib/graph/options';
import Connecting = Options.Connecting;
import Manual = Options.Manual;
import { Cell } from '@antv/x6/src/model/cell';

export const edgeConfig = {
markup: [
Expand Down Expand Up @@ -70,13 +68,42 @@ export const edgeConfig = {
}
};

export const portConfigTb = {
groups: {
in: {
position: 'top',
attrs: {
circle: {
magnet: false,
stroke: 'transparent',
strokeWidth: 1,
fill: 'transparent'
}
}
},

out: {
position: {
name: 'bottom'
},
attrs: {
circle: {
magnet: false,
stroke: 'transparent',
strokeWidth: 1,
fill: 'transparent'
}
}
}
}
};

export const portConfig = {
groups: {
in: {
position: 'left',
attrs: {
circle: {
r: 4,
magnet: false,
stroke: 'transparent',
strokeWidth: 1,
Expand All @@ -88,14 +115,10 @@ export const portConfig = {
out: {
position: {
name: 'right',
args: {
dx: 10
}
args: {}
},

attrs: {
circle: {
r: 4,
magnet: false,
stroke: 'transparent',
strokeWidth: 1,
Expand All @@ -107,22 +130,11 @@ export const portConfig = {
};

export const graphConnectConfig: Partial<Connecting> = {
connectionPoint: 'boundary',
snap: true,
allowBlank: false,
allowLoop: false,
highlight: true,
sourceAnchor: {
name: 'left',
args: {
dx: Platform.IS_SAFARI ? 4 : 8
}
},
targetAnchor: {
name: 'right',
args: {
dx: Platform.IS_SAFARI ? 4 : -8
}
},
// Connection pile verification
validateConnection({ sourceMagnet, targetMagnet }) {
// Connections can only be created from output link stubs
Expand Down
39 changes: 23 additions & 16 deletions dinky-web/src/components/Flink/FlinkDag/functions.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -109,24 +109,31 @@ export const updateDag = (job: Jobs.JobVertices[], graph?: Graph) => {
});
};

// export const s = (sourcePoint: any, targetPoint: any) => {
// const hgap = Math.abs(targetPoint.x - sourcePoint.x);
// const path = new Path();
// path.appendSegment(Path.createSegment('M', sourcePoint.x - 4, sourcePoint.y));
// path.appendSegment(Path.createSegment('L', sourcePoint.x + 12, sourcePoint.y));
// // 水平三阶贝塞尔曲线
// path.appendSegment(
// Path.createSegment(
// 'C',
// sourcePoint.x < targetPoint.x ? sourcePoint.x + hgap / 2 : sourcePoint.x - hgap / 2,
// sourcePoint.y,
// sourcePoint.x < targetPoint.x ? targetPoint.x - hgap / 2 : targetPoint.x + hgap / 2,
// targetPoint.y,
// targetPoint.x - 6,
// targetPoint.y
// )
// );
// path.appendSegment(Path.createSegment('L', targetPoint.x + 2, targetPoint.y));
//
// return path.serialize();
// };
export const regConnect = (sourcePoint: any, targetPoint: any) => {
const hgap = Math.abs(targetPoint.x - sourcePoint.x);
const path = new Path();
path.appendSegment(Path.createSegment('M', sourcePoint.x - 4, sourcePoint.y));
path.appendSegment(Path.createSegment('L', sourcePoint.x + 12, sourcePoint.y));
// 水平三阶贝塞尔曲线
path.appendSegment(
Path.createSegment(
'C',
sourcePoint.x < targetPoint.x ? sourcePoint.x + hgap / 2 : sourcePoint.x - hgap / 2,
sourcePoint.y,
sourcePoint.x < targetPoint.x ? targetPoint.x - hgap / 2 : targetPoint.x + hgap / 2,
targetPoint.y,
targetPoint.x - 6,
targetPoint.y
)
);
path.appendSegment(Path.createSegment('L', targetPoint.x + 2, targetPoint.y));
path.appendSegment(Path.createSegment('M', sourcePoint.x, sourcePoint.y));
path.appendSegment(Path.createSegment('L', targetPoint.x, targetPoint.y));

return path.serialize();
};
131 changes: 63 additions & 68 deletions dinky-web/src/components/Flink/FlinkDag/index.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import {
graphConfig,
layoutConfig,
portConfig,
portConfigTb,
zoomOptions
} from '@/components/Flink/FlinkDag/config';
import { buildDag, regConnect, updateDag } from '@/components/Flink/FlinkDag/functions';
Expand Down Expand Up @@ -146,6 +147,53 @@ const RenderCheckpoint = (id: string, checkPoints: any) => {
);
};

type CusEdge = {
source: { cell: string };
target: { cell: string };
};

function getMaxWidthAndDepth(edges: CusEdge[]): { maxWidth: number; maxDepth: number } {
const sourceCount: Record<string, number> = {};
const graph: Record<string, string[]> = {};

edges.forEach((edge) => {
const sourceCell = edge.source.cell;
const targetCell = edge.target.cell;

sourceCount[sourceCell] = (sourceCount[sourceCell] || 0) + 1;

if (!graph[sourceCell]) {
graph[sourceCell] = [];
}
graph[sourceCell].push(targetCell);
});

const maxSource = Object.keys(sourceCount).reduce((a, b) =>
sourceCount[a] > sourceCount[b] ? a : b
);
const maxWidth = sourceCount[maxSource];

const visited: Record<string, boolean> = {};
let maxDepth = 0;

function dfs(node: string, depth: number) {
if (visited[node]) return;
visited[node] = true;
maxDepth = Math.max(maxDepth, depth);
if (graph[node]) {
graph[node].forEach((neighbor) => dfs(neighbor, depth + 1));
}
}

Object.keys(graph).forEach((node) => {
if (!visited[node]) {
dfs(node, 1);
}
});

return { maxWidth, maxDepth };
}

const FlinkDag = (props: DagProps) => {
const container = useRef(null);

Expand Down Expand Up @@ -190,12 +238,24 @@ const FlinkDag = (props: DagProps) => {
};

const initGraph = (flinkData: any) => {
const { maxWidth, maxDepth } = getMaxWidthAndDepth(flinkData.edges);
let dir: string = 'LR';
let ranksep = 200;
let nodesep = 40;
let portConfigs: any = portConfig;

if (maxDepth < maxWidth) {
dir = 'TB';
ranksep = 200;
nodesep = 40;
portConfigs = portConfigTb;
}
register({
shape: 'data-processing-dag-node',
width: 240,
height: 140,
component: onlyPlan ? DagPlanNode : DagDataNode,
ports: portConfig
ports: portConfigs
});

Edge.config(edgeConfig);
Expand Down Expand Up @@ -227,78 +287,13 @@ const FlinkDag = (props: DagProps) => {
graph?.zoomTo(zoom);
updateDag(job?.vertices, graph);
initListen(graph);
layout(graph);
layout(graph, dir, ranksep, nodesep);
graph.centerContent();
return graph;
};
const getMaxListLength = (listOfLists: any[]) => {
return listOfLists.reduce((maxLength, currentList) => {
return Math.max(maxLength, currentList.length);
}, 0);
};
const calculateGraphMetrics = (graph: Graph) => {
const nodes = graph.getNodes();
const edges = graph.getEdges();

let maxWidth = getMaxListLength(Object.values(graph.model.outgoings));
let maxDepth = 0;

const nodeDepths = new Map<string, number>();

const calculateDepth = (node: Node, depth: number) => {
if (nodeDepths.has(node.id)) {
return nodeDepths.get(node.id)!;
}
nodeDepths.set(node.id, depth);

const outgoingEdges = edges.filter((edge) => edge.getSourceCellId() === node.id);
outgoingEdges.forEach((edge) => {
const targetNode = graph.getCellById(edge.getTargetCellId()) as Node;
calculateDepth(targetNode, depth + 1);
});

return depth;
};

nodes.forEach((node) => {
if (!nodeDepths.has(node.id)) {
calculateDepth(node, 1);
}
});

const calculatePathLength = (node: Node, length: number) => {
const outgoingEdges = edges.filter((edge) => edge.getSourceCellId() === node.id);
if (outgoingEdges.length === 0) {
maxDepth = Math.max(maxDepth, length);
return;
}
outgoingEdges.forEach((edge) => {
const targetNode = graph.getCellById(edge.getTargetCellId()) as Node;
calculatePathLength(targetNode, length + 1);
});
};

nodes.forEach((node) => {
calculatePathLength(node, 1);
});

return { maxWidth, maxDepth };
};

// 自动布局
function layout(graph: Graph) {
const { maxDepth, maxWidth } = calculateGraphMetrics(graph);
// 布局方向
let dir: string = 'LR';
let ranksep = 200;
let nodesep = 40;

if (maxDepth < maxWidth) {
dir = 'TB';
ranksep = 40;
nodesep = 40;
}

function layout(graph: Graph, dir: string, ranksep: number, nodesep: number) {
const nodes = graph.getNodes();
const edges = graph.getEdges();
const g = new dagre.graphlib.Graph();
Expand Down

0 comments on commit 89b75c6

Please sign in to comment.