Skip to content

Commit

Permalink
fix: remove conditionial 'this.instance' to publish message when live…
Browse files Browse the repository at this point in the history
… session exist in other instance, modify logger text with new name function
  • Loading branch information
gabrielmatau79 committed Dec 20, 2024
1 parent 1aceb9c commit 2061999
Showing 1 changed file with 12 additions and 18 deletions.
30 changes: 12 additions & 18 deletions packages/postgres/src/PostgresMessagePickupRepository.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ export class PostgresMessagePickupRepository implements MessagePickupRepository
try {
// Initialize the database
await this.buildPgDatabase()
this.logger?.info(`The database has been initialized successfully`)
this.logger?.info(`[initialize] The database has been build successfully`)

// Configure PostgreSQL pool for the messages collections
this.messagesCollection = new Pool({
Expand All @@ -77,11 +77,9 @@ export class PostgresMessagePickupRepository implements MessagePickupRepository
database: this.postgresDatabaseName,
port: 5432,
})
this.logger?.debug(`[initialize] Listener status: ${this.dbListener}`)

// Initialize Pub/Sub instance if database listener is enabled

this.logger?.debug(`[initialize] Initializing pubSubInstance with listener: ${this.dbListener}`)
this.logger?.debug(`[initialize] Initializing pubSubInstance`)
this.pubSubInstance = new PGPubsub(
`postgres://${this.postgresUser}:${this.postgresPassword}@${this.postgresHost}/${this.postgresDatabaseName}`,
)
Expand Down Expand Up @@ -284,14 +282,12 @@ export class PostgresMessagePickupRepository implements MessagePickupRepository
} else {
this.logger?.error(`connectionInfoCallback is not defined`)
}
} else if (this.dbListener) {
} else {
// Publish to the Pub/Sub channel if a live session exists on another instance
this.logger?.debug(
`[addMessage] Publishing new message event to Pub/Sub channel for connectionId: ${connectionId}`,
)
await this.pubSubInstance?.publish('newMessage', connectionId)
} else {
this.logger?.debug('[addMessage] No live session and no DB channel configured.')
}
}

Expand Down Expand Up @@ -345,7 +341,7 @@ export class PostgresMessagePickupRepository implements MessagePickupRepository
}

public async shutdown() {
this.logger?.info(`[disposeDB] Close connection to postgres`)
this.logger?.info(`[shutdown] Close connection to postgres`)
await this.messagesCollection?.end()
}

Expand All @@ -356,21 +352,21 @@ export class PostgresMessagePickupRepository implements MessagePickupRepository
* @returns {Promise<void>} A promise resolving when the listener is initialized.
*/
private async initializeMessageListener(channel: string): Promise<void> {
this.logger?.info(`[getListenerPublishDb] Initializing method for channel: ${channel}`)
this.logger?.info(`[initializeMessageListener] Initializing method for channel: ${channel}`)

try {
// Add a listener to the specified Pub/Sub channel
await this.pubSubInstance?.addChannel(channel, async (connectionId: string) => {
this.logger?.debug(
`[getListenerPublishDb] Received new message on channel: ${channel} for connectionId: ${connectionId}`,
`[initializeMessageListener] Received new message on channel: ${channel} for connectionId: ${connectionId}`,
)

// Fetch the local live session for the given connectionId
const pickupLiveSession = await this.findLocalLiveSession(connectionId)

if (pickupLiveSession) {
this.logger?.debug(
`[getListenerPublishDb] ${this.instanceName} found a LiveSession on channel: ${channel} for connectionId: ${connectionId}. Delivering messages.`,
`[initializeMessageListener] ${this.instanceName} found a LiveSession on channel: ${channel} for connectionId: ${connectionId}. Delivering messages.`,
)

// Deliver messages from the queue for the live session
Expand All @@ -379,14 +375,14 @@ export class PostgresMessagePickupRepository implements MessagePickupRepository
})
} else {
this.logger?.debug(
`[getListenerPublishDb] No LiveSession found on channel: ${channel} for connectionId: ${connectionId}.`,
`[initializeMessageListener] No LiveSession found on channel: ${channel} for connectionId: ${connectionId}.`,
)
}
})

this.logger?.info(`[getListenerPublishDb] Listener successfully added for channel: ${channel}`)
this.logger?.info(`[initializeMessageListener] Listener successfully added for channel: ${channel}`)
} catch (error) {
this.logger?.error(`[getListenerPublishDb] Error initializing listener for channel ${channel}: ${error}`)
this.logger?.error(`[initializeMessageListener] Error initializing listener for channel ${channel}: ${error}`)
throw new Error(`Failed to initialize listener for channel ${channel}: ${error}`)
}
}
Expand All @@ -398,8 +394,6 @@ export class PostgresMessagePickupRepository implements MessagePickupRepository
private async buildPgDatabase(): Promise<void> {
this.logger?.info(`[buildPgDatabase] PostgresDbService Initializing`)

//const tableNameMessage = 'storequeuedmessage'

const clientConfig = {
user: this.postgresUser,
host: this.postgresHost,
Expand Down Expand Up @@ -501,13 +495,13 @@ export class PostgresMessagePickupRepository implements MessagePickupRepository
* @returns
*/
private async findLocalLiveSession(connectionId: string): Promise<MessagePickupSession | undefined> {
this.logger?.debug(`[getLocalliveSession] Verify current active live mode for connectionId ${connectionId}`)
this.logger?.debug(`[findLocalLiveSession] Verify current active live mode for connectionId ${connectionId}`)

try {
if (!this.agent) throw new Error('Agent is not defined')
return this.agent.messagePickup.getLiveModeSession({ connectionId })
} catch (error) {
this.logger?.error(`[getLocalliveSession] error in getLocalliveSession: ${error}`)
this.logger?.error(`[findLocalLiveSession] error in getLocalliveSession: ${error}`)
}
}

Expand Down

0 comments on commit 2061999

Please sign in to comment.