Skip to content

Commit

Permalink
[admin] Add websocket PING PONG (DataLinkDC#3921)
Browse files Browse the repository at this point in the history
Co-authored-by: gaoyan1998 <[email protected]>
  • Loading branch information
gaoyan1998 and gaoyan1998 authored Nov 19, 2024
1 parent 92c1c01 commit 917ba7c
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 5 deletions.
18 changes: 14 additions & 4 deletions dinky-admin/src/main/java/org/dinky/data/vo/SseDataVo.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,25 @@

package org.dinky.data.vo;

import lombok.AllArgsConstructor;
import lombok.Builder;
import org.dinky.ws.GlobalWebSocket;

import lombok.Data;

@Data
@AllArgsConstructor
@Builder
public class SseDataVo {
private String sessionKey;
private String topic;
private Object data;
private GlobalWebSocket.RequestDTO.EventType type;

public SseDataVo(String sessionKey, String topic, Object data) {
this.sessionKey = sessionKey;
this.topic = topic;
this.data = data;
}

public SseDataVo(String sessionKey, GlobalWebSocket.RequestDTO.EventType type) {
this.sessionKey = sessionKey;
this.type = type;
}
}
14 changes: 14 additions & 0 deletions dinky-admin/src/main/java/org/dinky/ws/GlobalWebSocket.java
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,13 @@ public GlobalWebSocket() {
public static class RequestDTO {
private Map<GlobalWebSocketTopic, Set<String>> topics;
private String token;
private EventType type;

public enum EventType {
SUBSCRIBE,
PING,
PONG
}
}

private static final Map<Session, RequestDTO> TOPICS = new ConcurrentHashMap<>();
Expand All @@ -99,6 +106,13 @@ public void onMessage(String message, Session session) throws IOException {
TOPICS.remove(session);
return;
}

if (requestDTO.getType() == RequestDTO.EventType.PING) {
SseDataVo data = new SseDataVo(session.getId(), RequestDTO.EventType.PONG);
session.getBasicRemote().sendText(JsonUtils.toJsonString(data));
return;
}

Map<GlobalWebSocketTopic, Set<String>> topics = requestDTO.getTopics();
if (MapUtil.isNotEmpty(topics)) {
TOPICS.put(session, requestDTO);
Expand Down
14 changes: 13 additions & 1 deletion dinky-web/src/models/UseWebSocketModel.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import { TOKEN_KEY } from '@/services/constants';
export type SseData = {
topic: string;
data: Record<string, any>;
type: string;
};

export enum Topic {
Expand All @@ -48,6 +49,7 @@ export type WsState = {

export default () => {
const subscriberRef = useRef<SubscriberData[]>([]);
const lastPongTimeRef = useRef<number>(new Date().getTime());

const protocol = window.location.protocol === 'https:' ? 'wss' : 'ws';
const wsUrl = `${protocol}://${window.location.hostname}:${window.location.port}/api/ws/global`;
Expand All @@ -61,6 +63,7 @@ export default () => {
}
ws.current = new WebSocket(wsUrl);
ws.current.onopen = () => {
lastPongTimeRef.current = new Date().getTime();
setWsState({ wsOnReady: true, wsUrl });
receiveMessage();
subscribe();
Expand All @@ -85,7 +88,7 @@ export default () => {
reconnect();
} else if (ws.current.readyState === WebSocket.OPEN) {
const token = JSON.parse(localStorage.getItem(TOKEN_KEY) ?? '{}')?.tokenValue;
ws.current.send(JSON.stringify({ token, topics }));
ws.current.send(JSON.stringify({ token, topics, type: 'SUBSCRIBE' }));
} else {
//TODO do someting
}
Expand All @@ -96,6 +99,7 @@ export default () => {
ws.current.onmessage = (e) => {
try {
const data: SseData = JSON.parse(e.data);
lastPongTimeRef.current = new Date().getTime();
subscriberRef.current
.filter((sub) => sub.topic === data.topic)
.filter((sub) => !sub.params || sub.params.find((x) => data.data[x]))
Expand All @@ -112,6 +116,14 @@ export default () => {
setInterval(() => {
if (!ws.current || ws.current.readyState != WebSocket.OPEN) {
reconnect();
} else {
const currentTime = new Date().getTime();
if (currentTime - lastPongTimeRef.current > 15000) {
reconnect();
} else if (currentTime - lastPongTimeRef.current > 5000) {
const token = JSON.parse(localStorage.getItem(TOKEN_KEY) ?? '{}')?.tokenValue;
ws.current.send(JSON.stringify({ token, type: 'PING' }));
}
}
}, 2000);
}, []);
Expand Down

0 comments on commit 917ba7c

Please sign in to comment.