Skip to content

Commit

Permalink
feat: ✨ added server Remove all messages for a recipient function issue
Browse files Browse the repository at this point in the history
  • Loading branch information
gabrielmatau79 committed Sep 23, 2024
1 parent a3fa4ac commit 18ee788
Show file tree
Hide file tree
Showing 5 changed files with 4,967 additions and 321 deletions.
4 changes: 2 additions & 2 deletions packages/client/src/lib/MessagePickupRepositoryClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -170,10 +170,10 @@ export class MessagePickupRepositoryClient implements MessagePickupRepository {
/**
* Call the 'getLiveSession' RPC method.
* This method retrieves the live session data from the WebSocket server.
* It expects the response to be a `StoreLiveSession` object or `null`.
* It expects the response to be a `true` object or `null`.
*
* @param params - Parameters to pass to the 'getLiveSession' method.
* @returns {Promise<StoreLiveSession | null>} - The live session data.
* @returns {Promise<boolean | null>} - The live session data.
*/
async getLiveSession(params: ConnectionIdDto): Promise<boolean> {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,3 +76,15 @@ export class AddLiveSessionDto {
@IsNotEmpty()
sessionId: string
}

export class RemoveAllMessagesDto {
@IsNotEmpty()
id: string

@IsNotEmpty()
connectionId: string

@IsNotEmpty()
@IsString()
recipientDid: string
}
11 changes: 11 additions & 0 deletions packages/server/src/websocket/websocket.gateway.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import {
AddLiveSessionDto,
AddMessageDto,
ConnectionIdDto,
RemoveAllMessagesDto,
RemoveMessagesDto,
TakeFromQueueDto,
} from './dto/messagerepository-websocket.dto'
Expand Down Expand Up @@ -118,6 +119,16 @@ export class WebsocketGateway implements OnModuleInit, OnModuleDestroy {
}
})

this.server.register('removeAllMessages', async (params: RemoveAllMessagesDto) => {
try {
await this.websocketService.removeAllMessage(params)
return true
} catch (error) {
this.logger.error('Error in removeMessages method', error.stack)
throw this.server.createError(500, 'Internal server error', { details: error.message })
}
})

this.server.register('getLiveSession', async (params: ConnectionIdDto) => {
try {
return await this.websocketService.getLiveSession(params)
Expand Down
46 changes: 46 additions & 0 deletions packages/server/src/websocket/websocket.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import {
TakeFromQueueDto,
ConnectionIdDto,
AddLiveSessionDto,
RemoveAllMessagesDto,
} from './dto/messagerepository-websocket.dto'
import { StoreQueuedMessage } from './schemas/StoreQueuedMessage'
import { InjectRedis } from '@nestjs-modules/ioredis'
Expand Down Expand Up @@ -271,6 +272,51 @@ export class WebsocketService {
}
}

/**
* Removes all messages associated with the given connectionId and recipientDid.
* Messages are removed from both Redis and MongoDB.
*
* @param removeAllMessagesDto - Data Transfer Object containing connectionId and recipientDid
* @returns {Promise<void>} - This function does not return any value.
* @throws {Error} - If the operation fails to remove messages.
*/
async removeAllMessage(dto: RemoveAllMessagesDto): Promise<void> {
const { connectionId, recipientDid } = dto

try {
// Get the list of messages stored in Redis associated with the connectionId
const key = `connectionId:${connectionId}:queuemessages`
const messages = await this.redis.lrange(key, 0, -1) // Retrieve all messages from Redis

// Filter messages that match the recipientDid
const messagesToRemove = messages.filter((message) => {
const parsedMessage = JSON.parse(message)
return parsedMessage.recipientDids.includes(recipientDid)
})

// Remove the filtered messages from Redis
for (const message of messagesToRemove) {
await this.redis.lrem(key, 1, message) // Remove each message from the list in Redis
}

// Remove the corresponding messages from MongoDB
await this.queuedMessage.deleteMany({
connectionId,
recipientKeys: recipientDid, // Assuming recipientDids is stored as an array in MongoDB
})

this.logger.log(
`Successfully removed all messages for connectionId ${connectionId} and recipientDid ${recipientDid}`,
)
} catch (error) {
this.logger.error(
`Failed to remove messages for connectionId ${connectionId} and recipientDid ${recipientDid}`,
error,
)
throw new Error('Failed to remove messages')
}
}

/**
* Retrieves the live session associated with the given connection ID.
*
Expand Down
Loading

0 comments on commit 18ee788

Please sign in to comment.