Skip to content

Commit

Permalink
feat#6: Follower 断开连接时,Leader 将其对应的 Request 对象 从 requestPool 中移除
Browse files Browse the repository at this point in the history
  • Loading branch information
ImHype committed Nov 27, 2018
1 parent 58bcd98 commit 423ac20
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 18 deletions.
7 changes: 4 additions & 3 deletions lib/core/leader.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,14 @@ module.exports = class Leader extends Base {
logger: this.logger
});
await this.server.ready();
this.server.on('follow', async ({client, data}) => {

this.server.on('follow', async ({request, data}) => {
this.logger.info(`Leader recieved Follower(${data.from})'s connection.`);
await client.send({data: 'ok'});
await request.send({data: 'ok'});
this.server.broadcast();
});

this.server.on('message', ({client, body}) => {
this.server.on('message', ({request, body}) => {
this.logger.info(body);
});
}
Expand Down
33 changes: 18 additions & 15 deletions lib/core/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ const Base = require('sdk-base');
const net = require('net');
const pify = require('pify');

class Client extends Base {
class Request extends Base {
get socket() {
return this.options.socket;
}
Expand All @@ -28,6 +28,7 @@ module.exports = class Leader extends Base {
super(Object.assign({}, options, {
initMethod: '_init'
}));
this.requestPool = new Map();
}

get port() {
Expand All @@ -40,48 +41,46 @@ module.exports = class Leader extends Base {

async broadcast(data) {
const promises = [];
this.sockets.forEach((client) => {
promises.push(client.send(data));
this.requestPool.forEach((request) => {
promises.push(request.send(data));
});

await Promise.all(promises);
}

async _init() {
this.sockets = new Map();

this.server = net.createServer((socket) => {
if (!this.sockets.has(socket)) {
this.sockets.set(socket, new Client({
if (!this.requestPool.has(socket)) {
this.requestPool.set(socket, new Request({
socket
}));
}
const client = this.sockets.get(socket);
const request = this.requestPool.get(socket);
let header;
let bodyLen = null;
let bodySize = null;

const readPacket = () => {
if (bodyLen === null) {
if (bodySize === null) {
header = socket.read(8);
if (!header) {
return false;
}
bodyLen = header.readInt32BE(4);
bodySize = header.readInt32BE(4);
}

if (bodyLen !== 0) {
const buf = socket.read(bodyLen);
if (bodySize !== 0) {
const buf = socket.read(bodySize);
const str = buf.toString();
const data = JSON.parse(str);
const { action } = data;
const isolateActions = ['follow', 'close'];
const actionName = isolateActions.includes(action)? action: 'message';

this.emit(actionName, {
client, data,
request, data,
});
}
bodyLen = null;
bodySize = null;
return true;
}

Expand All @@ -96,6 +95,10 @@ module.exports = class Leader extends Base {
this.logger.error(err);
}
});

socket.on('close', () => {
this.requestPool.delete(socket);
});
});

const success = await new Promise((resolve) => {
Expand Down

0 comments on commit 423ac20

Please sign in to comment.