Skip to content

Commit

Permalink
Update errors proccessing.
Browse files Browse the repository at this point in the history
  • Loading branch information
Andrey committed Nov 20, 2024
1 parent 744b70e commit 525469a
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 24 deletions.
9 changes: 6 additions & 3 deletions indexer/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -805,13 +805,16 @@ func (p *PostgreSQLpgx) WriteLabes(
}

defer func() {
if err := recover(); err != nil {
if pErr := recover(); pErr != nil {
tx.Rollback(context.Background())
panic(err)
panic(pErr)
} else if err != nil {
tx.Rollback(context.Background())
} else {
err = tx.Commit(context.Background())
if err != nil {
fmt.Println("Error committing transaction:", err)
}
}
}()

Expand All @@ -831,7 +834,7 @@ func (p *PostgreSQLpgx) WriteLabes(
}
}

return nil
return err
}

func (p *PostgreSQLpgx) WriteEvents(tx pgx.Tx, blockchain string, events []EventLabel) error {
Expand Down
76 changes: 55 additions & 21 deletions synchronizer/synchronizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -477,9 +477,23 @@ func (d *Synchronizer) SyncCycle(customerDbUriFlag string) (bool, error) {

log.Printf("Read %d users updates from the indexer db in range of blocks %d-%d\n", len(updates), d.startBlock, lastBlockOfChank)
var wg sync.WaitGroup
var errWg sync.WaitGroup
errWg.Add(1)

sem := make(chan struct{}, 5) // Semaphore to control concurrency
errChan := make(chan error, 1) // Buffered channel for error handling
sem := make(chan struct{}, d.threads) // Semaphore to limit the number of concurrent goroutines
errChan := make(chan error) // Channel to collect errors from goroutines

var errs []error
var mu sync.Mutex // To protect access to errs

go func() {
defer errWg.Done()
for err := range errChan {
mu.Lock()
errs = append(errs, err)
mu.Unlock()
}
}()

for _, update := range updates {
for instanceId := range customerDBConnections[update.CustomerID] {
Expand All @@ -489,13 +503,15 @@ func (d *Synchronizer) SyncCycle(customerDbUriFlag string) (bool, error) {
}

wg.Wait()

close(sem)
close(errChan) // Close the channel to signal that all goroutines have finished

// Check for errors from goroutines
if err := <-errChan; err != nil {
return isEnd, fmt.Errorf("error processing customer updates: %w", err)
// Check if there were any errors
if len(errs) > 0 {
var errMsg string
for _, e := range errs {
errMsg += e.Error() + "\n"
}
return isEnd, fmt.Errorf("errors processing customer updates:\n%s", errMsg)
}

d.startBlock = lastBlockOfChank + 1
Expand Down Expand Up @@ -666,8 +682,23 @@ func (d *Synchronizer) HistoricalSyncRef(customerDbUriFlag string, addresses []s

// Process customer updates in parallel
var wg sync.WaitGroup
var errWg sync.WaitGroup
errWg.Add(1)

sem := make(chan struct{}, d.threads) // Semaphore to control concurrency
errChan := make(chan error, 1) // Buffered channel for error handling
errChan := make(chan error) // Channel to collect errors from goroutines

var errs []error
var mu sync.Mutex // To protect access to errs

go func() {
defer errWg.Done()
for err := range errChan {
mu.Lock()
errs = append(errs, err)
mu.Unlock()
}
}()

for _, update := range customerUpdates {

Expand All @@ -679,13 +710,15 @@ func (d *Synchronizer) HistoricalSyncRef(customerDbUriFlag string, addresses []s
}

wg.Wait()
close(sem)
close(errChan) // Close the channel to signal that all goroutines have finished

// Check for errors from goroutines
// Check for errors from goroutines
if err := <-errChan; err != nil {
return fmt.Errorf("error processing customer updates: %w", err)
// Check if there were any errors
if len(errs) > 0 {
var errMsg string
for _, e := range errs {
errMsg += e.Error() + "\n"
}
return fmt.Errorf("errors processing customer updates:\n%s", errMsg)
}

d.startBlock = d.endBlock - 1
Expand Down Expand Up @@ -725,20 +758,25 @@ func (d *Synchronizer) processProtoCustomerUpdate(
// Decode input raw proto data using ABIs
// Write decoded data to the user Database

defer wg.Done()
sem <- struct{}{} // Acquire semaphore
defer func() {
if r := recover(); r != nil {
errChan <- fmt.Errorf("panic in goroutine for customer %s, instance %d: %v", update.CustomerID, id, r)
}
wg.Done()
}()

sem <- struct{}{} // Acquire semaphore
defer func() { <-sem }() // Release semaphore

customer, exists := customerDBConnections[update.CustomerID][id]
if !exists {
errChan <- fmt.Errorf("no DB connection for customer %s", update.CustomerID)
<-sem // Release semaphore
return
}

conn, err := customer.Pgx.GetPool().Acquire(context.Background())
if err != nil {
errChan <- fmt.Errorf("error acquiring connection for customer %s: %w", update.CustomerID, err)
<-sem // Release semaphore
return
}
defer conn.Release()
Expand All @@ -755,7 +793,6 @@ func (d *Synchronizer) processProtoCustomerUpdate(

if err != nil {
errChan <- fmt.Errorf("error decoding data for customer %s: %w", update.CustomerID, err)
<-sem // Release semaphore
return
}

Expand All @@ -764,9 +801,6 @@ func (d *Synchronizer) processProtoCustomerUpdate(

if err != nil {
errChan <- fmt.Errorf("error writing labels for customer %s: %w", update.CustomerID, err)
<-sem // Release semaphore
return
}

<-sem // Release semaphore
}

0 comments on commit 525469a

Please sign in to comment.