Skip to content

Commit

Permalink
[Bug][Web] Fix ws bug (#3881)
Browse files Browse the repository at this point in the history
  • Loading branch information
gaoyan1998 authored Oct 22, 2024
1 parent 0f52a56 commit d47403d
Show file tree
Hide file tree
Showing 5 changed files with 99 additions and 21 deletions.
33 changes: 33 additions & 0 deletions dinky-web/src/components/Modal/WsErrorShow/WsErrorShow.tsx
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

import { Button, Result } from 'antd';
import { WsState } from '@/models/UseWebSocketModel';
import * as React from 'react';
import { l } from '@/utils/intl';

const WsErrorShow = (props: { state: WsState; extra?: React.ReactNode }) => {
const { state, extra } = props;

return (
<Result status='error' title={l('global.ws.failed')} subTitle={state.wsUrl} extra={extra} />
);
};

export default WsErrorShow;
5 changes: 4 additions & 1 deletion dinky-web/src/locales/en-US/global.ts
Original file line number Diff line number Diff line change
Expand Up @@ -236,5 +236,8 @@ export default {
'global.job.status.failed-tip': 'Failed to submit to the cluster, unable to get the task name',

'global.operation.unable': 'Unable to operate',
'global.operation.able': 'Can operate'
'global.operation.able': 'Can operate',

//WS
'global.ws.failed': 'Failed to connect to WebSocket'
};
5 changes: 4 additions & 1 deletion dinky-web/src/locales/zh-CN/global.ts
Original file line number Diff line number Diff line change
Expand Up @@ -232,5 +232,8 @@ export default {
'global.job.status.failed-tip': '未成功提交到集群,无法获取任务名称/作业ID',

'global.operation.unable': '无法操作',
'global.operation.able': '可以操作'
'global.operation.able': '可以操作',

//WS
'global.ws.failed': '连接WebSocket失败'
};
55 changes: 37 additions & 18 deletions dinky-web/src/models/UseWebSocketModel.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import { useEffect, useRef, useState } from 'react';
import { ErrorMessage } from '@/utils/messages';
import { v4 as uuidv4 } from 'uuid';
import { TOKEN_KEY } from '@/services/constants';

export type SseData = {
topic: string;
data: Record<string, any>;
Expand All @@ -39,21 +40,33 @@ export type SubscriberData = {
params: string[];
call: (data: SseData) => void;
};

export type WsState = {
wsOnReady: boolean;
wsUrl: string;
};

export default () => {
const subscriberRef = useRef<SubscriberData[]>([]);
const wsUrl = `ws://${window.location.hostname}:${window.location.port}/api/ws/global`;

const ws = useRef<WebSocket>(new WebSocket(wsUrl));
const protocol = window.location.protocol === 'https:' ? 'wss' : 'ws';
const wsUrl = `${protocol}://${window.location.hostname}:${window.location.port}/api/ws/global`;
const [wsState, setWsState] = useState<WsState>({ wsOnReady: true, wsUrl });

const ws = useRef<WebSocket>();

const reconnect = () => {
if (ws.current.readyState === WebSocket.OPEN) {
if (ws.current && ws.current.readyState === WebSocket.CLOSED) {
ws.current.close();
}
ws.current = new WebSocket(wsUrl);
ws.current.onopen = () => {
setWsState({ wsOnReady: true, wsUrl });
receiveMessage();
subscribe();
};
ws.current.onerror = () => setWsState({ wsOnReady: false, wsUrl });
ws.current.onclose = () => setWsState({ wsOnReady: false, wsUrl });
};

const subscribe = () => {
Expand All @@ -68,31 +81,36 @@ export default () => {
topics[sub.topic] = [...topics[sub.topic]];
}
});
if (ws.current.readyState === WebSocket.CLOSED) {
if (!ws.current || ws.current.readyState === WebSocket.CLOSED) {
reconnect();
} else {
} else if (ws.current.readyState === WebSocket.OPEN) {
const token = JSON.parse(localStorage.getItem(TOKEN_KEY) ?? '{}')?.tokenValue;
ws.current.send(JSON.stringify({ token, topics }));
} else {
//TODO 这里要做些什么
}
};

const receiveMessage = () => {
ws.current.onmessage = (e) => {
try {
const data: SseData = JSON.parse(e.data);
subscriberRef.current
.filter((sub) => sub.topic === data.topic)
.filter((sub) => !sub.params || sub.params.find((x) => data.data[x]))
.forEach((sub) => sub.call(data));
} catch (e: any) {
ErrorMessage(e);
}
};
if (ws.current) {
ws.current.onmessage = (e) => {
try {
const data: SseData = JSON.parse(e.data);
subscriberRef.current
.filter((sub) => sub.topic === data.topic)
.filter((sub) => !sub.params || sub.params.find((x) => data.data[x]))
.forEach((sub) => sub.call(data));
} catch (e: any) {
ErrorMessage(e);
}
};
}
};

useEffect(() => {
receiveMessage();
setInterval(() => {
if (ws.current.readyState === WebSocket.CLOSED) {
if (!ws.current || ws.current.readyState != WebSocket.OPEN) {
reconnect();
}
}, 2000);
Expand All @@ -111,6 +129,7 @@ export default () => {

return {
subscribeTopic,
reconnect
reconnect,
wsState
};
};
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,12 @@ import { SplitPane } from '@andrewray/react-multi-split-pane';
import { Pane } from '@andrewray/react-multi-split-pane/dist/lib/Pane';
import { CheckOutlined, CloseCircleFilled, LoadingOutlined, XFilled } from '@ant-design/icons';
import { connect, useModel, useRequest } from '@umijs/max';
import { Empty, Space, Typography } from 'antd';
import { Button, Empty, Space, Typography } from 'antd';
import { DataNode } from 'antd/es/tree';
import DirectoryTree from 'antd/es/tree/DirectoryTree';
import { Key, useEffect, useRef, useState } from 'react';
import { SseData, Topic } from '@/models/UseWebSocketModel';
import WsErrorShow from '@/components/Modal/WsErrorShow/WsErrorShow';

const { Text } = Typography;

Expand Down Expand Up @@ -72,11 +73,17 @@ const ConsoleContent = (props: ConsoleProps) => {
const [processNode, setProcessNode] = useState<ProcessStep>();
const [expandedKeys, setExpandedKeys] = useState<Key[]>([]);

const [showCacheData, setShowCacheData] = useState<boolean>(false);

const process = `FlinkSubmit/${tab.params.taskId}`;
const { subscribeTopic } = useModel('UseWebSocketModel', (model: any) => ({
subscribeTopic: model?.subscribeTopic
}));

const { wsState } = useModel('UseWebSocketModel', (model: any) => ({
wsState: model?.wsState
}));

const onUpdate = (data: ProcessStep) => {
setProcessNode((prevState: any) => {
//如果key不一致代表重新提交了任务,清空旧状态
Expand Down Expand Up @@ -147,6 +154,19 @@ const ConsoleContent = (props: ConsoleProps) => {
setExpandedKeys(expandedKeys);
};

if (!wsState?.wsOnReady && !showCacheData) {
return (
<WsErrorShow
state={wsState}
extra={
<Button onClick={() => setShowCacheData(true)}>
{l('devops.jobinfo.recently.job.status')}
</Button>
}
/>
);
}

return (
<div style={{ height: props.height - VIEW.leftMargin }}>
<SplitPane
Expand Down

0 comments on commit d47403d

Please sign in to comment.