diff --git a/packages/server/src/websocket/services/MessagePersister.ts b/packages/server/src/websocket/services/MessagePersister.ts index 647d777..7cac26b 100644 --- a/packages/server/src/websocket/services/MessagePersister.ts +++ b/packages/server/src/websocket/services/MessagePersister.ts @@ -1,62 +1,143 @@ -import { Injectable, Logger } from '@nestjs/common' +import { Injectable, Logger, OnModuleDestroy } from '@nestjs/common' import { InjectModel } from '@nestjs/mongoose' import { StoreQueuedMessage } from '../schemas/StoreQueuedMessage' import { Model } from 'mongoose' import Redis from 'ioredis' import { InjectRedis } from '@nestjs-modules/ioredis' import { ConfigService } from '@nestjs/config' +import * as os from 'os' @Injectable() -export class MessagePersister { +export class MessagePersister implements OnModuleDestroy { private readonly logger = new Logger(MessagePersister.name) private thresholdTimestamp: number + private readonly lockKey = 'message_persister_master' + private readonly lockTTL = 60000 // 1 minute TTL + private renewInterval: NodeJS.Timeout | null = null + private checkInterval: NodeJS.Timeout | null = null + private readonly instanceId = os.hostname() // Unique ID for this instance constructor( @InjectModel(StoreQueuedMessage.name) private storeQueuedMessage: Model, @InjectRedis() private readonly redis: Redis, private readonly configService: ConfigService, ) { - this.startMonitoring() + this.initiateMasterRole() } + // Attempts to acquire the mastership role and start the monitoring process if successful + initiateMasterRole() { + this.checkInterval = setInterval(async () => { + const isLeader = await this.acquireLock() + if (isLeader) { + this.logger.log(`[initiateMasterRole] This instance (${this.instanceId}) has acquired mastership.`) + this.startMonitoring() + this.renewLockPeriodically() + clearInterval(this.checkInterval) // Stop trying to acquire lock once acquired + } else { + this.logger.log(`[initiateMasterRole] Another instance is currently the master.`) + } + }, this.lockTTL) // Attempt to acquire mastership every TTL interval + } + + // Tries to acquire the Redis lock to become the master + async acquireLock(): Promise { + try { + this.logger.debug(`[acquireLock] Attempting to acquire lock with key ${this.lockKey}`) + + // Attempt to set the lock with NX (only if it does not exist) and PX (with TTL) + const result = await (this.redis.set as any)(this.lockKey, this.instanceId, 'NX', 'PX', this.lockTTL) + + // If result is OK, lock was acquired successfully + if (result === 'OK') { + this.logger.debug(`[acquireLock] Lock acquired successfully by instance ${this.instanceId}`) + return true + } + + // Otherwise, log the current lock holder + const currentLockHolder = await this.redis.get(this.lockKey) + this.logger.debug(`[acquireLock] Lock is currently held by instance: ${currentLockHolder}`) + return false + } catch (error) { + this.logger.error(`[acquireLock] Error acquiring lock: ${error}`) + return false + } + } + + // Periodically renews the lock to maintain mastership + renewLockPeriodically() { + this.renewInterval = setInterval(async () => { + try { + // Check if this instance still holds the lock before renewing + const currentLockHolder = await this.redis.get(this.lockKey) + if (currentLockHolder === this.instanceId) { + await this.redis.pexpire(this.lockKey, this.lockTTL) + this.logger.debug(`[renewLockPeriodically] Lock renewed by instance ${this.instanceId}`) + } else { + this.logger.warn(`[renewLockPeriodically] Lock is no longer held by instance ${this.instanceId}`) + this.clearMasterShip() // Relinquish mastership if another instance took the lock + } + } catch (error) { + this.logger.error(`[renewLockPeriodically] Error renewing lock: ${error}`) + this.clearMasterShip() // Relinquish mastership if unable to renew + } + }, this.lockTTL / 2) // Renew before the TTL expires + } + + // Clear mastership if lock renewal fails or another instance acquires it + clearMasterShip() { + if (this.renewInterval) { + clearInterval(this.renewInterval) + this.renewInterval = null + } + if (!this.checkInterval) { + this.initiateMasterRole() // Restart attempting to acquire mastership if lost + } + this.logger.log('[clearMasterShip] Mastership relinquished.') + } + + // Releases the lock when the module is destroyed, allowing another instance to become master + onModuleDestroy() { + if (this.renewInterval) { + clearInterval(this.renewInterval) + } + if (this.checkInterval) { + clearInterval(this.checkInterval) + } + this.redis.del(this.lockKey) // Releases the lock on shutdown + this.logger.log(`[onModuleDestroy] Lock released by instance ${this.instanceId}`) + } + + // Starts the monitoring and migration process if this instance is the master startMonitoring() { this.logger.log(`[startMonitoring] Initialize MessagePersister`) this.thresholdTimestamp = this.configService.get('appConfig.thresholdTimestamp', 60000) setInterval(() => this.migrateData(), this.thresholdTimestamp) } + // Migrates messages from Redis to MongoDB if they meet the age threshold async migrateData() { this.logger.log(`[migrateData] Initialize MessagePersister`) - // Calculate the threshold timestamp (messages older than 60 seconds will be migrated) const threshold = Date.now() - this.thresholdTimestamp this.logger.log(`[migrateData] Threshold timestamp calculated: ${threshold}`) - // Get the keys for messages in Redis that match the pattern const connectionIds = await this.redis.keys('connectionId:*:queuemessages') this.logger.log(`[migrateData] Found ${connectionIds.length} connectionIds matching the pattern`) - // Iterate over each connection key for (const fullKey of connectionIds) { this.logger.log(`[migrateData] Processing Redis key: ${fullKey}`) - - // Fetch all messages from the Redis list const messages = await this.redis.lrange(fullKey, 0, -1) this.logger.log(`[migrateData] Found ${messages.length} messages in key: ${fullKey}`) - // Iterate over each message in the list for (const messageData of messages) { const message = JSON.parse(messageData) this.logger.log(`[migrateData] Processing message with messageId: ${message.messageId}`) - - this.logger.log(`[migrateData] receivedAt : ${message.receivedAt} *** threshold ${threshold} `) const receivedAtTimestamp = new Date(message.receivedAt).getTime() - // Check if the message is older than the threshold + if (receivedAtTimestamp < threshold) { this.logger.log(`[migrateData] Message is older than threshold, migrating...`) - try { - // Save the message to MongoDB await this.storeQueuedMessage.create({ messageId: message.messageId, connectionId: message.connectionId, @@ -65,14 +146,11 @@ export class MessagePersister { state: message.state, createdAt: new Date(message.receivedAt), }) - - // Remove the message from Redis after migration await this.redis.lrem(fullKey, 1, messageData) this.logger.log( `[migrateData] Migrated and deleted message with key: ${fullKey} and messageId: ${message.messageId}`, ) } catch (error) { - // Log the error if migration fails this.logger.error('[migrateData] Failed to migrate message', { fullKey, messageId: message.messageId, @@ -80,7 +158,6 @@ export class MessagePersister { }) } } else { - // Skip the message if it is not old enough this.logger.log(`[migrateData] Message with messageId: ${message.messageId} is not old enough, skipping...`) } }