Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: client types and service methods into server #5

Merged
merged 3 commits into from
Oct 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 9 additions & 8 deletions packages/client/src/MessagePickupRepositoryClient.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import { Client } from 'rpc-websockets'
import log from 'loglevel'
import {
JsonRpcParamsMessage,
RemoveAllMessagesOptions,
ConnectionIdOptions,
AddLiveSessionOptions,
MessageReceivedCallbackParams,
} from './interfaces'
import {
AddMessageOptions,
Expand All @@ -20,7 +20,7 @@ log.setLevel('info')
export class MessagePickupRepositoryClient implements MessagePickupRepository {
private client?: Client
private readonly logger = log
private messageReceivedCallback: ((data: JsonRpcParamsMessage) => void) | null = null
private messageReceivedCallback: ((data: MessageReceivedCallbackParams) => void) | null = null

constructor(private readonly url: string) {}

Expand All @@ -41,7 +41,7 @@ export class MessagePickupRepositoryClient implements MessagePickupRepository {

client.addListener('messageReceive', (data) => {
if (this.messageReceivedCallback) {
this.messageReceivedCallback(data)
this.messageReceivedCallback(data as MessageReceivedCallbackParams)
} else {
this.logger.log('Received message event, but no callback is registered:', data)
}
Expand Down Expand Up @@ -72,19 +72,20 @@ export class MessagePickupRepositoryClient implements MessagePickupRepository {
* @param callback - The callback function to be invoked when 'messageReceive' is triggered.
* The callback receives a `data` parameter of type `JsonRpcParamsMessage`, containing:
*
* @param {JsonRpcParamsMessage} data - The data received via the 'messageReceive' event.
* @param {MessageReceivedCallbackParams} data - The data received via the 'messageReceive' event.
*
* @param {string} data.connectionId - The ID of the connection associated with the message.
* @param {QueuedMessage[]} data.message - An array of queued messages received.
* @param {QueuedMessage[]} data.message - Array of queued messages received.
* @param {string} [data.id] - (Optional) The identifier for the JSON-RPC message.
*
* @example
* messageReceived((data: JsonRpcParamsMessage) => {
* messageReceived((data: MessageReceivedCallbackParams) => {
* const { connectionId, message } = data
* console.log('ConnectionId:', data.connectionId);
* console.log('Messages:', data.message);
* console.log('Message:', message[0].id)
* });
*/
messageReceived(callback: (data: JsonRpcParamsMessage) => void): void {
messageReceived(callback: (data: MessageReceivedCallbackParams) => void): void {
this.messageReceivedCallback = callback
}

Expand Down
11 changes: 5 additions & 6 deletions packages/client/src/interfaces.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,5 @@
import { QueuedMessage } from '@credo-ts/core'

export interface JsonRpcParamsMessage {
connectionId: string
message: QueuedMessage
id?: string
}

export interface RemoveAllMessagesOptions {
connectionId: string
recipientDid: string
Expand All @@ -19,3 +13,8 @@ export interface AddLiveSessionOptions {
connectionId: string
sessionId: string
}

export interface MessageReceivedCallbackParams {
connectionId: string
message: QueuedMessage[]
}
22 changes: 12 additions & 10 deletions packages/server/src/websocket/websocket.service.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -110,21 +110,23 @@ describe('WebsocketService', () => {
expect(result[1].encryptedMessage).toBe('test-message-2') // Verifica por 'id'
})

it('should getAvailableMessageCount of available messages from Redis', async () => {
// Mock Redis to return a predefined count of messages
jest.spyOn(redisMock, 'llen').mockResolvedValue(5)
it('should getAvailableMessageCount of available messages from Redis and MongoDB', async () => {
const connectionId = 'test-connection-id'

// Mock Redis response
jest.spyOn(redisMock, 'llen').mockResolvedValue(5) // Simulate 5 messages in Redis.

// Mock MongoDB countDocuments response
storeQueuedMessageMock.countDocuments = jest.fn().mockResolvedValue(3) //Simulate 3 messages in MongoDB.

// Execute the getAvailableMessageCount method
const result = await service.getAvailableMessageCount({
connectionId: 'test-connection-id',
id: '1',
})

// Verify that Redis was called with the correct key
expect(redisMock.llen).toHaveBeenCalledWith('connectionId:test-connection-id:queuemessages')

// Verify the returned count of available messages
expect(result).toBe(5)
expect(result).toBe(8) // 5 messages from Redis + 3 messages from MongoDB
expect(redisMock.llen).toHaveBeenCalledWith(`connectionId:${connectionId}:queuemessages`)
expect(storeQueuedMessageMock.countDocuments).toHaveBeenCalledWith({ connectionId })
})

it('should addmessage method to the queue and publish it to Redis', async () => {
Expand Down Expand Up @@ -214,7 +216,7 @@ describe('WebsocketService', () => {
// Verify that messages were removed from MongoDB
expect(storeQueuedMessageMock.deleteMany).toHaveBeenCalledWith({
connectionId: removeMessagesDto.connectionId,
_id: { $in: removeMessagesDto.messageIds.map((id) => new Object(id)) },
messageId: { $in: removeMessagesDto.messageIds.map((id) => new Object(id)) },
})
})
})
27 changes: 17 additions & 10 deletions packages/server/src/websocket/websocket.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -107,10 +107,11 @@ export class WebsocketService {
this.logger.debug(
`[takeFromQueue] Fetched ${mongoMappedMessages.length} messages from MongoDB for connectionId ${connectionId}`,
)

// Combine messages from Redis and MongoDB
const combinedMessages: QueuedMessage[] = [...redisMessages, ...mongoMappedMessages]

this.logger.debug(`[takeFromQueue] combinedMessages for connectionId ${connectionId}: ${combinedMessages}`)

return combinedMessages
} catch (error) {
this.logger.error('[takeFromQueue] Error retrieving messages from Redis and MongoDB:', {
Expand All @@ -135,7 +136,11 @@ export class WebsocketService {

try {
// retrieve the list count of messages for the connection
const messageCount = await this.redis.llen(`connectionId:${connectionId}:queuemessages`)
const redisMessageCount = await this.redis.llen(`connectionId:${connectionId}:queuemessages`)

const mongoMessageCount = await this.queuedMessage.countDocuments({ connectionId })

const messageCount = redisMessageCount + mongoMessageCount

this.logger.debug(`[getAvailableMessageCount] Message count retrieved for connectionId ${connectionId}`, {
messageCount,
Expand Down Expand Up @@ -186,11 +191,13 @@ export class WebsocketService {
await this.redis.rpush(`connectionId:${connectionId}:queuemessages`, JSON.stringify(messageData))

// test send message to publish channel connection id
const messagePublish = {
id: messageId,
receivedAt,
encryptedMessage: payload,
}
const messagePublish: QueuedMessage[] = [
{
id: messageId,
receivedAt,
encryptedMessage: payload,
},
]

this.logger.debug(`[addMessage] Message stored in Redis for connectionId ${connectionId}`)

Expand Down Expand Up @@ -253,7 +260,7 @@ export class WebsocketService {
// Remove messages from MongoDB
const response = await this.queuedMessage.deleteMany({
connectionId: connectionId,
_id: { $in: messageIds.map((id) => new Object(id)) },
messageId: { $in: messageIds.map((id) => new Object(id)) },
})

this.logger.debug('[removeMessages] Messages removed from MongoDB', {
Expand Down Expand Up @@ -391,7 +398,7 @@ export class WebsocketService {
})

// Handles messages received on the subscribed Redis channel
this.redisSubscriber.on('message', (channel: string, message: QueuedMessage[]) => {
this.redisSubscriber.on('message', (channel: string, message: string) => {
if (channel === connectionId) {
this.logger.log(`*** [redisSubscriber] Received message from ${channel}: ${message} **`)

Expand All @@ -400,7 +407,7 @@ export class WebsocketService {
method: 'messageReceive',
params: {
connectionId,
message,
message: JSON.parse(message),
id: dto.id,
},
}
Expand Down
Loading