Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Created a PR to Support Redis for message passing between client #195

Open
wants to merge 18 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file modified .eslintignore
100644 → 100755
Empty file.
Empty file modified .eslintrc.json
100644 → 100755
Empty file.
Empty file modified .github/ISSUE_TEMPLATE/config.yml
100644 → 100755
Empty file.
Empty file modified .github/ISSUE_TEMPLATE/peer-template.md
100644 → 100755
Empty file.
Empty file modified .gitignore
100644 → 100755
Empty file.
Empty file modified .gitpod.yml
100644 → 100755
Empty file.
Empty file modified .travis.yml
100644 → 100755
Empty file.
Empty file modified Dockerfile
100644 → 100755
Empty file.
Empty file modified LICENSE
100644 → 100755
Empty file.
Empty file modified README.md
100644 → 100755
Empty file.
Empty file modified app.json
100644 → 100755
Empty file.
63 changes: 41 additions & 22 deletions bin/peerjs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ const path = require("path");
const pkg = require("../package.json");
const fs = require("fs");
const optimistUsageLength = 98;
const yargs = require("yargs")
const yargs = require("yargs");
const version = pkg.version;
const { PeerServer } = require("../dist/src");
const opts = yargs
Expand All @@ -16,91 +16,110 @@ const opts = yargs
demandOption: false,
alias: "t",
describe: "timeout (milliseconds)",
default: 5000
default: 5000,
},
concurrent_limit: {
demandOption: false,
alias: "c",
describe: "concurrent limit",
default: 5000
default: 5000,
},
alive_timeout: {
demandOption: false,
describe: "broken connection check timeout (milliseconds)",
default: 60000
default: 60000,
},
key: {
demandOption: false,
alias: "k",
describe: "connection key",
default: "peerjs"
default: "peerjs",
},
sslkey: {
demandOption: false,
describe: "path to SSL key"
describe: "path to SSL key",
},
sslcert: {
demandOption: false,
describe: "path to SSL certificate"
describe: "path to SSL certificate",
},
port: {
demandOption: true,
alias: "p",
describe: "port"
describe: "port",
},
path: {
demandOption: false,
describe: "custom path",
default: "/"
default: "/",
},
allow_discovery: {
demandOption: false,
describe: "allow discovery of peers"
describe: "allow discovery of peers",
},
proxied: {
demandOption: false,
describe: "Set true if PeerServer stays behind a reverse proxy",
default: false
}
default: false,
},
redis: {
demandOption: false,
describe: "Should it use redis?",
default: false,
},
redisHost: {
demandOption: false,
describe: "Redis Host to use",
default: "",
},
redisPort: {
demandOption: false,
describe: "Redis Port to use",
default: 0,
},
})
.boolean("allow_discovery")
.argv;
.boolean("allow_discovery").argv;

process.on("uncaughtException", function (e) {
process.on("uncaughtException", function(e) {
console.error("Error: " + e);
});

if (opts.sslkey || opts.sslcert) {
if (opts.sslkey && opts.sslcert) {
opts.ssl = {
key: fs.readFileSync(path.resolve(opts.sslkey)),
cert: fs.readFileSync(path.resolve(opts.sslcert))
cert: fs.readFileSync(path.resolve(opts.sslcert)),
};

delete opts.sslkey;
delete opts.sslcert;
} else {
console.error("Warning: PeerServer will not run because either " +
"the key or the certificate has not been provided.");
console.error(
"Warning: PeerServer will not run because either " +
"the key or the certificate has not been provided."
);
process.exit(1);
}
}

const userPath = opts.path;
const server = PeerServer(opts, server => {
const server = PeerServer(opts, (server) => {
const host = server.address().address;
const port = server.address().port;

console.log(
"Started PeerServer on %s, port: %s, path: %s (v. %s)",
host, port, userPath || "/", version
host,
port,
userPath || "/",
version
);
});

server.on("connection", client => {
server.on("connection", (client) => {
console.log(`Client connected: ${client.getId()}`);
});

server.on("disconnect", client => {
server.on("disconnect", (client) => {
console.log(`Client disconnected: ${client.getId()}`);
});
Empty file modified changelog.md
100644 → 100755
Empty file.
Empty file modified dist/app.json
100644 → 100755
Empty file.
Empty file modified dist/src/api/index.js
100644 → 100755
Empty file.
Empty file modified dist/src/api/middleware/auth/index.js
100644 → 100755
Empty file.
Empty file modified dist/src/api/middleware/middleware.js
100644 → 100755
Empty file.
5 changes: 3 additions & 2 deletions dist/src/api/v1/calls/index.js
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@ var __importDefault = (this && this.__importDefault) || function (mod) {
};
Object.defineProperty(exports, "__esModule", { value: true });
const express_1 = __importDefault(require("express"));
exports.default = ({ realm, messageHandler }) => {
exports.default = ({ realm, messageHandler, }) => {
const app = express_1.default.Router();
const handle = (req, res, next) => {
const { id } = req.params;
console.log("Got request...");
if (!id)
return next();
const client = realm.getClientById(id);
Expand All @@ -19,7 +20,7 @@ exports.default = ({ realm, messageHandler }) => {
type,
src: id,
dst,
payload
payload,
};
messageHandler.handle(client, message);
res.sendStatus(200);
Expand Down
Empty file modified dist/src/api/v1/public/index.js
100644 → 100755
Empty file.
7 changes: 5 additions & 2 deletions dist/src/config/index.js
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@ const defaultConfig = {
cleanup_out_msgs: 1000,
ssl: {
key: "",
cert: ""
}
cert: "",
},
redis: false,
redisHost: "",
redisPort: 0,
};
exports.default = defaultConfig;
Empty file modified dist/src/enums.js
100644 → 100755
Empty file.
3 changes: 1 addition & 2 deletions dist/src/index.js
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@ function ExpressPeerServer(server, options) {
}
app.on("mount", () => {
if (!server) {
throw new Error("Server is not passed to constructor - " +
"can't start PeerServer");
throw new Error("Server is not passed to constructor - " + "can't start PeerServer");
}
instance_1.createInstance({ app, server, options: newOptions });
});
Expand Down
18 changes: 11 additions & 7 deletions dist/src/instance.js
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -10,32 +10,36 @@ const messagesExpire_1 = require("./services/messagesExpire");
const webSocketServer_1 = require("./services/webSocketServer");
const messageHandler_1 = require("./messageHandler");
const api_1 = require("./api");
exports.createInstance = ({ app, server, options }) => {
exports.createInstance = ({ app, server, options, }) => {
const config = options;
const realm = new realm_1.Realm();
const messageHandler = new messageHandler_1.MessageHandler(realm);
const api = api_1.Api({ config, realm, messageHandler });
const messagesExpire = new messagesExpire_1.MessagesExpire({ realm, config, messageHandler });
const messagesExpire = new messagesExpire_1.MessagesExpire({
realm,
config,
messageHandler,
});
const checkBrokenConnections = new checkBrokenConnections_1.CheckBrokenConnections({
realm,
config,
onClose: client => {
onClose: (client) => {
app.emit("disconnect", client);
}
},
});
app.use(options.path, api);
//use mountpath for WS server
const customConfig = Object.assign(Object.assign({}, config), { path: path_1.default.posix.join(app.path(), options.path, '/') });
const customConfig = Object.assign(Object.assign({}, config), { path: path_1.default.posix.join(app.path(), options.path, "/") });
const wss = new webSocketServer_1.WebSocketServer({
server,
realm,
config: customConfig
config: customConfig,
});
wss.on("connection", (client) => {
const messageQueue = realm.getMessageQueueById(client.getId());
if (messageQueue) {
let message;
while (message = messageQueue.readMessage()) {
while ((message = messageQueue.readMessage())) {
messageHandler.handle(client, message);
}
realm.clearMessageQueue(client.getId());
Expand Down
Empty file modified dist/src/messageHandler/handler.js
100644 → 100755
Empty file.
Empty file modified dist/src/messageHandler/handlers/heartbeat/index.js
100644 → 100755
Empty file.
Empty file modified dist/src/messageHandler/handlers/index.js
100644 → 100755
Empty file.
4 changes: 2 additions & 2 deletions dist/src/messageHandler/handlers/transmission/index.js
100644 → 100755
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
const enums_1 = require("../../../enums");
exports.TransmissionHandler = ({ realm }) => {
exports.TransmissionHandler = ({ realm, }) => {
const handle = (client, message) => {
const type = message.type;
const srcId = message.src;
Expand Down Expand Up @@ -33,7 +33,7 @@ exports.TransmissionHandler = ({ realm }) => {
handle(client, {
type: enums_1.MessageType.LEAVE,
src: dstId,
dst: srcId
dst: srcId,
});
}
}
Expand Down
Empty file modified dist/src/messageHandler/handlersRegistry.js
100644 → 100755
Empty file.
Empty file modified dist/src/messageHandler/index.js
100644 → 100755
Empty file.
Empty file modified dist/src/models/client.js
100644 → 100755
Empty file.
Empty file modified dist/src/models/message.js
100644 → 100755
Empty file.
Empty file modified dist/src/models/messageQueue.js
100644 → 100755
Empty file.
Empty file modified dist/src/models/realm.js
100644 → 100755
Empty file.
Empty file modified dist/src/services/checkBrokenConnections/index.js
100644 → 100755
Empty file.
Empty file modified dist/src/services/messagesExpire/index.js
100644 → 100755
Empty file.
38 changes: 32 additions & 6 deletions dist/src/services/webSocketServer/index.js
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,39 @@ const url_1 = __importDefault(require("url"));
const ws_1 = __importDefault(require("ws"));
const enums_1 = require("../../enums");
const client_1 = require("../../models/client");
const WS_PATH = 'peerjs';
const Redis = require("ioredis");
const WS_PATH = "peerjs";
class WebSocketServer extends events_1.default {
constructor({ server, realm, config }) {
constructor({ server, realm, config, }) {
super();
this.setMaxListeners(0);
this.realm = realm;
this.config = config;
const path = this.config.path;
this.path = `${path}${path.endsWith('/') ? "" : "/"}${WS_PATH}`;
this.path = `${path}${path.endsWith("/") ? "" : "/"}${WS_PATH}`;
this.socketServer = new ws_1.default.Server({ path: this.path, server });
this.socketServer.on("connection", (socket, req) => this._onSocketConnection(socket, req));
this.socketServer.on("error", (error) => this._onSocketError(error));
if (config.redis) {
this.messagePublisher = new Redis(this.config.redisPort, this.config.redisHost);
this.messageSubscriber = new Redis(this.config.redisPort, this.config.redisHost);
this._configureRedis();
}
}
_configureRedis() {
this.messageSubscriber.subscribe("transmission", (err) => {
if (!err)
console.log("Subscribed to Transmission messages");
});
this.messageSubscriber.on("message", (channel, tmessage) => {
if (channel === "transmission") {
const receivedMessage = JSON.parse(tmessage);
if (receivedMessage.dst &&
this.realm.getClientById(receivedMessage.dst)) {
this.emit("message", undefined, receivedMessage);
}
}
});
}
_onSocketConnection(socket, req) {
const { query = {} } = url_1.default.parse(req.url, true);
Expand All @@ -36,7 +57,7 @@ class WebSocketServer extends events_1.default {
// ID-taken, invalid token
socket.send(JSON.stringify({
type: enums_1.MessageType.ID_TAKEN,
payload: { msg: "ID is taken" }
payload: { msg: "ID is taken" },
}));
return socket.close();
}
Expand All @@ -48,12 +69,13 @@ class WebSocketServer extends events_1.default {
// handle error
this.emit("error", error);
}
_registerClient({ socket, id, token }) {
_registerClient({ socket, id, token, }) {
// Check concurrent limit
const clientsCount = this.realm.getClientsIds().length;
if (clientsCount >= this.config.concurrent_limit) {
return this._sendErrorAndClose(socket, enums_1.Errors.CONNECTION_LIMIT_EXCEED);
}
console.log("NEW CLIENT:::", id);
const newClient = new client_1.Client({ id, token });
this.realm.setClient(newClient, id);
socket.send(JSON.stringify({ type: enums_1.MessageType.OPEN }));
Expand All @@ -73,6 +95,10 @@ class WebSocketServer extends events_1.default {
try {
const message = JSON.parse(data);
message.src = client.getId();
if (message.type !== "HEARTBEAT" && this.config.redis) {
this.messagePublisher.publish("transmission", JSON.stringify(message));
return;
}
this.emit("message", client, message);
}
catch (e) {
Expand All @@ -84,7 +110,7 @@ class WebSocketServer extends events_1.default {
_sendErrorAndClose(socket, msg) {
socket.send(JSON.stringify({
type: enums_1.MessageType.ERROR,
payload: { msg }
payload: { msg },
}));
socket.close();
}
Expand Down
Empty file modified dist/src/services/webSocketServer/webSocket.js
100644 → 100755
Empty file.
19 changes: 19 additions & 0 deletions dist/src/start.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
const index_1 = require("./index");
const options = {
port: 9000,
expire_timeout: 5000,
alive_timeout: 60000,
key: "peerjs",
path: "/myapp",
concurrent_limit: 5000,
allow_discovery: false,
proxied: false,
cleanup_out_msgs: 1000,
redis: true,
redisHost: "127.0.0.1",
redisPort: 6379,
};
index_1.PeerServer(options);
console.log("Server started at port 9000");
15 changes: 15 additions & 0 deletions dist/src/utils.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
const chalk = require("chalk");
const stackTrace = require("stack-trace");
exports.clog = (message) => {
console.log(chalk.blue(message));
};
exports.trace = () => {
stackTrace
.get()
.filter((site) => !site.getFileName().includes("node_modules"))
.map((site) => {
console.log(chalk.red(`${site.getFileName()} --- ${site.getLineNumber()}`));
});
};
Empty file modified index.d.ts
100644 → 100755
Empty file.
Loading