diff --git a/lib/core/leader.js b/lib/core/leader.js index a650ab5..c7051f4 100644 --- a/lib/core/leader.js +++ b/lib/core/leader.js @@ -23,13 +23,14 @@ module.exports = class Leader extends Base { logger: this.logger }); await this.server.ready(); - this.server.on('follow', async ({client, data}) => { + + this.server.on('follow', async ({request, data}) => { this.logger.info(`Leader recieved Follower(${data.from})'s connection.`); - await client.send({data: 'ok'}); + await request.send({data: 'ok'}); this.server.broadcast(); }); - this.server.on('message', ({client, body}) => { + this.server.on('message', ({request, body}) => { this.logger.info(body); }); } diff --git a/lib/core/server.js b/lib/core/server.js index 5f89b1f..d2f48a0 100644 --- a/lib/core/server.js +++ b/lib/core/server.js @@ -2,7 +2,7 @@ const Base = require('sdk-base'); const net = require('net'); const pify = require('pify'); -class Client extends Base { +class Request extends Base { get socket() { return this.options.socket; } @@ -28,6 +28,7 @@ module.exports = class Leader extends Base { super(Object.assign({}, options, { initMethod: '_init' })); + this.requestPool = new Map(); } get port() { @@ -40,37 +41,35 @@ module.exports = class Leader extends Base { async broadcast(data) { const promises = []; - this.sockets.forEach((client) => { - promises.push(client.send(data)); + this.requestPool.forEach((request) => { + promises.push(request.send(data)); }); await Promise.all(promises); } async _init() { - this.sockets = new Map(); - this.server = net.createServer((socket) => { - if (!this.sockets.has(socket)) { - this.sockets.set(socket, new Client({ + if (!this.requestPool.has(socket)) { + this.requestPool.set(socket, new Request({ socket })); } - const client = this.sockets.get(socket); + const request = this.requestPool.get(socket); let header; - let bodyLen = null; + let bodySize = null; const readPacket = () => { - if (bodyLen === null) { + if (bodySize === null) { header = socket.read(8); if (!header) { return false; } - bodyLen = header.readInt32BE(4); + bodySize = header.readInt32BE(4); } - if (bodyLen !== 0) { - const buf = socket.read(bodyLen); + if (bodySize !== 0) { + const buf = socket.read(bodySize); const str = buf.toString(); const data = JSON.parse(str); const { action } = data; @@ -78,10 +77,10 @@ module.exports = class Leader extends Base { const actionName = isolateActions.includes(action)? action: 'message'; this.emit(actionName, { - client, data, + request, data, }); } - bodyLen = null; + bodySize = null; return true; } @@ -96,6 +95,10 @@ module.exports = class Leader extends Base { this.logger.error(err); } }); + + socket.on('close', () => { + this.requestPool.delete(socket); + }); }); const success = await new Promise((resolve) => {