From 22848d7df5a5b8c02055a12b6fda5d299b26ad28 Mon Sep 17 00:00:00 2001 From: Gabriel Mata Date: Thu, 24 Oct 2024 13:46:25 -0500 Subject: [PATCH] fix: ensure single Redis message listener and include new handle messages (#19) --- .../server/src/websocket/websocket.service.ts | 100 ++++++++++++++---- 1 file changed, 78 insertions(+), 22 deletions(-) diff --git a/packages/server/src/websocket/websocket.service.ts b/packages/server/src/websocket/websocket.service.ts index e1a407c..b137cf9 100644 --- a/packages/server/src/websocket/websocket.service.ts +++ b/packages/server/src/websocket/websocket.service.ts @@ -37,6 +37,7 @@ export class WebsocketService { this.logger = new Logger(WebsocketService.name) this.redisSubscriber = this.redis.duplicate() this.redisPublisher = this.redis.duplicate() + this.initializeRedisMessageListener() } async onModuleInit() { @@ -385,6 +386,7 @@ export class WebsocketService { const sessionKey = `liveSession:${connectionId}` const response = await this.redis.hmset(sessionKey, { sessionId, + socket_id, }) this.logger.debug('[addLiveSession] response:', { response }) @@ -415,28 +417,6 @@ export class WebsocketService { this.logger.log(`Subscribed ${count} to ${connectionId} channel.`) }) - this.logger.log(`Listener event count: ${this.redisSubscriber.listenerCount('message')}`) - - // Handle messages received on the subscribed Redis channel - if (this.redisSubscriber.listenerCount('message') === 0) { - this.redisSubscriber.on('message', (channel: string, message: string) => { - if (channel === connectionId) { - this.logger.log(`*** [redisSubscriber] Received message from ${channel}: ${message} **`) - - const jsonRpcResponse: JsonRpcResponseSubscriber = { - jsonrpc: '2.0', - method: 'messageReceive', - params: { - connectionId, - message: JSON.parse(message), - id: dto.id, - }, - } - - this.sendMessageToClientById(socket_id, jsonRpcResponse) - } - }) - } return true } else { this.logger.error('[addLiveSession] Failed to add LiveSession', { connectionId }) @@ -642,4 +622,80 @@ export class WebsocketService { throw new Error(`[sendMessageToClientById] Failed to send message: ${error.message}`) } } + + /** + * Initializes the Redis message listener to handle incoming messages from subscribed channels. + * This listener will log and delegate message handling to the `handleMessage` method. + */ + private initializeRedisMessageListener(): void { + this.logger.log('[initializeRedisMessageListener] Initializing Redis message listener') + + try { + // Register the message listener for Redis channels + this.redisSubscriber.on('message', (channel: string, message: string) => { + this.logger.log(`*** [initializeRedisMessageListener] Received message from ${channel}: ${message} **`) + + // Delegate message processing to the handleMessage method + this.handleMessage(channel, message) + }) + + this.logger.log('[initializeRedisMessageListener] Listener successfully registered') + } catch (error) { + // Log any errors that occur during listener initialization + this.logger.error('[initializeRedisMessageListener] Error initializing message listener', { + error: error.message, + }) + } + } + + /** + * Handles incoming messages from a Redis channel, retrieves the associated socket ID from Redis, + * and sends the message to the corresponding WebSocket client. + * + * @param {string} channel - The Redis channel (which corresponds to a connectionId) from which the message was received. + * @param {string} message - The message content received from the Redis channel. + * @returns {Promise} - Returns nothing, but logs errors or actions. + */ + private async handleMessage(channel: string, message: string): Promise { + this.logger.log(`[handleMessage] Processing message for channel: ${channel}`) + + try { + // Recover the session data (including socket_id) from Redis using the connectionId (channel) + const sessionKey = `liveSession:${channel}` + const sessionData = await this.redis.hgetall(sessionKey) + + if (!sessionData) { + this.logger.error(`[handleMessage] No session data found for connectionId: ${channel}`) + return // Exit if no session data is found in Redis + } + + const socket_id = sessionData.socket_id // Retrieve the socket_id associated with the connectionId + this.logger.debug(`[handleMessage] Recovered socket_id: ${socket_id} for connectionId: ${channel}`) + + if (!socket_id) { + this.logger.error(`[handleMessage] No socket_id found for connectionId: ${channel}`) + return // Exit if socket_id is not found in the session data + } + + // Parse and process the received message, and construct the JSON-RPC response + const jsonRpcResponse: JsonRpcResponseSubscriber = { + jsonrpc: '2.0', + method: 'messageReceive', + params: { + connectionId: channel, // The channel is treated as the connectionId + message: JSON.parse(message), // Parse the message to ensure it's valid JSON + id: '', + }, + } + + // Send the processed message to the WebSocket client using the recovered socket_id + await this.sendMessageToClientById(socket_id, jsonRpcResponse) + this.logger.log(`[handleMessage] Message sent to socket_id: ${socket_id}`) + } catch (error) { + // Log any errors encountered during the handling of the message + this.logger.error(`[handleMessage] Error processing message for channel: ${channel}`, { + error: error.message, + }) + } + } }