Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: client types and service methods into server #5

Merged
merged 3 commits into from
Oct 18, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions packages/client/src/MessagePickupRepositoryClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,13 +75,14 @@ export class MessagePickupRepositoryClient implements MessagePickupRepository {
* @param {JsonRpcParamsMessage} data - The data received via the 'messageReceive' event.
*
* @param {string} data.connectionId - The ID of the connection associated with the message.
* @param {QueuedMessage[]} data.message - An array of queued messages received.
* @param {string} data.message - queued messages received.
gabrielmatau79 marked this conversation as resolved.
Show resolved Hide resolved
* @param {string} [data.id] - (Optional) The identifier for the JSON-RPC message.
*
* @example
* messageReceived((data: JsonRpcParamsMessage) => {
* console.log('ConnectionId:', data.connectionId);
* console.log('Messages:', data.message);
* const message = JSON.parse(data.message)
* console.log('Messages:', message.id);
* });
*/
messageReceived(callback: (data: JsonRpcParamsMessage) => void): void {
Expand Down
4 changes: 1 addition & 3 deletions packages/client/src/interfaces.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
import { QueuedMessage } from '@credo-ts/core'

export interface JsonRpcParamsMessage {
connectionId: string
message: QueuedMessage
message: string
id?: string
}

Expand Down
22 changes: 12 additions & 10 deletions packages/server/src/websocket/websocket.service.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -110,21 +110,23 @@ describe('WebsocketService', () => {
expect(result[1].encryptedMessage).toBe('test-message-2') // Verifica por 'id'
})

it('should getAvailableMessageCount of available messages from Redis', async () => {
// Mock Redis to return a predefined count of messages
jest.spyOn(redisMock, 'llen').mockResolvedValue(5)
it('should getAvailableMessageCount of available messages from Redis and MongoDB', async () => {
const connectionId = 'test-connection-id'

// Mock Redis response
jest.spyOn(redisMock, 'llen').mockResolvedValue(5) // simula 5 mensajes en Redis
gabrielmatau79 marked this conversation as resolved.
Show resolved Hide resolved

// Mock MongoDB countDocuments response
storeQueuedMessageMock.countDocuments = jest.fn().mockResolvedValue(3) // simula 3 mensajes en MongoDB

// Execute the getAvailableMessageCount method
const result = await service.getAvailableMessageCount({
connectionId: 'test-connection-id',
id: '1',
})

// Verify that Redis was called with the correct key
expect(redisMock.llen).toHaveBeenCalledWith('connectionId:test-connection-id:queuemessages')

// Verify the returned count of available messages
expect(result).toBe(5)
expect(result).toBe(8) // 5 mensajes de Redis + 3 mensajes de MongoDB
expect(redisMock.llen).toHaveBeenCalledWith(`connectionId:${connectionId}:queuemessages`)
expect(storeQueuedMessageMock.countDocuments).toHaveBeenCalledWith({ connectionId })
})

it('should addmessage method to the queue and publish it to Redis', async () => {
Expand Down Expand Up @@ -214,7 +216,7 @@ describe('WebsocketService', () => {
// Verify that messages were removed from MongoDB
expect(storeQueuedMessageMock.deleteMany).toHaveBeenCalledWith({
connectionId: removeMessagesDto.connectionId,
_id: { $in: removeMessagesDto.messageIds.map((id) => new Object(id)) },
messageId: { $in: removeMessagesDto.messageIds.map((id) => new Object(id)) },
})
})
})
11 changes: 8 additions & 3 deletions packages/server/src/websocket/websocket.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -107,10 +107,11 @@ export class WebsocketService {
this.logger.debug(
`[takeFromQueue] Fetched ${mongoMappedMessages.length} messages from MongoDB for connectionId ${connectionId}`,
)

// Combine messages from Redis and MongoDB
const combinedMessages: QueuedMessage[] = [...redisMessages, ...mongoMappedMessages]

this.logger.debug(`[takeFromQueue] combinedMessages for connectionId ${connectionId}: ${combinedMessages}`)

return combinedMessages
} catch (error) {
this.logger.error('[takeFromQueue] Error retrieving messages from Redis and MongoDB:', {
Expand All @@ -135,7 +136,11 @@ export class WebsocketService {

try {
// retrieve the list count of messages for the connection
const messageCount = await this.redis.llen(`connectionId:${connectionId}:queuemessages`)
const redisMessageCount = await this.redis.llen(`connectionId:${connectionId}:queuemessages`)

const mongoMessageCount = await this.queuedMessage.countDocuments({ connectionId })

const messageCount = redisMessageCount + mongoMessageCount

this.logger.debug(`[getAvailableMessageCount] Message count retrieved for connectionId ${connectionId}`, {
messageCount,
Expand Down Expand Up @@ -253,7 +258,7 @@ export class WebsocketService {
// Remove messages from MongoDB
const response = await this.queuedMessage.deleteMany({
connectionId: connectionId,
_id: { $in: messageIds.map((id) => new Object(id)) },
messageId: { $in: messageIds.map((id) => new Object(id)) },
})

this.logger.debug('[removeMessages] Messages removed from MongoDB', {
Expand Down