diff --git a/dinky-admin/src/main/java/org/dinky/data/vo/SseDataVo.java b/dinky-admin/src/main/java/org/dinky/data/vo/SseDataVo.java index c940572acb..1b83cb65bd 100644 --- a/dinky-admin/src/main/java/org/dinky/data/vo/SseDataVo.java +++ b/dinky-admin/src/main/java/org/dinky/data/vo/SseDataVo.java @@ -19,15 +19,25 @@ package org.dinky.data.vo; -import lombok.AllArgsConstructor; -import lombok.Builder; +import org.dinky.ws.GlobalWebSocket; + import lombok.Data; @Data -@AllArgsConstructor -@Builder public class SseDataVo { private String sessionKey; private String topic; private Object data; + private GlobalWebSocket.RequestDTO.EventType type; + + public SseDataVo(String sessionKey, String topic, Object data) { + this.sessionKey = sessionKey; + this.topic = topic; + this.data = data; + } + + public SseDataVo(String sessionKey, GlobalWebSocket.RequestDTO.EventType type) { + this.sessionKey = sessionKey; + this.type = type; + } } diff --git a/dinky-admin/src/main/java/org/dinky/ws/GlobalWebSocket.java b/dinky-admin/src/main/java/org/dinky/ws/GlobalWebSocket.java index 3d742c2a42..2e6eb66030 100644 --- a/dinky-admin/src/main/java/org/dinky/ws/GlobalWebSocket.java +++ b/dinky-admin/src/main/java/org/dinky/ws/GlobalWebSocket.java @@ -78,6 +78,13 @@ public GlobalWebSocket() { public static class RequestDTO { private Map> topics; private String token; + private EventType type; + + public enum EventType { + SUBSCRIBE, + PING, + PONG + } } private static final Map TOPICS = new ConcurrentHashMap<>(); @@ -99,6 +106,13 @@ public void onMessage(String message, Session session) throws IOException { TOPICS.remove(session); return; } + + if (requestDTO.getType() == RequestDTO.EventType.PING) { + SseDataVo data = new SseDataVo(session.getId(), RequestDTO.EventType.PONG); + session.getBasicRemote().sendText(JsonUtils.toJsonString(data)); + return; + } + Map> topics = requestDTO.getTopics(); if (MapUtil.isNotEmpty(topics)) { TOPICS.put(session, requestDTO); diff --git a/dinky-web/src/models/UseWebSocketModel.tsx b/dinky-web/src/models/UseWebSocketModel.tsx index 86886e1c42..edc8465833 100644 --- a/dinky-web/src/models/UseWebSocketModel.tsx +++ b/dinky-web/src/models/UseWebSocketModel.tsx @@ -25,6 +25,7 @@ import { TOKEN_KEY } from '@/services/constants'; export type SseData = { topic: string; data: Record; + type: string; }; export enum Topic { @@ -48,6 +49,7 @@ export type WsState = { export default () => { const subscriberRef = useRef([]); + const lastPongTimeRef = useRef(new Date().getTime()); const protocol = window.location.protocol === 'https:' ? 'wss' : 'ws'; const wsUrl = `${protocol}://${window.location.hostname}:${window.location.port}/api/ws/global`; @@ -61,6 +63,7 @@ export default () => { } ws.current = new WebSocket(wsUrl); ws.current.onopen = () => { + lastPongTimeRef.current = new Date().getTime(); setWsState({ wsOnReady: true, wsUrl }); receiveMessage(); subscribe(); @@ -85,7 +88,7 @@ export default () => { reconnect(); } else if (ws.current.readyState === WebSocket.OPEN) { const token = JSON.parse(localStorage.getItem(TOKEN_KEY) ?? '{}')?.tokenValue; - ws.current.send(JSON.stringify({ token, topics })); + ws.current.send(JSON.stringify({ token, topics, type: 'SUBSCRIBE' })); } else { //TODO do someting } @@ -96,6 +99,7 @@ export default () => { ws.current.onmessage = (e) => { try { const data: SseData = JSON.parse(e.data); + lastPongTimeRef.current = new Date().getTime(); subscriberRef.current .filter((sub) => sub.topic === data.topic) .filter((sub) => !sub.params || sub.params.find((x) => data.data[x])) @@ -112,6 +116,14 @@ export default () => { setInterval(() => { if (!ws.current || ws.current.readyState != WebSocket.OPEN) { reconnect(); + } else { + const currentTime = new Date().getTime(); + if (currentTime - lastPongTimeRef.current > 15000) { + reconnect(); + } else if (currentTime - lastPongTimeRef.current > 5000) { + const token = JSON.parse(localStorage.getItem(TOKEN_KEY) ?? '{}')?.tokenValue; + ws.current.send(JSON.stringify({ token, type: 'PING' })); + } } }, 2000); }, []);