diff --git a/packages/server/src/websocket/websocket.service.ts b/packages/server/src/websocket/websocket.service.ts index a3e10a9..67ba882 100644 --- a/packages/server/src/websocket/websocket.service.ts +++ b/packages/server/src/websocket/websocket.service.ts @@ -88,7 +88,9 @@ export class WebsocketService { const messageSize = msg.encryptedMessageSize || Buffer.byteLength(JSON.stringify(msg.encryptedMessage), 'utf8') // Check if adding this message would exceed the max message size limit - if (currentSize + messageSize > maxMessageSizeBytes) break + if (currentSize + messageSize > maxMessageSizeBytes) { + return combinedMessages + } // Add message to the result and update the accumulated size combinedMessages.push({ @@ -101,13 +103,16 @@ export class WebsocketService { // Step 2: Retrieve messages from Redis const redisMessagesRaw = await this.redis.lrange(`connectionId:${connectionId}:queuemessages`, 0, -1) + for (const message of redisMessagesRaw) { const parsedMessage = JSON.parse(message) const messageSize = parsedMessage.sizeInBytes || Buffer.byteLength(JSON.stringify(parsedMessage.encryptedMessage), 'utf8') // Check if adding this message would exceed the max message size limit - if (currentSize + messageSize > maxMessageSizeBytes) break + if (currentSize + messageSize > maxMessageSizeBytes) { + return combinedMessages + } // Add message to the result and update the accumulated size combinedMessages.push({ @@ -119,11 +124,7 @@ export class WebsocketService { } this.logger.debug( - `[takeFromQueue] Fetched ${combinedMessages.length} messages from Redis for connectionId ${connectionId}`, - ) - - this.logger.debug( - `[takeFromQueue] Fetched ${combinedMessages.length} total messages (from Redis and MongoDB) in ${currentSize} MB for connectionId ${connectionId}`, + `[takeFromQueue] Fetched ${combinedMessages.length} total messages for connectionId ${connectionId}`, ) this.logger.debug(