Skip to content

Commit

Permalink
feat: fix broadcast events to the right client (refacto needed)
Browse files Browse the repository at this point in the history
  • Loading branch information
Dolu89 committed Feb 11, 2023
1 parent 3b18581 commit 69aa1c0
Show file tree
Hide file tree
Showing 5 changed files with 160 additions and 9 deletions.
6 changes: 5 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@
"dev": "npm run build:ts && concurrently -k -p \"[{name}]\" -n \"TypeScript,App\" -c \"yellow.bold,cyan.bold\" \"npm:watch:ts\" \"npm:dev:start\"",
"dev:start": "fastify start --ignore-watch=.ts$ -w -l info -P dist/app.js"
},
"keywords": ["nostr"],
"keywords": [
"nostr"
],
"author": "Dolu",
"license": "MIT",
"dependencies": {
Expand All @@ -27,10 +29,12 @@
"fastify-cli": "^5.7.1",
"fastify-plugin": "^4.0.0",
"keyv": "^4.5.2",
"lodash": "^4.17.21",
"ws": "^8.12.0"
},
"devDependencies": {
"@types/ejs": "^3.1.1",
"@types/lodash": "^4.14.191",
"@types/node": "^18.0.0",
"@types/tap": "^15.0.5",
"@types/ws": "^8.5.4",
Expand Down
9 changes: 8 additions & 1 deletion pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions src/routes/root.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@ const root: FastifyPluginAsync = async (fastify, opts): Promise<void> => {
const proxyUrl = getProxyUrl()
reply.view("/index.ejs", { connected, disconnected, total, proxyUrl });
},
wsHandler: (connection, _) => {
connection.socket.on('message', (message: WebSocket.RawData) => {
WebSocketPool.broadcast(message.toString(), connection.socket)
wsHandler: (connection, req) => {
connection.socket.on('message', async (message: WebSocket.RawData) => {
await WebSocketPool.broadcast(message.toString(), connection.socket, req.id)
})
WebSocketPool.on('message', (message: string) => {
WebSocketPool.on(`message:${req.id}`, (message: string) => {
connection.socket.send(message)
})
}
Expand Down
22 changes: 22 additions & 0 deletions src/services/Event.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
export enum EventType {
Event = "EVENT",
Close = "CLOSE",
Request = "REQ",
Eose = "EOSE",
Notice = "NOTICE",
Ok = "OK",
}

export default class NostrEvent {
public static getEventType(event: string): EventType {
if (!this.isValidEventType(event)) {
throw new Error(`Invalid event type for event: ${event}`);
}

return event as EventType;
}

private static isValidEventType(eventType: string) {
return Object.values(EventType).includes(eventType as EventType);
}
}
124 changes: 121 additions & 3 deletions src/services/WebSocketPool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,12 @@ import WebSocket from "ws";
import { getRelays } from "./env";
import Keyv from "keyv";
import crypto from "crypto";
import clone from "lodash/clone";
import NostrEvent, { EventType } from "./Event";

class WebSocketPool extends EventEmitter {
private proxyEventIdSeparator = "-proxy-";

servers: { url: string; attempts: number; timeout: NodeJS.Timeout | null; }[];
maxAttempts: number;
resetTimeout: number;
Expand Down Expand Up @@ -62,7 +66,8 @@ class WebSocketPool extends EventEmitter {
return;
}
await this.cache.set(messageHash, message, 6000);
this.emit("message", message);
const clientId = await this.getRequestIdFromEvent(message);
this.emit(`message:${clientId}`, await this.restoreInitialSubscriptionId(message));
});

socket.on("close", () => {
Expand All @@ -79,14 +84,127 @@ class WebSocketPool extends EventEmitter {
});
}

public broadcast(data: string, exclude: WebSocket) {
public async broadcast(data: string, exclude: WebSocket, clientId: string) {
for (const socket of Object.values(this.sockets)) {
if (socket !== exclude) {
socket.send(Buffer.from(data));
const message = await this.prepareMessageForClient(data, clientId);
if (!message) return;
socket.send(Buffer.from(message));
}
}
}

///
/// We need to change Subscription ID to be unique per client
/// [..., SubscriptionID, ...] => [..., SubscriptionID-proxy-clientId, ...]
/// SubscriptionID is always in the 2nd position
///

private async getRequestIdFromEvent(event: string) {
const eventJson = JSON.parse(event);
if (eventJson.length === 0) {
return null;
}

if (eventJson[0] === EventType.Request ||
eventJson[0] === EventType.Close ||
eventJson[0] === EventType.Eose ||
eventJson[0] === EventType.Event
) {
const parts = eventJson[1].split(this.proxyEventIdSeparator);
return parts[parts.length - 1];
}
else if (eventJson[0] === EventType.Ok) {
const clientId = await this.cache.get(eventJson[1])
if (!clientId) {
throw new Error(`ClientId not found for proxy event id: ${event}`);
}
return clientId;
}
else {
console.error(`No request id found for event: ${event}`)
return null;
}

}

private async getInitialSubscriptionIdFromCache(event: string): Promise<{ cacheKey: string, subId: string } | null> {
const eventJson = JSON.parse(event);
if (eventJson.length === 0) {
return null;
}
const proxySubId = eventJson[1]

const initialSubId = await this.cache.get(proxySubId)
if (!initialSubId) {
throw new Error(`Initial subscription id not found for proxy subscription id: ${proxySubId}`);
}
return { cacheKey: proxySubId, subId: initialSubId }
}

private async restoreInitialSubscriptionId(event: string) {
const eventJson = JSON.parse(event);
if (eventJson.length === 0) {
throw new Error("Invalid event");
}

if (eventJson[0] === EventType.Request ||
eventJson[0] === EventType.Close ||
eventJson[0] === EventType.Eose ||
eventJson[0] === EventType.Event) {
const cachedResult = await this.getInitialSubscriptionIdFromCache(event);

// If we can't find the initial subscription id, we just return the event as is
if (!cachedResult) return event;

eventJson[1] = cachedResult.subId;
return JSON.stringify(eventJson);
}
else {
return event;
}
}

private async prepareMessageForClient(message: string, clientId: string) {
const messageJson = JSON.parse(message);
if (messageJson.length === 0) {
return null;
}

const eventType = NostrEvent.getEventType(messageJson[0]);

// Changing subscription id only for request and close events
// it's used to identify the client that sent the request
if (eventType === EventType.Request || eventType === EventType.Close) {
const postSubId = `${this.proxyEventIdSeparator}${clientId}`;
let oldSubId = clone(messageJson[1]);
let newSubId = clone(messageJson[1]);

// truncate intial subscription id because some relay servers don't like it
// https://github.com/hoytech/strfry/blob/HEAD/src/Subscription.h#L11
if (oldSubId.length > 63 || oldSubId.length + postSubId.length > 63) {
// truncate to 63 characters
newSubId = oldSubId.slice(0, 63);
// remove enough characters to put the postSubId at the end
newSubId = newSubId.slice(0, newSubId.length - postSubId.length);
}

newSubId = `${newSubId}${postSubId}`;

// Caching the subscription id so we can use it later to send it back to the client
await this.cache.set(newSubId, oldSubId);

messageJson[1] = newSubId;

return JSON.stringify(messageJson);
}
// Events sent do not contains subscription id. We need to use event ID to identify the client
else if (eventType === EventType.Event) {
const eventId = messageJson[1].id;
await this.cache.set(eventId, clientId);
}
return message
}
public getRelays() {
return Object.keys(this.sockets);
}
Expand Down

0 comments on commit 69aa1c0

Please sign in to comment.