diff --git a/packages/client/src/MessagePickupRepositoryClient.ts b/packages/client/src/MessagePickupRepositoryClient.ts index 4c767e2..0975e68 100644 --- a/packages/client/src/MessagePickupRepositoryClient.ts +++ b/packages/client/src/MessagePickupRepositoryClient.ts @@ -1,10 +1,10 @@ import { Client } from 'rpc-websockets' import log from 'loglevel' import { - JsonRpcParamsMessage, RemoveAllMessagesOptions, ConnectionIdOptions, AddLiveSessionOptions, + MessageReceivedCallbackParams, } from './interfaces' import { AddMessageOptions, @@ -20,7 +20,7 @@ log.setLevel('info') export class MessagePickupRepositoryClient implements MessagePickupRepository { private client?: Client private readonly logger = log - private messageReceivedCallback: ((data: JsonRpcParamsMessage) => void) | null = null + private messageReceivedCallback: ((data: MessageReceivedCallbackParams) => void) | null = null constructor(private readonly url: string) {} @@ -41,7 +41,7 @@ export class MessagePickupRepositoryClient implements MessagePickupRepository { client.addListener('messageReceive', (data) => { if (this.messageReceivedCallback) { - this.messageReceivedCallback(data) + this.messageReceivedCallback(data as MessageReceivedCallbackParams) } else { this.logger.log('Received message event, but no callback is registered:', data) } @@ -72,19 +72,20 @@ export class MessagePickupRepositoryClient implements MessagePickupRepository { * @param callback - The callback function to be invoked when 'messageReceive' is triggered. * The callback receives a `data` parameter of type `JsonRpcParamsMessage`, containing: * - * @param {JsonRpcParamsMessage} data - The data received via the 'messageReceive' event. + * @param {MessageReceivedCallbackParams} data - The data received via the 'messageReceive' event. * * @param {string} data.connectionId - The ID of the connection associated with the message. - * @param {QueuedMessage[]} data.message - An array of queued messages received. + * @param {QueuedMessage[]} data.message - Array of queued messages received. * @param {string} [data.id] - (Optional) The identifier for the JSON-RPC message. * * @example - * messageReceived((data: JsonRpcParamsMessage) => { + * messageReceived((data: MessageReceivedCallbackParams) => { + * const { connectionId, message } = data * console.log('ConnectionId:', data.connectionId); - * console.log('Messages:', data.message); + * console.log('Message:', message[0].id) * }); */ - messageReceived(callback: (data: JsonRpcParamsMessage) => void): void { + messageReceived(callback: (data: MessageReceivedCallbackParams) => void): void { this.messageReceivedCallback = callback } diff --git a/packages/client/src/interfaces.ts b/packages/client/src/interfaces.ts index f857552..39aa18f 100644 --- a/packages/client/src/interfaces.ts +++ b/packages/client/src/interfaces.ts @@ -1,11 +1,5 @@ import { QueuedMessage } from '@credo-ts/core' -export interface JsonRpcParamsMessage { - connectionId: string - message: QueuedMessage - id?: string -} - export interface RemoveAllMessagesOptions { connectionId: string recipientDid: string @@ -19,3 +13,8 @@ export interface AddLiveSessionOptions { connectionId: string sessionId: string } + +export interface MessageReceivedCallbackParams { + connectionId: string + message: QueuedMessage[] +} diff --git a/packages/server/src/websocket/websocket.service.spec.ts b/packages/server/src/websocket/websocket.service.spec.ts index 31d0f7a..039df06 100644 --- a/packages/server/src/websocket/websocket.service.spec.ts +++ b/packages/server/src/websocket/websocket.service.spec.ts @@ -110,21 +110,23 @@ describe('WebsocketService', () => { expect(result[1].encryptedMessage).toBe('test-message-2') // Verifica por 'id' }) - it('should getAvailableMessageCount of available messages from Redis', async () => { - // Mock Redis to return a predefined count of messages - jest.spyOn(redisMock, 'llen').mockResolvedValue(5) + it('should getAvailableMessageCount of available messages from Redis and MongoDB', async () => { + const connectionId = 'test-connection-id' + + // Mock Redis response + jest.spyOn(redisMock, 'llen').mockResolvedValue(5) // Simulate 5 messages in Redis. + + // Mock MongoDB countDocuments response + storeQueuedMessageMock.countDocuments = jest.fn().mockResolvedValue(3) //Simulate 3 messages in MongoDB. - // Execute the getAvailableMessageCount method const result = await service.getAvailableMessageCount({ connectionId: 'test-connection-id', id: '1', }) - // Verify that Redis was called with the correct key - expect(redisMock.llen).toHaveBeenCalledWith('connectionId:test-connection-id:queuemessages') - - // Verify the returned count of available messages - expect(result).toBe(5) + expect(result).toBe(8) // 5 messages from Redis + 3 messages from MongoDB + expect(redisMock.llen).toHaveBeenCalledWith(`connectionId:${connectionId}:queuemessages`) + expect(storeQueuedMessageMock.countDocuments).toHaveBeenCalledWith({ connectionId }) }) it('should addmessage method to the queue and publish it to Redis', async () => { @@ -214,7 +216,7 @@ describe('WebsocketService', () => { // Verify that messages were removed from MongoDB expect(storeQueuedMessageMock.deleteMany).toHaveBeenCalledWith({ connectionId: removeMessagesDto.connectionId, - _id: { $in: removeMessagesDto.messageIds.map((id) => new Object(id)) }, + messageId: { $in: removeMessagesDto.messageIds.map((id) => new Object(id)) }, }) }) }) diff --git a/packages/server/src/websocket/websocket.service.ts b/packages/server/src/websocket/websocket.service.ts index 5a4f4b8..1b7435e 100644 --- a/packages/server/src/websocket/websocket.service.ts +++ b/packages/server/src/websocket/websocket.service.ts @@ -107,10 +107,11 @@ export class WebsocketService { this.logger.debug( `[takeFromQueue] Fetched ${mongoMappedMessages.length} messages from MongoDB for connectionId ${connectionId}`, ) - // Combine messages from Redis and MongoDB const combinedMessages: QueuedMessage[] = [...redisMessages, ...mongoMappedMessages] + this.logger.debug(`[takeFromQueue] combinedMessages for connectionId ${connectionId}: ${combinedMessages}`) + return combinedMessages } catch (error) { this.logger.error('[takeFromQueue] Error retrieving messages from Redis and MongoDB:', { @@ -135,7 +136,11 @@ export class WebsocketService { try { // retrieve the list count of messages for the connection - const messageCount = await this.redis.llen(`connectionId:${connectionId}:queuemessages`) + const redisMessageCount = await this.redis.llen(`connectionId:${connectionId}:queuemessages`) + + const mongoMessageCount = await this.queuedMessage.countDocuments({ connectionId }) + + const messageCount = redisMessageCount + mongoMessageCount this.logger.debug(`[getAvailableMessageCount] Message count retrieved for connectionId ${connectionId}`, { messageCount, @@ -186,11 +191,13 @@ export class WebsocketService { await this.redis.rpush(`connectionId:${connectionId}:queuemessages`, JSON.stringify(messageData)) // test send message to publish channel connection id - const messagePublish = { - id: messageId, - receivedAt, - encryptedMessage: payload, - } + const messagePublish: QueuedMessage[] = [ + { + id: messageId, + receivedAt, + encryptedMessage: payload, + }, + ] this.logger.debug(`[addMessage] Message stored in Redis for connectionId ${connectionId}`) @@ -253,7 +260,7 @@ export class WebsocketService { // Remove messages from MongoDB const response = await this.queuedMessage.deleteMany({ connectionId: connectionId, - _id: { $in: messageIds.map((id) => new Object(id)) }, + messageId: { $in: messageIds.map((id) => new Object(id)) }, }) this.logger.debug('[removeMessages] Messages removed from MongoDB', { @@ -391,7 +398,7 @@ export class WebsocketService { }) // Handles messages received on the subscribed Redis channel - this.redisSubscriber.on('message', (channel: string, message: QueuedMessage[]) => { + this.redisSubscriber.on('message', (channel: string, message: string) => { if (channel === connectionId) { this.logger.log(`*** [redisSubscriber] Received message from ${channel}: ${message} **`) @@ -400,7 +407,7 @@ export class WebsocketService { method: 'messageReceive', params: { connectionId, - message, + message: JSON.parse(message), id: dto.id, }, }