Skip to content

Commit

Permalink
Modularize WS protocol
Browse files Browse the repository at this point in the history
  • Loading branch information
hans00 committed Dec 1, 2019
1 parent 27b942c commit 047018d
Show file tree
Hide file tree
Showing 10 changed files with 358 additions and 200 deletions.
119 changes: 119 additions & 0 deletions BENCHMARK.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
Tested
---

- Express + ws
- this project


Enviroments
---

VM Host:
```
CPU: E5-2630 v3
RAM: 64GB
OS: Proxmox VE
```

# Server

```
CPU: KVM 2 Core
RAM: 2GB
OS: Debian 10
Node: 12
```

# Client

```
CPU: KVM 4 Core
RAM: 4GB
OS: Debian 10
```

Results
---

# Static file

```sh
wrk -c 1k -t 10 -d 10s http://<IP>:3000
```

## Express

```
Running 10s test @ http://<IP>:3000
10 threads and 1000 connections
Thread Stats Avg Stdev Max +/- Stdev
Latency 309.14ms 84.79ms 1.23s 89.22%
Req/Sec 323.84 173.86 1.00k 71.07%
31366 requests in 10.03s, 9.09MB read
Requests/sec: 3127.35
Transfer/sec: 0.91MB
```

## This Project

```
Running 10s test @ http://172.17.210.1:3000
10 threads and 1000 connections
Thread Stats Avg Stdev Max +/- Stdev
Latency 54.04ms 52.45ms 1.08s 98.46%
Req/Sec 1.89k 551.05 3.73k 77.86%
76083 requests in 10.06s, 12.63MB read
Requests/sec: 7565.18
Transfer/sec: 1.26MB
```

# Dynamic with URL parameter

```sh
wrk -c 1k -t 10 -d 10s http://<IP>:3000/param/test
```

## Express

```
Running 10s test @ http://<IP>:3000/param/test
10 threads and 1000 connections
Thread Stats Avg Stdev Max +/- Stdev
Latency 102.97ms 101.94ms 1.35s 97.42%
Req/Sec 1.07k 275.90 2.66k 83.28%
99698 requests in 10.08s, 11.98MB read
Socket errors: connect 0, read 0, write 0, timeout 98
Requests/sec: 9893.80
Transfer/sec: 1.19MB
```

## This Project

```
Running 10s test @ http://<IP>:3000/param/test
10 threads and 1000 connections
Thread Stats Avg Stdev Max +/- Stdev
Latency 25.17ms 12.54ms 493.42ms 99.29%
Req/Sec 4.00k 616.40 5.94k 90.58%
397847 requests in 10.03s, 23.52MB read
Requests/sec: 39675.91
Transfer/sec: 2.35MB
```

# WS

```sh
thor -M 500 -A 1000 -W 4 -B 1024 ws://<IP>:3000/ws
```

## Express

```
WIP
```

## This Project

```
WIP
```
22 changes: 7 additions & 15 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,8 @@ app.get('/hello/:name/alert', async (req, res, params) => {

app.serve('/*') // auto serve project /static/*

app.listen(3000, err => {
if (!err) {
console.log('Listen on 3000')
}
app.listen(3000, () => {
console.log('Listen on 3000')
})
```

Expand Down Expand Up @@ -175,21 +173,15 @@ options = {
options = {
compression: 'default', // 'disable' / 'default' == 'shared' / 'dedicated'
idleTimeout: 300, // (sec)
maxPayloadLength: 4096
maxPayloadLength: 4096,
protocol: 'fast-ws', // string of ws protocol name (builtins: fast-ws, echo, chat)
protocol: Object, // object of WS protocol (must extends 'fast-ws/server/ws-prototol/basic')
}
```

- `broadcast(channel, event, data[, compress=true])`

> Broadcast event to WebSocket channel
- `broadcastMessage(channel, message[, compress=true])`

> Broadcast message to WebSocket channel
- `broadcastBinary(channel, message[, compress=true])`
- `listen(path, callback)`

> Broadcast binary to WebSocket channel
> Start listen
## `Request`

Expand Down
15 changes: 11 additions & 4 deletions client/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ class WSClient extends EventEmitter {
super()
this.options = options
this.connectState = 0
this.client = new WebSocket(endpoint)
this.client = new WebSocket(endpoint, 'fast-ws', options)
this.client.on('error', error => {
this.emit('error', error)
})
Expand All @@ -23,9 +23,16 @@ class WSClient extends EventEmitter {
this.emit('disconnected')
})
this.client.on('message', (message) => {
if (message === '\x00') {
this.connectState = 2
this.emit('ready')
if (this.connectState !== 2) {
if (message === '\x00\x01') {
this.connectState = 2
this.emit('ready')
} else {
this.emit('error', new Error({
code: 'CLIENT_VERSION_MISMATCH',
message: 'Client version mismatch.'
}))
}
} else {
this.onMessage(message)
}
Expand Down
47 changes: 24 additions & 23 deletions server/index.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
const uWS = require('bindings')('uWS')
const LRU = require('lru-cache')
const WSClient = require('./ws')
const BasicWSProtocol = require('./ws-protocol/basic')
const Request = require('./request')
const Response = require('./response')
const ServerError = require('./errors')
Expand Down Expand Up @@ -92,27 +92,15 @@ class fastWS {
this._socket = listenSocket
if (listenSocket) {
this.options.verbose && console.log('Started')
if (callback) {
callback()
}
} else {
this.options.verbose && console.log('Failed')
}
if (callback) {
callback(listenSocket)
}
})
}

broadcast (channel, event, data, compress = true) {
this._server.publish(channel, WSClient.getPayload({ event, data }, 'event'), false, compress)
}

broadcastMessage (channel, data, compress = true) {
this._server.publish(channel, WSClient.getPayload(data), false, compress)
}

broadcastBinary (channel, data, compress = true) {
this._server.publish(channel, data, true, compress)
}

route (method, path, callbacks) {
if (!this._routes[path]) {
this._routes[path] = {}
Expand Down Expand Up @@ -168,15 +156,28 @@ class fastWS {
if (options.maxPayloadLength && !Number.isInteger(options.maxPayloadLength)) {
throw new ServerError({ code: 'INVALID_OPTIONS', message: 'Invalid websocket option' })
}
if (options.protocol) {
if (options.protocol === 'custom' || options.protocol === 'chat') {
options.protocol = BasicWSProtocol
} else if (typeof options.protocol === 'string') {
options.protocol = require(`./ws-protocol/${options.protocol}`)
} else if (!(options.protocol.prototype instanceof BasicWSProtocol)) {
throw new ServerError({ code: 'INVALID_OPTIONS', message: 'Invalid websocket option' })
}
} else {
options.protocol = require('./ws-protocol/fast-ws')
}
const Protocol = options.protocol
this.route('ws', path, {
...options,
open: async (ws, req) => {
const client = new WSClient(ws, new Request(req))
compression: options.compression,
idleTimeout: options.idleTimeout,
maxPayloadLength: options.maxPayloadLength,
open: (ws, request) => {
const client = new Protocol(ws, new Request(request, null))
this.options.verbose && console.log('[open]', client.remoteAddress)
ws._client = client
try {
await callback(client)
ws.send('\x00', 0, 0)
callback(client)
} catch (error) {
console.error(error)
// disconnect when error
Expand All @@ -189,7 +190,7 @@ class fastWS {
// decode message
ws._client.emitPayload(Buffer.from(message).toString())
} catch (error) {
if (error === 'INVALID_PAYLOAD') {
if (error.code === 'WS_INVALID_PAYLOAD') {
this.options.verbose && console.log('[error]', 'Invalid message payload')
} else {
console.error(error)
Expand All @@ -211,7 +212,7 @@ class fastWS {
ws._client.emit('pong')
},
close: (ws, code, message) => {
ws._client.emit('disconnect')
ws._client.emit('close', code, message)
setImmediate(() => delete ws._client)
}
})
Expand Down
71 changes: 71 additions & 0 deletions server/ws-protocol/basic.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
const inet = require('../inet')
const { EventEmitter } = require('events')

class WSClient extends EventEmitter {
constructor (session, request) {
super()
this.session = session
this.requestHeaders = request.headers
}

emitPayload (payload) {
this.emit('message', { data: payload })
}

get remoteAddress () {
return inet.ntop(Buffer.from(this.session.getRemoteAddress()))
}

drain () {
if (this.session.getBufferedAmount() === 0) {
this.emit('drained')
}
}

_publish (topic, data, isBinary, compress) {
this.session.publish(topic, data, isBinary, compress)
}

async _send (data, isBinary, compress) {
if (this.session.getBufferedAmount() === 0) {
return this.session.send(data, isBinary, compress)
} else {
await new Promise((resolve) => {
super.once('drained', async () => {
await this._send(data, isBinary, compress)
resolve()
})
})
}
}

join (channel) {
return this.session.subscribe(channel)
}

quit (channel) {
return this.session.unsubscribe(channel)
}

send (data, compress = true) {
return this._send(data, false, compress)
}

sendBinary (data, compress = true) {
return this._send(data, true, compress)
}

broadcast (channel, event, data, compress = true) {
this._publish(channel, data, false, compress)
}

broadcastBinary (channel, data, compress = true) {
this._publish(channel, data, true, compress)
}

close () {
return this.session.close()
}
}

module.exports = WSClient
12 changes: 12 additions & 0 deletions server/ws-protocol/echo.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
const basic = require('./basic')

class WSClient extends basic {
constructor (session, request) {
super(session, request)
this.on('message', ({ data }) => {
this.send(data)
})
}
}

module.exports = WSClient
Loading

0 comments on commit 047018d

Please sign in to comment.