Skip to content

Commit

Permalink
feat: Add handling of message sending limits by size in bytes
Browse files Browse the repository at this point in the history
  • Loading branch information
gabrielmatau79 committed Nov 6, 2024
1 parent bcf4f7a commit 124f860
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 30 deletions.
7 changes: 7 additions & 0 deletions packages/server/src/config/app.config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,4 +65,11 @@ export default registerAs('appConfig', () => ({
*Allows set threshold time to execute messagePersist module on milisecond
*/
thresholdTimestamp: parseInt(process.env.THRESHOLD_TIMESTAMP) || 60000,

/**
* The maximum total message size in bytes.
* Defaults to 1 MB (1 * 1024 * 1024 bytes) if MAX_TOTAL_MESSAGE_SIZE_MB is not set in the environment variables.
* @type {number}
*/
maxMessageSizeBytes: (parseInt(process.env.MAX_MESSAGE_SIZE_BYTES, 10) || 1) * 1024 * 1024,
}))
8 changes: 8 additions & 0 deletions packages/server/src/websocket/schemas/StoreQueuedMessage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,13 @@ export class StoreQueuedMessage extends Document {
@Prop({ type: Object, required: true })
encryptedMessage: EncryptedMessage

/**
* The size Encrypted Message store in collection
* @type {number}
*/
@Prop()
encryptedMessageSize?: number

/**
* The recipient keys (DIDs or other identifiers) associated with the message.
* @type {string[]}
Expand All @@ -44,6 +51,7 @@ export class StoreQueuedMessage extends Document {
*/
@Prop()
state?: string

/**
* The timestamp when the message was created.
* Mongoose automatically creates this field when `timestamps: true` is set in the schema.
Expand Down
1 change: 1 addition & 0 deletions packages/server/src/websocket/services/MessagePersister.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ export class MessagePersister {
connectionId: message.connectionId,
recipientKeys: message.recipientDids,
encryptedMessage: message.encryptedMessage,
encryptedMessageSize: message.encryptedMessageSize,
state: message.state,
createdAt: new Date(message.receivedAt),
})
Expand Down
82 changes: 52 additions & 30 deletions packages/server/src/websocket/websocket.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,54 +65,70 @@ export class WebsocketService {
* @returns {Promise<QueuedMessage[]>} - A promise that resolves to an array of queued messages.
*/
async takeFromQueue(dto: TakeFromQueueDto): Promise<QueuedMessage[]> {
const { connectionId, limit = 10, recipientDid } = dto
const { connectionId, recipientDid } = dto
const maxMessageSizeBytes = this.configService.get<number>('appConfig.maxMessageSizeBytes')
let currentSize = 0 // Accumulated size of messages in bytes
const combinedMessages: QueuedMessage[] = []

this.logger.debug('[takeFromQueue] Method called with DTO:', dto)

try {
// Retrieve messages from Redis
const redisMessagesRaw = await this.redis.lrange(`connectionId:${connectionId}:queuemessages`, 0, limit - 1)
const redisMessages: QueuedMessage[] = redisMessagesRaw.map((message) => {
const parsedMessage = JSON.parse(message)

// Map Redis data to QueuedMessage type
return {
id: parsedMessage.messageId,
receivedAt: new Date(parsedMessage.receivedAt),
encryptedMessage: parsedMessage.encryptedMessage,
}
})

this.logger.debug(
`[takeFromQueue] Fetched ${redisMessages.length} messages from Redis for connectionId ${connectionId}`,
)

// Query MongoDB with the provided connectionId or recipientDid, and state 'pending'
// Step 1: Retrieve messages from MongoDB, respecting the accumulated size limit
const mongoMessages = await this.queuedMessage
.find({
$or: [{ connectionId }, { recipientKeys: recipientDid }],
state: 'pending',
})
.sort({ createdAt: 1 })
.limit(limit)
.select({ messageId: 1, encryptedMessage: 1, createdAt: 1 })
.select({ messageId: 1, encryptedMessage: 1, createdAt: 1, encryptedMessageSize: 1 })
.lean()
.exec()

const mongoMappedMessages: QueuedMessage[] = mongoMessages.map((msg) => ({
id: msg.messageId,
receivedAt: msg.createdAt,
encryptedMessage: msg.encryptedMessage,
}))
for (const msg of mongoMessages) {
const messageSize = msg.encryptedMessageSize || Buffer.byteLength(JSON.stringify(msg.encryptedMessage), 'utf8')

// Check if adding this message would exceed the max message size limit
if (currentSize + messageSize > maxMessageSizeBytes) break

// Add message to the result and update the accumulated size
combinedMessages.push({
id: msg.messageId,
receivedAt: msg.createdAt,
encryptedMessage: msg.encryptedMessage,
})
currentSize += messageSize
}

// Step 2: Retrieve messages from Redis
const redisMessagesRaw = await this.redis.lrange(`connectionId:${connectionId}:queuemessages`, 0, -1)
for (const message of redisMessagesRaw) {
const parsedMessage = JSON.parse(message)
const messageSize =
parsedMessage.sizeInBytes || Buffer.byteLength(JSON.stringify(parsedMessage.encryptedMessage), 'utf8')

// Check if adding this message would exceed the max message size limit
if (currentSize + messageSize > maxMessageSizeBytes) break

// Add message to the result and update the accumulated size
combinedMessages.push({
id: parsedMessage.messageId,
receivedAt: new Date(parsedMessage.receivedAt),
encryptedMessage: parsedMessage.encryptedMessage,
})
currentSize += messageSize
}

this.logger.debug(
`[takeFromQueue] Fetched ${mongoMappedMessages.length} messages from MongoDB for connectionId ${connectionId}`,
`[takeFromQueue] Fetched ${combinedMessages.length} messages from Redis for connectionId ${connectionId}`,
)
// Combine messages from Redis and MongoDB
const combinedMessages: QueuedMessage[] = [...redisMessages, ...mongoMappedMessages]

this.logger.debug(`[takeFromQueue] combinedMessages for connectionId ${connectionId}: ${combinedMessages}`)
this.logger.debug(
`[takeFromQueue] Fetched ${combinedMessages.length} total messages (from Redis and MongoDB) in ${currentSize} MB for connectionId ${connectionId}`,
)

this.logger.debug(
`[takeFromQueue] Total message size to be sent for connectionId ${connectionId}: ${currentSize} bytes`,
)
return combinedMessages
} catch (error) {
this.logger.error('[takeFromQueue] Error retrieving messages from Redis and MongoDB:', {
Expand Down Expand Up @@ -178,13 +194,19 @@ export class WebsocketService {
messageId = new ObjectId().toString()
receivedAt = new Date()

// Calculate the size in bytes of the encrypted message to add database
const encryptedMessageSize = Buffer.byteLength(JSON.stringify(payload), 'utf8')

this.logger.debug(`[addMessage] Size Encrypted Message ${encryptedMessageSize} `)

// Create a message object to store in Redis
const messageData = {
messageId,
connectionId,
recipientDids,
encryptedMessage: payload,
state: MessageState.pending,
encryptedMessageSize,
receivedAt,
}

Expand Down

0 comments on commit 124f860

Please sign in to comment.