Skip to content

Commit

Permalink
feat: client add Queue
Browse files Browse the repository at this point in the history
  • Loading branch information
hxg2050 committed Mar 5, 2024
1 parent 009ec50 commit c5f7992
Show file tree
Hide file tree
Showing 5 changed files with 95 additions and 74 deletions.
Binary file modified bun.lockb
Binary file not shown.
4 changes: 3 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "feiyun",
"private": false,
"version": "0.2.1",
"version": "0.2.2",
"repository": "https://github.com/hxg2050/feiyun.git",
"license": "MIT",
"main": "./dist/index.cjs",
Expand Down Expand Up @@ -38,7 +38,9 @@
"dependencies": {
"eventemitter3": "^5.0.1",
"feiyun-handler": "^0.0.6",
"queue": "^7.0.0",
"reflect-metadata": "^0.1.13",
"uWebSockets.js": "github:uNetworking/uWebSockets.js#v20.42.0",
"ws": "^8.14.1"
}
}
24 changes: 20 additions & 4 deletions src/client/FeiyunClient.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
import EventEmitter from 'eventemitter3'
import Queue from 'queue'

import WebSocket from './node';

export interface ClientConfig {
url: string
Expand All @@ -18,6 +21,9 @@ export class FeiyunClient {
private emitter: EventEmitter = new EventEmitter()
private anyKey = Symbol('anyKey')

private queue = new Queue({ results: [] });
public online: boolean = false;

constructor(private config: ClientConfig) {
this.ws = new WebSocket(this.config.url)
this.ws.addEventListener('open', () => {
Expand All @@ -40,9 +46,11 @@ export class FeiyunClient {
* 连接成功
*/
onOpen() {
console.log('连接服务器成功')
this.config.heart && this.ping()
this.emitter.emit('connect')
console.log('连接服务器成功');
this.config.heart && this.ping();
this.online = true;
this.emitter.emit('connect');
this.queue.start();
}

pingTimeout?: number | NodeJS.Timeout
Expand Down Expand Up @@ -74,6 +82,7 @@ export class FeiyunClient {

onClose() {
console.log('连接断开')
this.online = false;
clearTimeout(this.pingTimeout)
this.emitter.emit('disconnect')
}
Expand All @@ -82,7 +91,14 @@ export class FeiyunClient {
* 发送消息
*/
send(name: string, data?: any) {
this.ws.send(JSON.stringify([++this.index, name, data]))
++this.index;
if (this.online) {
this.ws.send(JSON.stringify([this.index, name, data]));
} else {
this.queue.push(() => {
this.ws.send(JSON.stringify([this.index, name, data]));
});
}
}

/**
Expand Down
3 changes: 3 additions & 0 deletions src/client/node.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
import { WebSocket as _WebSocket } from "ws";

export default globalThis.WebSocket || _WebSocket;
138 changes: 69 additions & 69 deletions src/server/ws.ts
Original file line number Diff line number Diff line change
@@ -1,79 +1,79 @@
import { ServerWebSocket } from "bun";
import { IWebsocketServer } from "./IWebSocketServer";
import { IWebsocketServer } from "./IWebsocketServer";

const ALL = '_:world';
export const createWebsocketServer = (options: {port?: number, timeout?: number} = {}): IWebsocketServer => {
export const createWebsocketServer = (options: { port?: number, timeout?: number } = {}): IWebsocketServer => {

const allTimeout: Map<ServerWebSocket, Timer> = new Map();
const ping = (ws: ServerWebSocket) => {
if (!options.timeout || options.timeout < 0) {
return
}
clearTimeout(allTimeout.get(ws));
allTimeout.set(ws, setTimeout(ws.close, options.timeout));
const allTimeout: Map<ServerWebSocket, Timer> = new Map();
const ping = (ws: ServerWebSocket) => {
if (!options.timeout || options.timeout < 0) {
return
}
clearTimeout(allTimeout.get(ws));
allTimeout.set(ws, setTimeout(ws.close, options.timeout));
}

const stopPing = (ws: ServerWebSocket) => {
clearTimeout(allTimeout.get(ws));
allTimeout.delete(ws);
}
const stopPing = (ws: ServerWebSocket) => {
clearTimeout(allTimeout.get(ws));
allTimeout.delete(ws);
}

let openHandler: (ws: ServerWebSocket) => void;
let messageHandler: (ws: ServerWebSocket, message: string | Buffer) => void;
let closeHandler: (ws: ServerWebSocket) => void;
let openHandler: (ws: ServerWebSocket) => void;
let messageHandler: (ws: ServerWebSocket, message: string | Buffer) => void;
let closeHandler: (ws: ServerWebSocket) => void;

const open = (handler: typeof openHandler) => {
openHandler = handler;
}
const message = (handler: typeof messageHandler) => {
messageHandler = handler;
}
const open = (handler: typeof openHandler) => {
openHandler = handler;
}
const message = (handler: typeof messageHandler) => {
messageHandler = handler;
}

const close = (handler: typeof closeHandler) => {
closeHandler = handler;
}
Bun.serve<undefined>({
port: 3000,
fetch(req, server) {
if (
server.upgrade(req, {
data: {
},
})
)
return;
return new Response("Error");
},
websocket: {
open(ws) {
ws.subscribe(ALL);
options.timeout && ping(ws);
openHandler && openHandler(ws);
},
message(ws, message) {
if (message === 'ping') {
if (options.timeout) {
ping(ws);
}
} else{
messageHandler && messageHandler(ws, message);
}
},
close(ws) {
ws.unsubscribe(ALL);
options.timeout && stopPing(ws);
closeHandler && closeHandler(ws);
},
perMessageDeflate: false,
publishToSelf: true,
},
});
return {
open,
message,
close
}
const close = (handler: typeof closeHandler) => {
closeHandler = handler;
}

Bun.serve<undefined>({
port: 3000,
fetch(req, server) {
if (
server.upgrade(req, {
data: {
},
})
)
return;

return new Response("Error");
},
websocket: {
open(ws) {
ws.subscribe(ALL);
options.timeout && ping(ws);
openHandler && openHandler(ws);
},
message(ws, message) {
if (message === 'ping') {
if (options.timeout) {
ping(ws);
}
} else {
messageHandler && messageHandler(ws, message);
}
},
close(ws) {
ws.unsubscribe(ALL);
options.timeout && stopPing(ws);
closeHandler && closeHandler(ws);
},
perMessageDeflate: false,
publishToSelf: true,
},
});

return {
open,
message,
close
}
}

0 comments on commit c5f7992

Please sign in to comment.