Skip to content

Commit

Permalink
fix: ensure single Redis message listener and include new handle mess…
Browse files Browse the repository at this point in the history
…ages (#19)
  • Loading branch information
gabrielmatau79 authored Oct 24, 2024
1 parent c19ec29 commit 22848d7
Showing 1 changed file with 78 additions and 22 deletions.
100 changes: 78 additions & 22 deletions packages/server/src/websocket/websocket.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ export class WebsocketService {
this.logger = new Logger(WebsocketService.name)
this.redisSubscriber = this.redis.duplicate()
this.redisPublisher = this.redis.duplicate()
this.initializeRedisMessageListener()
}

async onModuleInit() {
Expand Down Expand Up @@ -385,6 +386,7 @@ export class WebsocketService {
const sessionKey = `liveSession:${connectionId}`
const response = await this.redis.hmset(sessionKey, {
sessionId,
socket_id,
})

this.logger.debug('[addLiveSession] response:', { response })
Expand Down Expand Up @@ -415,28 +417,6 @@ export class WebsocketService {
this.logger.log(`Subscribed ${count} to ${connectionId} channel.`)
})

this.logger.log(`Listener event count: ${this.redisSubscriber.listenerCount('message')}`)

// Handle messages received on the subscribed Redis channel
if (this.redisSubscriber.listenerCount('message') === 0) {
this.redisSubscriber.on('message', (channel: string, message: string) => {
if (channel === connectionId) {
this.logger.log(`*** [redisSubscriber] Received message from ${channel}: ${message} **`)

const jsonRpcResponse: JsonRpcResponseSubscriber = {
jsonrpc: '2.0',
method: 'messageReceive',
params: {
connectionId,
message: JSON.parse(message),
id: dto.id,
},
}

this.sendMessageToClientById(socket_id, jsonRpcResponse)
}
})
}
return true
} else {
this.logger.error('[addLiveSession] Failed to add LiveSession', { connectionId })
Expand Down Expand Up @@ -642,4 +622,80 @@ export class WebsocketService {
throw new Error(`[sendMessageToClientById] Failed to send message: ${error.message}`)
}
}

/**
* Initializes the Redis message listener to handle incoming messages from subscribed channels.
* This listener will log and delegate message handling to the `handleMessage` method.
*/
private initializeRedisMessageListener(): void {
this.logger.log('[initializeRedisMessageListener] Initializing Redis message listener')

try {
// Register the message listener for Redis channels
this.redisSubscriber.on('message', (channel: string, message: string) => {
this.logger.log(`*** [initializeRedisMessageListener] Received message from ${channel}: ${message} **`)

// Delegate message processing to the handleMessage method
this.handleMessage(channel, message)
})

this.logger.log('[initializeRedisMessageListener] Listener successfully registered')
} catch (error) {
// Log any errors that occur during listener initialization
this.logger.error('[initializeRedisMessageListener] Error initializing message listener', {
error: error.message,
})
}
}

/**
* Handles incoming messages from a Redis channel, retrieves the associated socket ID from Redis,
* and sends the message to the corresponding WebSocket client.
*
* @param {string} channel - The Redis channel (which corresponds to a connectionId) from which the message was received.
* @param {string} message - The message content received from the Redis channel.
* @returns {Promise<void>} - Returns nothing, but logs errors or actions.
*/
private async handleMessage(channel: string, message: string): Promise<void> {
this.logger.log(`[handleMessage] Processing message for channel: ${channel}`)

try {
// Recover the session data (including socket_id) from Redis using the connectionId (channel)
const sessionKey = `liveSession:${channel}`
const sessionData = await this.redis.hgetall(sessionKey)

if (!sessionData) {
this.logger.error(`[handleMessage] No session data found for connectionId: ${channel}`)
return // Exit if no session data is found in Redis
}

const socket_id = sessionData.socket_id // Retrieve the socket_id associated with the connectionId
this.logger.debug(`[handleMessage] Recovered socket_id: ${socket_id} for connectionId: ${channel}`)

if (!socket_id) {
this.logger.error(`[handleMessage] No socket_id found for connectionId: ${channel}`)
return // Exit if socket_id is not found in the session data
}

// Parse and process the received message, and construct the JSON-RPC response
const jsonRpcResponse: JsonRpcResponseSubscriber = {
jsonrpc: '2.0',
method: 'messageReceive',
params: {
connectionId: channel, // The channel is treated as the connectionId
message: JSON.parse(message), // Parse the message to ensure it's valid JSON
id: '',
},
}

// Send the processed message to the WebSocket client using the recovered socket_id
await this.sendMessageToClientById(socket_id, jsonRpcResponse)
this.logger.log(`[handleMessage] Message sent to socket_id: ${socket_id}`)
} catch (error) {
// Log any errors encountered during the handling of the message
this.logger.error(`[handleMessage] Error processing message for channel: ${channel}`, {
error: error.message,
})
}
}
}

0 comments on commit 22848d7

Please sign in to comment.