From 2061999b218268ed788121bd0991ab7d87332a30 Mon Sep 17 00:00:00 2001 From: Gabriel Mata Date: Fri, 20 Dec 2024 08:51:45 -0500 Subject: [PATCH] fix: remove conditionial 'this.instance' to publish message when live session exist in other instance, modify logger text with new name function --- .../src/PostgresMessagePickupRepository.ts | 30 ++++++++----------- 1 file changed, 12 insertions(+), 18 deletions(-) diff --git a/packages/postgres/src/PostgresMessagePickupRepository.ts b/packages/postgres/src/PostgresMessagePickupRepository.ts index f35aa01..2e1ab80 100644 --- a/packages/postgres/src/PostgresMessagePickupRepository.ts +++ b/packages/postgres/src/PostgresMessagePickupRepository.ts @@ -67,7 +67,7 @@ export class PostgresMessagePickupRepository implements MessagePickupRepository try { // Initialize the database await this.buildPgDatabase() - this.logger?.info(`The database has been initialized successfully`) + this.logger?.info(`[initialize] The database has been build successfully`) // Configure PostgreSQL pool for the messages collections this.messagesCollection = new Pool({ @@ -77,11 +77,9 @@ export class PostgresMessagePickupRepository implements MessagePickupRepository database: this.postgresDatabaseName, port: 5432, }) - this.logger?.debug(`[initialize] Listener status: ${this.dbListener}`) // Initialize Pub/Sub instance if database listener is enabled - - this.logger?.debug(`[initialize] Initializing pubSubInstance with listener: ${this.dbListener}`) + this.logger?.debug(`[initialize] Initializing pubSubInstance`) this.pubSubInstance = new PGPubsub( `postgres://${this.postgresUser}:${this.postgresPassword}@${this.postgresHost}/${this.postgresDatabaseName}`, ) @@ -284,14 +282,12 @@ export class PostgresMessagePickupRepository implements MessagePickupRepository } else { this.logger?.error(`connectionInfoCallback is not defined`) } - } else if (this.dbListener) { + } else { // Publish to the Pub/Sub channel if a live session exists on another instance this.logger?.debug( `[addMessage] Publishing new message event to Pub/Sub channel for connectionId: ${connectionId}`, ) await this.pubSubInstance?.publish('newMessage', connectionId) - } else { - this.logger?.debug('[addMessage] No live session and no DB channel configured.') } } @@ -345,7 +341,7 @@ export class PostgresMessagePickupRepository implements MessagePickupRepository } public async shutdown() { - this.logger?.info(`[disposeDB] Close connection to postgres`) + this.logger?.info(`[shutdown] Close connection to postgres`) await this.messagesCollection?.end() } @@ -356,13 +352,13 @@ export class PostgresMessagePickupRepository implements MessagePickupRepository * @returns {Promise} A promise resolving when the listener is initialized. */ private async initializeMessageListener(channel: string): Promise { - this.logger?.info(`[getListenerPublishDb] Initializing method for channel: ${channel}`) + this.logger?.info(`[initializeMessageListener] Initializing method for channel: ${channel}`) try { // Add a listener to the specified Pub/Sub channel await this.pubSubInstance?.addChannel(channel, async (connectionId: string) => { this.logger?.debug( - `[getListenerPublishDb] Received new message on channel: ${channel} for connectionId: ${connectionId}`, + `[initializeMessageListener] Received new message on channel: ${channel} for connectionId: ${connectionId}`, ) // Fetch the local live session for the given connectionId @@ -370,7 +366,7 @@ export class PostgresMessagePickupRepository implements MessagePickupRepository if (pickupLiveSession) { this.logger?.debug( - `[getListenerPublishDb] ${this.instanceName} found a LiveSession on channel: ${channel} for connectionId: ${connectionId}. Delivering messages.`, + `[initializeMessageListener] ${this.instanceName} found a LiveSession on channel: ${channel} for connectionId: ${connectionId}. Delivering messages.`, ) // Deliver messages from the queue for the live session @@ -379,14 +375,14 @@ export class PostgresMessagePickupRepository implements MessagePickupRepository }) } else { this.logger?.debug( - `[getListenerPublishDb] No LiveSession found on channel: ${channel} for connectionId: ${connectionId}.`, + `[initializeMessageListener] No LiveSession found on channel: ${channel} for connectionId: ${connectionId}.`, ) } }) - this.logger?.info(`[getListenerPublishDb] Listener successfully added for channel: ${channel}`) + this.logger?.info(`[initializeMessageListener] Listener successfully added for channel: ${channel}`) } catch (error) { - this.logger?.error(`[getListenerPublishDb] Error initializing listener for channel ${channel}: ${error}`) + this.logger?.error(`[initializeMessageListener] Error initializing listener for channel ${channel}: ${error}`) throw new Error(`Failed to initialize listener for channel ${channel}: ${error}`) } } @@ -398,8 +394,6 @@ export class PostgresMessagePickupRepository implements MessagePickupRepository private async buildPgDatabase(): Promise { this.logger?.info(`[buildPgDatabase] PostgresDbService Initializing`) - //const tableNameMessage = 'storequeuedmessage' - const clientConfig = { user: this.postgresUser, host: this.postgresHost, @@ -501,13 +495,13 @@ export class PostgresMessagePickupRepository implements MessagePickupRepository * @returns */ private async findLocalLiveSession(connectionId: string): Promise { - this.logger?.debug(`[getLocalliveSession] Verify current active live mode for connectionId ${connectionId}`) + this.logger?.debug(`[findLocalLiveSession] Verify current active live mode for connectionId ${connectionId}`) try { if (!this.agent) throw new Error('Agent is not defined') return this.agent.messagePickup.getLiveModeSession({ connectionId }) } catch (error) { - this.logger?.error(`[getLocalliveSession] error in getLocalliveSession: ${error}`) + this.logger?.error(`[findLocalLiveSession] error in getLocalliveSession: ${error}`) } }