Skip to content

Commit

Permalink
fix: fix querys sql into dbcollections.ts
Browse files Browse the repository at this point in the history
  • Loading branch information
gabrielmatau79 committed Dec 20, 2024
1 parent 739e45f commit 1aceb9c
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 32 deletions.
11 changes: 5 additions & 6 deletions packages/postgres/config/dbCollections.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
export const messagesTableName = 'storequeuedMessage'
export const messagesTableName = 'storequeuedmessage'

export const createTableMessage = `
CREATE TABLE IF NOT EXISTS ${messagesTableName} (
Expand All @@ -9,9 +9,6 @@ CREATE TABLE IF NOT EXISTS ${messagesTableName} (
state VARCHAR(50),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE INDEX IF NOT EXISTS "${messagesTableName}_connectionId_index" ON "queuedmessages" (connectionId);
CREATE INDEX IF NOT EXISTS "${messagesTableName}_created_at_index" ON "queuedmessages" (created_at);
`

export const liveSessionTableName = 'storelivesession'
Expand All @@ -24,6 +21,8 @@ CREATE TABLE IF NOT EXISTS ${liveSessionTableName} (
role VARCHAR(50),
instance VARCHAR(50),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
);`

export const indexMessageTable = `CREATE INDEX IF NOT EXISTS "${messagesTableName}_connectionId_index" ON "${messagesTableName}" (connectionId);`

CREATE INDEX IF NOT EXISTS "${liveSessionTableName}_connectionid" ON "${liveSessionTableName}" USING btree ("connectionid");`
export const indexLiveSessionTable = `CREATE INDEX IF NOT EXISTS "${liveSessionTableName}_connectionid" ON "${liveSessionTableName}" USING btree ("connectionid");`
4 changes: 2 additions & 2 deletions packages/postgres/package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "@2060.io/credo-ts-message-pickup-repository-pg",
"main": "build/index",
"types": "build/index",
"main": "build/src/index.js",
"types": "build/src/index.d.ts",
"version": "0.0.1",
"files": [
"build"
Expand Down
55 changes: 31 additions & 24 deletions packages/postgres/src/PostgresMessagePickupRepository.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,13 @@ import {
import { Pool, Client } from 'pg'
import PGPubsub from 'pg-pubsub'
import * as os from 'os'
import { createTableMessage, createTableLive, messagesTableName, liveSessionTableName } from '../config/dbCollections'
import {
createTableMessage,
createTableLive,
messagesTableName,
liveSessionTableName,
indexLiveSessionTable,
} from '../config/dbCollections'
import { ConnectionInfo, PostgresMessagePickupRepositoryConfig } from './interfaces'
import { MessagePickupSession } from '@credo-ts/core/build/modules/message-pickup/MessagePickupSession'
import axios from 'axios'
Expand Down Expand Up @@ -56,7 +62,7 @@ export class PostgresMessagePickupRepository implements MessagePickupRepository
*/
public async initialize(options: {
agent: Agent
connectionInfoCallback: (connectionId: string) => Promise<ConnectionInfo | undefined>
connectionInfoCallback?: (connectionId: string) => Promise<ConnectionInfo | undefined>
}): Promise<void> {
try {
// Initialize the database
Expand Down Expand Up @@ -142,7 +148,7 @@ export class PostgresMessagePickupRepository implements MessagePickupRepository
// Query to fetch messages from the database
const query = `
SELECT id, encryptedmessage, state
FROM queuedmessages
FROM ${messagesTableName}
WHERE (connectionid = $1 OR $2 = ANY (recipientkeys)) AND state = 'pending'
ORDER BY created_at
LIMIT $3
Expand All @@ -159,7 +165,7 @@ export class PostgresMessagePickupRepository implements MessagePickupRepository

// Update message states to 'sending' if deleteMessages is false
if (!deleteMessages && messagesToUpdateIds.length > 0) {
const updateQuery = `UPDATE queuedmessages SET state = 'sending' WHERE id = ANY($1)`
const updateQuery = `UPDATE ${messagesTableName} SET state = 'sending' WHERE id = ANY($1)`
const updateResult = await this.messagesCollection?.query(updateQuery, [messagesToUpdateIds])

if (updateResult?.rowCount !== result.rows.length) {
Expand Down Expand Up @@ -198,7 +204,7 @@ export class PostgresMessagePickupRepository implements MessagePickupRepository
// Query to count pending messages for the specified connection ID
const query = `
SELECT COUNT(*) AS count
FROM queuedmessages
FROM ${messagesTableName}
WHERE connectionid = $1 AND state = 'pending'
`
const params = [connectionId]
Expand Down Expand Up @@ -245,7 +251,7 @@ export class PostgresMessagePickupRepository implements MessagePickupRepository

// Insert the message into the database
const query = `
INSERT INTO queuedmessages(connectionid, recipientKeys, encryptedmessage, state)
INSERT INTO ${messagesTableName}(connectionid, recipientKeys, encryptedmessage, state)
VALUES($1, $2, $3, $4)
RETURNING id
`
Expand Down Expand Up @@ -277,7 +283,6 @@ export class PostgresMessagePickupRepository implements MessagePickupRepository
}
} else {
this.logger?.error(`connectionInfoCallback is not defined`)
throw new Error(`connectionInfoCallback is not defined`)
}
} else if (this.dbListener) {
// Publish to the Pub/Sub channel if a live session exists on another instance
Expand Down Expand Up @@ -322,7 +327,7 @@ export class PostgresMessagePickupRepository implements MessagePickupRepository
const placeholders = messageIds.map((_, index) => `$${index + 2}`).join(', ')

// Construct the SQL DELETE query
const query = `DELETE FROM queuedmessages WHERE connectionid = $1 AND id IN (${placeholders})`
const query = `DELETE FROM ${messagesTableName} WHERE connectionid = $1 AND id IN (${placeholders})`

// Combine connectionId with messageIds as query parameters
const queryParams = [connectionId, ...messageIds]
Expand Down Expand Up @@ -428,11 +433,12 @@ export class PostgresMessagePickupRepository implements MessagePickupRepository
try {
await dbClient.connect()

// Check if the 'queuedmessages' table exists.
// Check if the 'messagesTableName' table exists.
const messageTableResult = await dbClient.query(`SELECT to_regclass('${messagesTableName}')`)
if (!messageTableResult.rows[0].to_regclass) {
// If it doesn't exist, create the 'storequeuedmessage' table.
// If it doesn't exist, create the table.
await dbClient.query(createTableMessage)
await dbClient.query(indexLiveSessionTable)
this.logger?.info(`[buildPgDatabase] PostgresDbService Table "${messagesTableName}" created.`)
}

Expand All @@ -441,6 +447,7 @@ export class PostgresMessagePickupRepository implements MessagePickupRepository
if (!liveTableResult.rows[0].to_regclass) {
// If it doesn't exist, create the table.
await dbClient.query(createTableLive)
await dbClient.query(indexLiveSessionTable)
this.logger?.info(`[buildPgDatabase] PostgresDbService Table "${liveSessionTableName}" created.`)
} else {
// If the table exists, clean it (truncate or delete, depending on your requirements).
Expand All @@ -467,13 +474,13 @@ export class PostgresMessagePickupRepository implements MessagePickupRepository
try {
this.logger?.debug(`[checkQueueMessages] Init verify messages state 'sending'`)
const messagesToSend = await this.messagesCollection?.query(
'SELECT * FROM queuedmessages WHERE state = $1 and connectionid = $2',
`SELECT * FROM ${messagesTableName} WHERE state = $1 and connectionid = $2`,
['sending', connectionID],
)
if (messagesToSend && messagesToSend.rows.length > 0) {
for (const message of messagesToSend.rows) {
// Update the message state to 'pending'
await this.messagesCollection?.query('UPDATE queuedmessages SET state = $1 WHERE id = $2', [
await this.messagesCollection?.query(`UPDATE ${messagesTableName} SET state = $1 WHERE id = $2`, [
'pending',
message.id,
])
Expand Down Expand Up @@ -510,19 +517,19 @@ export class PostgresMessagePickupRepository implements MessagePickupRepository
* @returns liveSession object or false
*/
private async findLiveSessionInDb(connectionId: string): Promise<MessagePickupSession | undefined> {
this.logger?.debug(`[getLiveSessionFromDB] initializing find registry for connectionId ${connectionId}`)
this.logger?.debug(`[findLiveSessionInDb] initializing find registry for connectionId ${connectionId}`)
if (!connectionId) throw new Error('connectionId is not defined')
try {
const queryLiveSession = await this.messagesCollection?.query(
`SELECT sessionid, connectionid, protocolVersion, role FROM storelivesession WHERE connectionid = $1 LIMIT $2`,
`SELECT sessionid, connectionid, protocolVersion, role FROM ${liveSessionTableName} WHERE connectionid = $1 LIMIT $2`,
[connectionId, 1],
)
// Check if liveSession is not empty (record found)
const recordFound = queryLiveSession && queryLiveSession.rows && queryLiveSession.rows.length > 0
this.logger?.debug(`[getLiveSessionFromDB] record found status ${recordFound} to connectionId ${connectionId}`)
this.logger?.debug(`[findLiveSessionInDb] record found status ${recordFound} to connectionId ${connectionId}`)
return recordFound ? queryLiveSession.rows[0] : undefined
} catch (error) {
this.logger?.debug(`[getLiveSessionFromDB] Error find to connectionId ${connectionId}`)
this.logger?.debug(`[findLiveSessionInDb] Error find to connectionId ${connectionId}`)
return undefined // Return false in case of an error
}
}
Expand All @@ -534,17 +541,17 @@ export class PostgresMessagePickupRepository implements MessagePickupRepository
*/
private async addLiveSessionOnDb(session: MessagePickupSession, instance: string): Promise<void> {
const { id, connectionId, protocolVersion, role } = session
this.logger?.debug(`[addLiveSessionFromDb] initializing add LiveSession DB to connectionId ${connectionId}`)
this.logger?.debug(`[addLiveSessionOnDb] initializing add LiveSession DB to connectionId ${connectionId}`)
if (!session) throw new Error('session is not defined')
try {
const insertMessageDB = await this.messagesCollection?.query(
'INSERT INTO storelivesession (sessionid, connectionid, protocolVersion, role, instance) VALUES($1, $2, $3, $4, $5) RETURNING sessionid',
`INSERT INTO ${liveSessionTableName} (sessionid, connectionid, protocolVersion, role, instance) VALUES($1, $2, $3, $4, $5) RETURNING sessionid`,
[id, connectionId, protocolVersion, role, instance],
)
const liveSessionId = insertMessageDB?.rows[0].sessionid
this.logger?.debug(`[addLiveSessionFromDb] add liveSession to ${connectionId} and result ${liveSessionId}`)
this.logger?.debug(`[addLiveSessionOnDb] add liveSession to ${connectionId} and result ${liveSessionId}`)
} catch (error) {
this.logger?.debug(`[addLiveSessionFromDb] error add liveSession DB ${connectionId}`)
this.logger?.debug(`[addLiveSessionOnDb] error add liveSession DB ${connectionId}`)
}
}

Expand All @@ -553,20 +560,20 @@ export class PostgresMessagePickupRepository implements MessagePickupRepository
* @param connectionId
*/
private async removeLiveSessionOnDb(connectionId: string): Promise<void> {
this.logger?.debug(`[removeLiveSessionFromDb] initializing remove LiveSession to connectionId ${connectionId}`)
this.logger?.debug(`[removeLiveSessionOnDb] initializing remove LiveSession to connectionId ${connectionId}`)
if (!connectionId) throw new Error('connectionId is not defined')
try {
// Construct the SQL query with the placeholders
const query = `DELETE FROM storelivesession WHERE connectionid = $1`
const query = `DELETE FROM ${liveSessionTableName} WHERE connectionid = $1`

// Add connectionId for query parameters
const queryParams = [connectionId]

await this.messagesCollection?.query(query, queryParams)

this.logger?.debug(`[removeLiveSessionFromDb] removed LiveSession to connectionId ${connectionId}`)
this.logger?.debug(`[removeLiveSessionOnDb] removed LiveSession to connectionId ${connectionId}`)
} catch (error) {
this.logger?.error(`[removeLiveSessionFromDb] Error removing LiveSession: ${error}`)
this.logger?.error(`[removeLiveSessionOnDb] Error removing LiveSession: ${error}`)
}
}
}

0 comments on commit 1aceb9c

Please sign in to comment.