From 124f8603e983f2670c0763b873c6af1edba67c7f Mon Sep 17 00:00:00 2001 From: Gabriel Mata Date: Wed, 6 Nov 2024 14:09:18 -0500 Subject: [PATCH] feat: Add handling of message sending limits by size in bytes --- packages/server/src/config/app.config.ts | 7 ++ .../websocket/schemas/StoreQueuedMessage.ts | 8 ++ .../websocket/services/MessagePersister.ts | 1 + .../server/src/websocket/websocket.service.ts | 82 ++++++++++++------- 4 files changed, 68 insertions(+), 30 deletions(-) diff --git a/packages/server/src/config/app.config.ts b/packages/server/src/config/app.config.ts index 5a2c484..c86dd1c 100644 --- a/packages/server/src/config/app.config.ts +++ b/packages/server/src/config/app.config.ts @@ -65,4 +65,11 @@ export default registerAs('appConfig', () => ({ *Allows set threshold time to execute messagePersist module on milisecond */ thresholdTimestamp: parseInt(process.env.THRESHOLD_TIMESTAMP) || 60000, + + /** + * The maximum total message size in bytes. + * Defaults to 1 MB (1 * 1024 * 1024 bytes) if MAX_TOTAL_MESSAGE_SIZE_MB is not set in the environment variables. + * @type {number} + */ + maxMessageSizeBytes: (parseInt(process.env.MAX_MESSAGE_SIZE_BYTES, 10) || 1) * 1024 * 1024, })) diff --git a/packages/server/src/websocket/schemas/StoreQueuedMessage.ts b/packages/server/src/websocket/schemas/StoreQueuedMessage.ts index 4905b05..810829f 100644 --- a/packages/server/src/websocket/schemas/StoreQueuedMessage.ts +++ b/packages/server/src/websocket/schemas/StoreQueuedMessage.ts @@ -31,6 +31,13 @@ export class StoreQueuedMessage extends Document { @Prop({ type: Object, required: true }) encryptedMessage: EncryptedMessage + /** + * The size Encrypted Message store in collection + * @type {number} + */ + @Prop() + encryptedMessageSize?: number + /** * The recipient keys (DIDs or other identifiers) associated with the message. * @type {string[]} @@ -44,6 +51,7 @@ export class StoreQueuedMessage extends Document { */ @Prop() state?: string + /** * The timestamp when the message was created. * Mongoose automatically creates this field when `timestamps: true` is set in the schema. diff --git a/packages/server/src/websocket/services/MessagePersister.ts b/packages/server/src/websocket/services/MessagePersister.ts index 647d777..53eb99c 100644 --- a/packages/server/src/websocket/services/MessagePersister.ts +++ b/packages/server/src/websocket/services/MessagePersister.ts @@ -62,6 +62,7 @@ export class MessagePersister { connectionId: message.connectionId, recipientKeys: message.recipientDids, encryptedMessage: message.encryptedMessage, + encryptedMessageSize: message.encryptedMessageSize, state: message.state, createdAt: new Date(message.receivedAt), }) diff --git a/packages/server/src/websocket/websocket.service.ts b/packages/server/src/websocket/websocket.service.ts index 729c99d..a3e10a9 100644 --- a/packages/server/src/websocket/websocket.service.ts +++ b/packages/server/src/websocket/websocket.service.ts @@ -65,54 +65,70 @@ export class WebsocketService { * @returns {Promise} - A promise that resolves to an array of queued messages. */ async takeFromQueue(dto: TakeFromQueueDto): Promise { - const { connectionId, limit = 10, recipientDid } = dto + const { connectionId, recipientDid } = dto + const maxMessageSizeBytes = this.configService.get('appConfig.maxMessageSizeBytes') + let currentSize = 0 // Accumulated size of messages in bytes + const combinedMessages: QueuedMessage[] = [] this.logger.debug('[takeFromQueue] Method called with DTO:', dto) try { - // Retrieve messages from Redis - const redisMessagesRaw = await this.redis.lrange(`connectionId:${connectionId}:queuemessages`, 0, limit - 1) - const redisMessages: QueuedMessage[] = redisMessagesRaw.map((message) => { - const parsedMessage = JSON.parse(message) - - // Map Redis data to QueuedMessage type - return { - id: parsedMessage.messageId, - receivedAt: new Date(parsedMessage.receivedAt), - encryptedMessage: parsedMessage.encryptedMessage, - } - }) - - this.logger.debug( - `[takeFromQueue] Fetched ${redisMessages.length} messages from Redis for connectionId ${connectionId}`, - ) - - // Query MongoDB with the provided connectionId or recipientDid, and state 'pending' + // Step 1: Retrieve messages from MongoDB, respecting the accumulated size limit const mongoMessages = await this.queuedMessage .find({ $or: [{ connectionId }, { recipientKeys: recipientDid }], state: 'pending', }) .sort({ createdAt: 1 }) - .limit(limit) - .select({ messageId: 1, encryptedMessage: 1, createdAt: 1 }) + .select({ messageId: 1, encryptedMessage: 1, createdAt: 1, encryptedMessageSize: 1 }) .lean() .exec() - const mongoMappedMessages: QueuedMessage[] = mongoMessages.map((msg) => ({ - id: msg.messageId, - receivedAt: msg.createdAt, - encryptedMessage: msg.encryptedMessage, - })) + for (const msg of mongoMessages) { + const messageSize = msg.encryptedMessageSize || Buffer.byteLength(JSON.stringify(msg.encryptedMessage), 'utf8') + + // Check if adding this message would exceed the max message size limit + if (currentSize + messageSize > maxMessageSizeBytes) break + + // Add message to the result and update the accumulated size + combinedMessages.push({ + id: msg.messageId, + receivedAt: msg.createdAt, + encryptedMessage: msg.encryptedMessage, + }) + currentSize += messageSize + } + + // Step 2: Retrieve messages from Redis + const redisMessagesRaw = await this.redis.lrange(`connectionId:${connectionId}:queuemessages`, 0, -1) + for (const message of redisMessagesRaw) { + const parsedMessage = JSON.parse(message) + const messageSize = + parsedMessage.sizeInBytes || Buffer.byteLength(JSON.stringify(parsedMessage.encryptedMessage), 'utf8') + + // Check if adding this message would exceed the max message size limit + if (currentSize + messageSize > maxMessageSizeBytes) break + + // Add message to the result and update the accumulated size + combinedMessages.push({ + id: parsedMessage.messageId, + receivedAt: new Date(parsedMessage.receivedAt), + encryptedMessage: parsedMessage.encryptedMessage, + }) + currentSize += messageSize + } this.logger.debug( - `[takeFromQueue] Fetched ${mongoMappedMessages.length} messages from MongoDB for connectionId ${connectionId}`, + `[takeFromQueue] Fetched ${combinedMessages.length} messages from Redis for connectionId ${connectionId}`, ) - // Combine messages from Redis and MongoDB - const combinedMessages: QueuedMessage[] = [...redisMessages, ...mongoMappedMessages] - this.logger.debug(`[takeFromQueue] combinedMessages for connectionId ${connectionId}: ${combinedMessages}`) + this.logger.debug( + `[takeFromQueue] Fetched ${combinedMessages.length} total messages (from Redis and MongoDB) in ${currentSize} MB for connectionId ${connectionId}`, + ) + this.logger.debug( + `[takeFromQueue] Total message size to be sent for connectionId ${connectionId}: ${currentSize} bytes`, + ) return combinedMessages } catch (error) { this.logger.error('[takeFromQueue] Error retrieving messages from Redis and MongoDB:', { @@ -178,6 +194,11 @@ export class WebsocketService { messageId = new ObjectId().toString() receivedAt = new Date() + // Calculate the size in bytes of the encrypted message to add database + const encryptedMessageSize = Buffer.byteLength(JSON.stringify(payload), 'utf8') + + this.logger.debug(`[addMessage] Size Encrypted Message ${encryptedMessageSize} `) + // Create a message object to store in Redis const messageData = { messageId, @@ -185,6 +206,7 @@ export class WebsocketService { recipientDids, encryptedMessage: payload, state: MessageState.pending, + encryptedMessageSize, receivedAt, }