Skip to content

Commit

Permalink
Reuse pgx conn
Browse files Browse the repository at this point in the history
  • Loading branch information
kompotkot committed Jun 20, 2024
1 parent f73a2dc commit 7591648
Showing 1 changed file with 54 additions and 65 deletions.
119 changes: 54 additions & 65 deletions synchronizer/synchronizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,22 +86,6 @@ func NewSynchronizer(blockchain, baseDir string, startBlock, endBlock, batchSize

// -------------------------------------------------------------------------------------------------------------------------------

func GetDBConnections(uuids []string) (map[string]string, error) {
connections := make(map[string]string) // Initialize the map

for _, id := range uuids {

connectionString, err := GetDBConnection(id)

if err != nil {
return nil, err
}

connections[id] = connectionString
}
return connections, nil
}

func GetDBConnection(uuid string) (string, error) {

// Create the request
Expand Down Expand Up @@ -142,8 +126,6 @@ func GetDBConnection(uuid string) (string, error) {
}

func (d *Synchronizer) ReadAbiJobsFromDatabase(blockchain string) ([]indexer.AbiJob, error) {
// Simulate reading ABI jobs from the database for a given blockchain.
// This function will need to interact with a real database or an internal API in the future.
abiJobs, err := indexer.DBConnection.ReadABIJobs(blockchain)
if err != nil {
return nil, err
Expand All @@ -166,15 +148,20 @@ func ensurePortInConnectionString(connStr string) (string, error) {
return parsedURL.String(), nil
}

type CustomerDBConnection struct {
Uri string

Pgx *indexer.PostgreSQLpgx
}

// getCustomers fetch ABI jobs, customer IDs and database URLs
func (d *Synchronizer) getCustomers(customerDbUriFlag string) (map[string]string, []string, error) {
rdsConnections := make(map[string]string)
var customerIds []string
func (d *Synchronizer) getCustomers(customerDbUriFlag string) (map[string]CustomerDBConnection, error) {
customerDBConnections := make(map[string]CustomerDBConnection)

// Read ABI jobs from database
abiJobs, err := d.ReadAbiJobsFromDatabase(d.blockchain)
if err != nil {
return nil, customerIds, err
return customerDBConnections, err
}

// Create a set of customer IDs from ABI jobs to remove duplicates
Expand All @@ -183,28 +170,30 @@ func (d *Synchronizer) getCustomers(customerDbUriFlag string) (map[string]string
customerIdsSet[job.CustomerID] = struct{}{}
}

// Convert set to slice
for id := range customerIdsSet {
customerIds = append(customerIds, id)
}
log.Println("Customer IDs to sync:", customerIds)
var customerIds []string

if customerDbUriFlag == "" {
// Get RDS connections for customer IDs
rdsConnections, err = GetDBConnections(customerIds)
if err != nil {
return nil, customerIds, err
for id := range customerIdsSet {
var connectionString string
var dbConnErr error
if customerDbUriFlag == "" {
connectionString, dbConnErr = GetDBConnection(id)
if dbConnErr != nil {
log.Printf("Unable to get connection database URI for %s customer, err: %v", id, dbConnErr)
continue
}
} else {
connectionString = customerDbUriFlag
}
} else {
customersLen := 0
for _, id := range customerIds {
rdsConnections[id] = customerDbUriFlag
customersLen++

customerDBConnections[id] = CustomerDBConnection{
Uri: connectionString,
}
log.Printf("For %d customers set one specified db URI with flag", customersLen)
customerIds = append(customerIds, id)

}
log.Println("Customer IDs to sync:", customerIds)

return rdsConnections, customerIds, nil
return customerDBConnections, nil
}

func (d *Synchronizer) Start(customerDbUriFlag string) {
Expand Down Expand Up @@ -238,38 +227,45 @@ func (d *Synchronizer) Start(customerDbUriFlag string) {
func (d *Synchronizer) SyncCycle(customerDbUriFlag string) (bool, error) {
var isEnd bool

rdsConnections, customerIds, customersErr := d.getCustomers(customerDbUriFlag)
customerDBConnections, customersErr := d.getCustomers(customerDbUriFlag)
if customersErr != nil {
return isEnd, customersErr
}
var customerIds []string
for id, customer := range customerDBConnections {
pgx, err := indexer.NewPostgreSQLpgxWithCustomURI(customer.Uri)
if err != nil {
log.Println("Error creating RDS connection: ", err)
return isEnd, err
}

if d.startBlock == 0 {
var latestCustomerBlocks []uint64
for _, id := range customerIds {
uri := rdsConnections[id]
updatedCustomer := CustomerDBConnection{
Uri: customer.Uri,
Pgx: pgx,
}
customerDBConnections[id] = updatedCustomer

// TODO(kompotkot): Rewrite to not initialize each time new psql conneciton
pgx, err := indexer.NewPostgreSQLpgxWithCustomURI(uri)
if err != nil {
log.Println("Error creating RDS connection: ", err)
return isEnd, err
}
pool := pgx.GetPool()
customerIds = append(customerIds, id)
}

if d.startBlock == 0 {
var latestCustomerBlocks []uint64
for id, customer := range customerDBConnections {
pool := customer.Pgx.GetPool()
conn, err := pool.Acquire(context.Background())
if err != nil {
log.Println("Error acquiring pool connection: ", err)
return isEnd, err
}
defer conn.Release()

latestBlock, err := pgx.ReadLastLabel(d.blockchain)
latestLabelBlock, err := customer.Pgx.ReadLastLabel(d.blockchain)
if err != nil {
log.Println("Error reading latest block: ", err)
return isEnd, err
}
latestCustomerBlocks = append(latestCustomerBlocks, latestBlock)
log.Printf("Latest block for customer %s is: %d\n", id, latestBlock)
latestCustomerBlocks = append(latestCustomerBlocks, latestLabelBlock)
log.Printf("Latest block for customer %s is: %d\n", id, latestLabelBlock)
}

// Determine the start block as the maximum of the latest blocks of all customers
Expand Down Expand Up @@ -349,17 +345,10 @@ func (d *Synchronizer) SyncCycle(customerDbUriFlag string) (bool, error) {
sem <- struct{}{} // Acquire semaphore

// Get the RDS connection for the customer
uri := rdsConnections[update.CustomerID]
customer := customerDBConnections[update.CustomerID]

// Create a connection to the user RDS
// TODO(kompotkot): Rewrite to not initialize each time new psql conneciton
pgx, err := indexer.NewPostgreSQLpgxWithCustomURI(uri)
if err != nil {
errChan <- fmt.Errorf("error creating connection to RDS for customer %s: %w", update.CustomerID, err)
return
}

pool := pgx.GetPool()
pool := customer.Pgx.GetPool()
conn, err := pool.Acquire(context.Background())
if err != nil {
errChan <- fmt.Errorf("error acquiring connection for customer %s: %w", update.CustomerID, err)
Expand Down Expand Up @@ -413,7 +402,7 @@ func (d *Synchronizer) SyncCycle(customerDbUriFlag string) (bool, error) {
}

// Write events to user RDS
pgx.WriteEvents(
customer.Pgx.WriteEvents(
d.blockchain,
decodedEvents,
)
Expand Down Expand Up @@ -461,7 +450,7 @@ func (d *Synchronizer) SyncCycle(customerDbUriFlag string) (bool, error) {
}

// Write transactions to user RDS
pgx.WriteTransactions(
customer.Pgx.WriteTransactions(
d.blockchain,
decodedTransactions,
)
Expand Down

0 comments on commit 7591648

Please sign in to comment.