Skip to content

Commit

Permalink
refactor: broadcast events to the right person
Browse files Browse the repository at this point in the history
  • Loading branch information
Dolu89 committed Feb 11, 2023
1 parent 69aa1c0 commit 5739781
Show file tree
Hide file tree
Showing 3 changed files with 127 additions and 128 deletions.
2 changes: 1 addition & 1 deletion src/routes/root.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ const root: FastifyPluginAsync = async (fastify, opts): Promise<void> => {
},
wsHandler: (connection, req) => {
connection.socket.on('message', async (message: WebSocket.RawData) => {
await WebSocketPool.broadcast(message.toString(), connection.socket, req.id)
await WebSocketPool.broadcastToRelays(message.toString(), connection.socket, req.id)
})
WebSocketPool.on(`message:${req.id}`, (message: string) => {
connection.socket.send(message)
Expand Down
117 changes: 109 additions & 8 deletions src/services/Event.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import clone from "lodash/clone";

export enum EventType {
Event = "EVENT",
Close = "CLOSE",
Expand All @@ -7,16 +9,115 @@ export enum EventType {
Ok = "OK",
}

export default class NostrEvent {
public static getEventType(event: string): EventType {
if (!this.isValidEventType(event)) {
throw new Error(`Invalid event type for event: ${event}`);
const proxyEventIdSeparator = "-proxy-";

export class Event {
public type: EventType;
public subscriptionId: string;
public proxySubscriptionId: string;

private nostrEvent: string;
public clientId: string;

constructor(type: EventType, subscriptionId: string, nostrEvent: string, clientId?: string) {
this.type = type;
this.subscriptionId = subscriptionId;
this.proxySubscriptionId = subscriptionId;
this.nostrEvent = nostrEvent;

// If clientId is provided, it means it's a CLIENT to RELAY event, clientId is the unique id of the query
if (clientId) {
// suffixSubId should be like this -proxy-req-X
const suffixSubId = `${proxyEventIdSeparator}${clientId}`;
const oldSubId = clone(subscriptionId);
let newSubId = clone(subscriptionId);
// truncate intial subscription id because some relay servers don't like it
// https://github.com/hoytech/strfry/blob/HEAD/src/Subscription.h#L11
if (oldSubId.length > 63 || oldSubId.length + suffixSubId.length > 63) {
// truncate to 63 characters
newSubId = oldSubId.slice(0, 63);
// remove enough characters to put the suffixSubId at the end
newSubId = newSubId.slice(0, newSubId.length - suffixSubId.length);
}
newSubId = `${newSubId}${suffixSubId}`;

this.clientId = clientId;
this.proxySubscriptionId = newSubId;
}
// If not provided, it means it's a RELAY to CLIENT event, clientId is in the nostrEvent
else {
const parts = this.subscriptionId.split(proxyEventIdSeparator);
this.clientId = parts[parts.length - 1];
}
}

return event as EventType;
public getNostrEventForClient(subscriptionId: string): string {
return this.nostrEvent.replace(this.proxySubscriptionId, subscriptionId);
}
public getNostrEventForRelay(): string {
if (this.type === EventType.Request || this.type === EventType.Close) {
const eventJson = JSON.parse(this.nostrEvent);
eventJson[1] = this.proxySubscriptionId;
return JSON.stringify(eventJson);
}
return this.nostrEvent;
}

}

function isValidEventType(eventType: string) {
return Object.values(EventType).includes(eventType as EventType);
}

private static isValidEventType(eventType: string) {
return Object.values(EventType).includes(eventType as EventType);
export function getEventType(event: string): EventType {
if (!isValidEventType(event)) {
throw new Error(`Invalid event type for event: ${event}`);
}
}

return event as EventType;
}

export function parseEvent(message: string, clientId?: string): Event {
const messageJson = JSON.parse(message);
if (messageJson.length === 0) {
throw new Error(`Invalid event: ${message}`, { cause: "A problem occured while JSON.parse(event)" });
}

const eventType = getEventType(messageJson[0]);
let subscriptionId = null;

if (eventType === EventType.Event) {
// Length of 2 means that the event is a PUSH event from CLIENT to RELAYS
if (messageJson.length === 2) {
subscriptionId = messageJson[1].id;
}
// Length of 3 means that the event is a GET event from RELAYS to CLIENT
else if (messageJson.length === 3) {
subscriptionId = messageJson[1];
}
else {
throw new Error(`Invalid event: ${message}`, { cause: "Invalid event length" });
}
}
else if (eventType === EventType.Request) {
if (messageJson.length !== 3) {
throw new Error(`Invalid event: ${message}`, { cause: "Invalid event length" });
}
subscriptionId = messageJson[1];
}
else if (eventType === EventType.Ok || eventType === EventType.Eose || eventType === EventType.Close) {
if (messageJson.length < 2) {
throw new Error(`Invalid event: ${message}`, { cause: "Invalid event length" });
}
subscriptionId = messageJson[1];
}
else if (eventType === EventType.Notice) {
// TODO
}
else {
throw new Error(`Invalid event: ${message}`, { cause: "Can't determine event type" });
}

return new Event(eventType, subscriptionId, message, clientId);
}

136 changes: 17 additions & 119 deletions src/services/WebSocketPool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,9 @@ import WebSocket from "ws";
import { getRelays } from "./env";
import Keyv from "keyv";
import crypto from "crypto";
import clone from "lodash/clone";
import NostrEvent, { EventType } from "./Event";
import { parseEvent } from "./Event";

class WebSocketPool extends EventEmitter {
private proxyEventIdSeparator = "-proxy-";

servers: { url: string; attempts: number; timeout: NodeJS.Timeout | null; }[];
maxAttempts: number;
Expand Down Expand Up @@ -66,8 +64,16 @@ class WebSocketPool extends EventEmitter {
return;
}
await this.cache.set(messageHash, message, 6000);
const clientId = await this.getRequestIdFromEvent(message);
this.emit(`message:${clientId}`, await this.restoreInitialSubscriptionId(message));

// const clientId = await this.getRequestIdFromEvent(message);
const event = parseEvent(message);
const initialSubscriptionId: string | null = await this.cache.get(event.proxySubscriptionId);
if (initialSubscriptionId) {
this.emit(`message:${event.clientId}`, event.getNostrEventForClient(initialSubscriptionId));
}
else {
console.error(`Subscription ID not found for client ${event.clientId}`)
}
});

socket.on("close", () => {
Expand All @@ -84,127 +90,19 @@ class WebSocketPool extends EventEmitter {
});
}

public async broadcast(data: string, exclude: WebSocket, clientId: string) {
public async broadcastToRelays(message: string, exclude: WebSocket, clientId: string) {
for (const socket of Object.values(this.sockets)) {
if (socket !== exclude) {
const message = await this.prepareMessageForClient(data, clientId);
if (!message) return;
socket.send(Buffer.from(message));
}
}
}

///
/// We need to change Subscription ID to be unique per client
/// [..., SubscriptionID, ...] => [..., SubscriptionID-proxy-clientId, ...]
/// SubscriptionID is always in the 2nd position
///

private async getRequestIdFromEvent(event: string) {
const eventJson = JSON.parse(event);
if (eventJson.length === 0) {
return null;
}

if (eventJson[0] === EventType.Request ||
eventJson[0] === EventType.Close ||
eventJson[0] === EventType.Eose ||
eventJson[0] === EventType.Event
) {
const parts = eventJson[1].split(this.proxyEventIdSeparator);
return parts[parts.length - 1];
}
else if (eventJson[0] === EventType.Ok) {
const clientId = await this.cache.get(eventJson[1])
if (!clientId) {
throw new Error(`ClientId not found for proxy event id: ${event}`);
const event = parseEvent(message, clientId)
await this.cache.set(event.proxySubscriptionId, event.subscriptionId, 6000);
const messageBuffer = Buffer.from(event.getNostrEventForRelay());
socket.send(messageBuffer);
}
return clientId;
}
else {
console.error(`No request id found for event: ${event}`)
return null;
}

}

private async getInitialSubscriptionIdFromCache(event: string): Promise<{ cacheKey: string, subId: string } | null> {
const eventJson = JSON.parse(event);
if (eventJson.length === 0) {
return null;
}
const proxySubId = eventJson[1]


const initialSubId = await this.cache.get(proxySubId)
if (!initialSubId) {
throw new Error(`Initial subscription id not found for proxy subscription id: ${proxySubId}`);
}
return { cacheKey: proxySubId, subId: initialSubId }
}

private async restoreInitialSubscriptionId(event: string) {
const eventJson = JSON.parse(event);
if (eventJson.length === 0) {
throw new Error("Invalid event");
}

if (eventJson[0] === EventType.Request ||
eventJson[0] === EventType.Close ||
eventJson[0] === EventType.Eose ||
eventJson[0] === EventType.Event) {
const cachedResult = await this.getInitialSubscriptionIdFromCache(event);

// If we can't find the initial subscription id, we just return the event as is
if (!cachedResult) return event;

eventJson[1] = cachedResult.subId;
return JSON.stringify(eventJson);
}
else {
return event;
}
}

private async prepareMessageForClient(message: string, clientId: string) {
const messageJson = JSON.parse(message);
if (messageJson.length === 0) {
return null;
}

const eventType = NostrEvent.getEventType(messageJson[0]);

// Changing subscription id only for request and close events
// it's used to identify the client that sent the request
if (eventType === EventType.Request || eventType === EventType.Close) {
const postSubId = `${this.proxyEventIdSeparator}${clientId}`;
let oldSubId = clone(messageJson[1]);
let newSubId = clone(messageJson[1]);

// truncate intial subscription id because some relay servers don't like it
// https://github.com/hoytech/strfry/blob/HEAD/src/Subscription.h#L11
if (oldSubId.length > 63 || oldSubId.length + postSubId.length > 63) {
// truncate to 63 characters
newSubId = oldSubId.slice(0, 63);
// remove enough characters to put the postSubId at the end
newSubId = newSubId.slice(0, newSubId.length - postSubId.length);
}

newSubId = `${newSubId}${postSubId}`;

// Caching the subscription id so we can use it later to send it back to the client
await this.cache.set(newSubId, oldSubId);

messageJson[1] = newSubId;

return JSON.stringify(messageJson);
}
// Events sent do not contains subscription id. We need to use event ID to identify the client
else if (eventType === EventType.Event) {
const eventId = messageJson[1].id;
await this.cache.set(eventId, clientId);
}
return message
}
public getRelays() {
return Object.keys(this.sockets);
}
Expand Down

0 comments on commit 5739781

Please sign in to comment.