From 525469a915edb7f22a38eea100db23e85a3b86bb Mon Sep 17 00:00:00 2001 From: Andrey Date: Wed, 20 Nov 2024 17:20:20 +0200 Subject: [PATCH] Update errors proccessing. --- indexer/db.go | 9 +++-- synchronizer/synchronizer.go | 76 ++++++++++++++++++++++++++---------- 2 files changed, 61 insertions(+), 24 deletions(-) diff --git a/indexer/db.go b/indexer/db.go index e06e265..23c55b5 100644 --- a/indexer/db.go +++ b/indexer/db.go @@ -805,13 +805,16 @@ func (p *PostgreSQLpgx) WriteLabes( } defer func() { - if err := recover(); err != nil { + if pErr := recover(); pErr != nil { tx.Rollback(context.Background()) - panic(err) + panic(pErr) } else if err != nil { tx.Rollback(context.Background()) } else { err = tx.Commit(context.Background()) + if err != nil { + fmt.Println("Error committing transaction:", err) + } } }() @@ -831,7 +834,7 @@ func (p *PostgreSQLpgx) WriteLabes( } } - return nil + return err } func (p *PostgreSQLpgx) WriteEvents(tx pgx.Tx, blockchain string, events []EventLabel) error { diff --git a/synchronizer/synchronizer.go b/synchronizer/synchronizer.go index 35ae4a1..4d35f87 100644 --- a/synchronizer/synchronizer.go +++ b/synchronizer/synchronizer.go @@ -477,9 +477,23 @@ func (d *Synchronizer) SyncCycle(customerDbUriFlag string) (bool, error) { log.Printf("Read %d users updates from the indexer db in range of blocks %d-%d\n", len(updates), d.startBlock, lastBlockOfChank) var wg sync.WaitGroup + var errWg sync.WaitGroup + errWg.Add(1) - sem := make(chan struct{}, 5) // Semaphore to control concurrency - errChan := make(chan error, 1) // Buffered channel for error handling + sem := make(chan struct{}, d.threads) // Semaphore to limit the number of concurrent goroutines + errChan := make(chan error) // Channel to collect errors from goroutines + + var errs []error + var mu sync.Mutex // To protect access to errs + + go func() { + defer errWg.Done() + for err := range errChan { + mu.Lock() + errs = append(errs, err) + mu.Unlock() + } + }() for _, update := range updates { for instanceId := range customerDBConnections[update.CustomerID] { @@ -489,13 +503,15 @@ func (d *Synchronizer) SyncCycle(customerDbUriFlag string) (bool, error) { } wg.Wait() - - close(sem) close(errChan) // Close the channel to signal that all goroutines have finished - // Check for errors from goroutines - if err := <-errChan; err != nil { - return isEnd, fmt.Errorf("error processing customer updates: %w", err) + // Check if there were any errors + if len(errs) > 0 { + var errMsg string + for _, e := range errs { + errMsg += e.Error() + "\n" + } + return isEnd, fmt.Errorf("errors processing customer updates:\n%s", errMsg) } d.startBlock = lastBlockOfChank + 1 @@ -666,8 +682,23 @@ func (d *Synchronizer) HistoricalSyncRef(customerDbUriFlag string, addresses []s // Process customer updates in parallel var wg sync.WaitGroup + var errWg sync.WaitGroup + errWg.Add(1) + sem := make(chan struct{}, d.threads) // Semaphore to control concurrency - errChan := make(chan error, 1) // Buffered channel for error handling + errChan := make(chan error) // Channel to collect errors from goroutines + + var errs []error + var mu sync.Mutex // To protect access to errs + + go func() { + defer errWg.Done() + for err := range errChan { + mu.Lock() + errs = append(errs, err) + mu.Unlock() + } + }() for _, update := range customerUpdates { @@ -679,13 +710,15 @@ func (d *Synchronizer) HistoricalSyncRef(customerDbUriFlag string, addresses []s } wg.Wait() - close(sem) close(errChan) // Close the channel to signal that all goroutines have finished - // Check for errors from goroutines - // Check for errors from goroutines - if err := <-errChan; err != nil { - return fmt.Errorf("error processing customer updates: %w", err) + // Check if there were any errors + if len(errs) > 0 { + var errMsg string + for _, e := range errs { + errMsg += e.Error() + "\n" + } + return fmt.Errorf("errors processing customer updates:\n%s", errMsg) } d.startBlock = d.endBlock - 1 @@ -725,20 +758,25 @@ func (d *Synchronizer) processProtoCustomerUpdate( // Decode input raw proto data using ABIs // Write decoded data to the user Database - defer wg.Done() - sem <- struct{}{} // Acquire semaphore + defer func() { + if r := recover(); r != nil { + errChan <- fmt.Errorf("panic in goroutine for customer %s, instance %d: %v", update.CustomerID, id, r) + } + wg.Done() + }() + + sem <- struct{}{} // Acquire semaphore + defer func() { <-sem }() // Release semaphore customer, exists := customerDBConnections[update.CustomerID][id] if !exists { errChan <- fmt.Errorf("no DB connection for customer %s", update.CustomerID) - <-sem // Release semaphore return } conn, err := customer.Pgx.GetPool().Acquire(context.Background()) if err != nil { errChan <- fmt.Errorf("error acquiring connection for customer %s: %w", update.CustomerID, err) - <-sem // Release semaphore return } defer conn.Release() @@ -755,7 +793,6 @@ func (d *Synchronizer) processProtoCustomerUpdate( if err != nil { errChan <- fmt.Errorf("error decoding data for customer %s: %w", update.CustomerID, err) - <-sem // Release semaphore return } @@ -764,9 +801,6 @@ func (d *Synchronizer) processProtoCustomerUpdate( if err != nil { errChan <- fmt.Errorf("error writing labels for customer %s: %w", update.CustomerID, err) - <-sem // Release semaphore return } - - <-sem // Release semaphore }