-
Notifications
You must be signed in to change notification settings - Fork 11
/
eosdac-ws.js
executable file
·154 lines (127 loc) · 4.51 KB
/
eosdac-ws.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
#!/usr/bin/env node
process.title = 'eosdac-ws';
const {loadConfig} = require('./functions');
const cluster = require('cluster');
const WebSocketServer = require('websocket').server;
const http = require('http');
const fs = require('fs');
const {IPC} = require('node-ipc');
const workers = [], connections = [], connections_idx = new Map();
const config = loadConfig();
const receiveWSMessage = (connection) => {
// Receive register request from ws client
return (message) => {
if (message.type === 'utf8') {
try {
const msg = JSON.parse(message.utf8Data);
console.log('Received websocket request: ' + message.utf8Data, msg);
switch (msg.type){
case 'register':
const dac_id = msg.data.dac_id;
if (!dac_id){
throw new Error(`dac_id not supplied`);
}
// add to index by dac_id
let existing = connections_idx.get(dac_id);
if (!existing){
existing = [];
}
existing.push(connection);
connections_idx.set(dac_id, existing);
break;
default:
throw new Error(`Unknown message type : ${msg.type}`);
break;
}
// connection.sendUTF(message.utf8Data);
}
catch (e){
console.log(`Error in message, closing connection ${e.message}`);
connection.close();
}
}
}
};
const receiveIPCMessage = (ipc, type) => {
// IPC message from one of the connected clients, relay to workers who will send to connected clients
return (data, socket) => {
console.log(`Got data to websocket IPC server of type ${type}, notifying workers`, data);
workers.forEach((w) => {
w.send({type, data});
});
};
};
const receiveWorkerMessage = (msg) => {
console.log(`Worker got message, sending to interested websocket clients`, msg);
// find which connections are interested and send to them
const dac_id = msg.data.dac_id;
if (dac_id){
const notify = connections_idx.get(dac_id);
if (notify && notify.length){
notify.forEach((conn) => {
conn.send(JSON.stringify(msg));
});
}
const notify_all = connections_idx.get('*');
if (notify_all && notify_all.length){
notify_all.forEach((conn) => {
conn.send(JSON.stringify(msg));
});
}
}
};
const startWSServer = (host = '127.0.0.1', port = 3030) => {
console.log(`Starting HTTP server for websocket status server`);
const sendIndex = (response) => {
response.writeHead(200, {
"Content-Type": "text/html"
});
fs.createReadStream("static/ws-test.html").pipe(response);
};
const server = http.createServer(function(request, response) {
// send test page
sendIndex(response);
});
server.listen(port, host, () => {
console.log(`Started http/websocket server on ${host}:${port}`);
});
// create the server
const wsServer = new WebSocketServer({
httpServer: server
});
// WebSocket server
wsServer.on('request', (request) => {
// console.log(`Got request`, req.message.utf8Data, data);
const connection = request.accept();
connections.push(connection);
console.log((new Date()) + ' Connection accepted.');
connection.on('message', receiveWSMessage(connection));
});
};
const startIPCServer = () => {
const ipc = new IPC();
ipc.config.appspace = config.ipc.appspace;
ipc.config.id = config.ipc.id;
ipc.config.retry = 1500;
ipc.serve(() => {
console.log(`Started IPC Server`);
ipc.server.on('notification', receiveIPCMessage(ipc, 'notification'));
});
ipc.server.start();
};
// Start the server and workers
const process_count = 4;
if (cluster.isMaster){
for (let i=0;i<process_count;i++){
const worker = cluster.fork();
workers.push(worker);
}
// start ipc server for other processes to notify websockets
startIPCServer();
}
else {
console.log(`Starting worker`);
console.log(config.ws);
startWSServer(config.ws.host, config.ws.port);
process.on('message', receiveWorkerMessage);
}