Skip to content

Commit

Permalink
Update gorutine channels handling.
Browse files Browse the repository at this point in the history
  • Loading branch information
Andrey committed Nov 21, 2024
1 parent 525469a commit 4b44b36
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 46 deletions.
15 changes: 11 additions & 4 deletions blockchain/blockchain.go.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ func (c *Client) FetchBlocksInRange(from, to *big.Int, debug bool) ([]*seer_comm
func (c *Client) FetchBlocksInRangeAsync(from, to *big.Int, debug bool, maxRequests int) ([]*seer_common.BlockJson, error) {
var (
blocks []*seer_common.BlockJson

collectedErrors []error
mu sync.Mutex
wg sync.WaitGroup
ctx = context.Background()
Expand All @@ -238,7 +238,7 @@ func (c *Client) FetchBlocksInRangeAsync(from, to *big.Int, debug bool, maxReque
}

sem := make(chan struct{}, maxRequests) // Semaphore to control concurrency
errChan := make(chan error, 1)
errChan := make(chan error, len(blockNumbersRange)) // Channel to collect errors from goroutines

for _, b := range blockNumbersRange {
wg.Add(1)
Expand Down Expand Up @@ -274,10 +274,17 @@ func (c *Client) FetchBlocksInRangeAsync(from, to *big.Int, debug bool, maxReque
close(sem)
close(errChan)

if err := <-errChan; err != nil {
return nil, err
for err := range errChan {
collectedErrors = append(collectedErrors, err)
}

if len(collectedErrors) > 0 {
var errStrings []string
for _, err := range collectedErrors {
errStrings = append(errStrings, err.Error())
}
return nil, fmt.Errorf("errors occurred during crawling: %s", strings.Join(errStrings, "; "))
}
return blocks, nil
}

Expand Down
26 changes: 13 additions & 13 deletions indexer/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -557,8 +557,10 @@ func (p *PostgreSQLpgx) ReadUpdates(blockchain string, fromBlock uint64, custome

conn, err := pool.Acquire(context.Background())

var paths []string

if err != nil {
return 0, 0, make([]string, 0), nil, err
return 0, 0, paths, nil, err
}

defer conn.Release()
Expand All @@ -576,7 +578,7 @@ func (p *PostgreSQLpgx) ReadUpdates(blockchain string, fromBlock uint64, custome
),
latest_block_of_path as (
SELECT
block_number as last_block_number
block_number as latest_block_number
from
%s
WHERE
Expand Down Expand Up @@ -627,7 +629,7 @@ func (p *PostgreSQLpgx) ReadUpdates(blockchain string, fromBlock uint64, custome
customer_id
)
SELECT
last_block_number,
latest_block_number,
(SELECT array_agg(path) FROM path) as paths,
(SELECT json_agg(json_build_object(customer_id, abis)) FROM reformatted_jobs) as jobs
FROM
Expand All @@ -638,18 +640,17 @@ func (p *PostgreSQLpgx) ReadUpdates(blockchain string, fromBlock uint64, custome

if err != nil {
log.Println("Error querying abi jobs from database", err)
return 0, 0, make([]string, 0), nil, err
return 0, 0, paths, nil, err
}

var customers []map[string]map[string]map[string]*AbiEntry
var paths []string
var firstBlockNumber, lastBlockNumber uint64

for rows.Next() {
err = rows.Scan(&lastBlockNumber, &paths, &customers)
if err != nil {
log.Println("Error scanning row:", err)
return 0, 0, make([]string, 0), nil, err
return 0, 0, paths, nil, err
}
}

Expand Down Expand Up @@ -791,7 +792,6 @@ func (p *PostgreSQLpgx) WriteLabes(
conn, err := pool.Acquire(context.Background())

if err != nil {
fmt.Println("Error acquiring connection:", err)
return err
}

Expand All @@ -800,7 +800,6 @@ func (p *PostgreSQLpgx) WriteLabes(
tx, err := conn.Begin(context.Background())

if err != nil {
fmt.Println("Error beginning transaction:", err)
return err
}

Expand All @@ -813,23 +812,24 @@ func (p *PostgreSQLpgx) WriteLabes(
} else {
err = tx.Commit(context.Background())
if err != nil {
fmt.Println("Error committing transaction:", err)
log.Println("Error committing transaction:", err)
panic(err)
}
}
}()

if len(transactions) > 0 {
err := p.WriteTransactions(tx, blockchain, transactions)
if err != nil {
fmt.Println("Error writing transactions:", err)
log.Println("Error writing transactions:", err)
return err
}
}

if len(events) > 0 {
err := p.WriteEvents(tx, blockchain, events)
if err != nil {
fmt.Println("Error writing events:", err)
log.Println("Error writing events:", err)
return err
}
}
Expand Down Expand Up @@ -1452,7 +1452,7 @@ func (p *PostgreSQLpgx) RetrievePathsAndBlockBounds(blockchain string, blockNumb
block_number >= $2 and block_number <= $1
), latest_block_of_path as (
SELECT
block_number as last_block_number
block_number as latest_block_number
from
%s
WHERE
Expand All @@ -1469,7 +1469,7 @@ func (p *PostgreSQLpgx) RetrievePathsAndBlockBounds(blockchain string, blockNumb
order by block_number asc
limit 1
)
select ARRAY_AGG(path) as paths, (SELECT first_block_number FROM earliest_block_of_path) as min_block_number, (SELECT last_block_number FROM latest_block_of_path) as max_block_number from path
select ARRAY_AGG(path) as paths, (SELECT first_block_number FROM earliest_block_of_path) as min_block_number, (SELECT latest_block_number FROM latest_block_of_path) as max_block_number from path
`, BlocksTableName(blockchain), BlocksTableName(blockchain), BlocksTableName(blockchain))

err = conn.QueryRow(context.Background(), query, blockNumber, blockNumber-uint64(minBlocksToSync)).Scan(&paths, &minBlockNumber, &maxBlockNumber)
Expand Down
54 changes: 25 additions & 29 deletions synchronizer/synchronizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -476,25 +476,19 @@ func (d *Synchronizer) SyncCycle(customerDbUriFlag string) (bool, error) {
}

log.Printf("Read %d users updates from the indexer db in range of blocks %d-%d\n", len(updates), d.startBlock, lastBlockOfChank)
// Process customer updates in parallel
var wg sync.WaitGroup
var errWg sync.WaitGroup
errWg.Add(1)

sem := make(chan struct{}, d.threads) // Semaphore to limit the number of concurrent goroutines
errChan := make(chan error) // Channel to collect errors from goroutines
// count the number of goroutines that will be running
var totalGoroutines int
for _, update := range updates {
totalGoroutines += len(customerDBConnections[update.CustomerID])
}

var errs []error
var mu sync.Mutex // To protect access to errs

go func() {
defer errWg.Done()
for err := range errChan {
mu.Lock()
errs = append(errs, err)
mu.Unlock()
}
}()
sem := make(chan struct{}, d.threads) // Semaphore to control concurrency
errChan := make(chan error, totalGoroutines) // Channel to collect errors from goroutines

var errs []error
for _, update := range updates {
for instanceId := range customerDBConnections[update.CustomerID] {
wg.Add(1)
Expand All @@ -506,6 +500,10 @@ func (d *Synchronizer) SyncCycle(customerDbUriFlag string) (bool, error) {
close(errChan) // Close the channel to signal that all goroutines have finished

// Check if there were any errors
for err := range errChan {
errs = append(errs, err)
}

if len(errs) > 0 {
var errMsg string
for _, e := range errs {
Expand Down Expand Up @@ -682,23 +680,17 @@ func (d *Synchronizer) HistoricalSyncRef(customerDbUriFlag string, addresses []s

// Process customer updates in parallel
var wg sync.WaitGroup
var errWg sync.WaitGroup
errWg.Add(1)

sem := make(chan struct{}, d.threads) // Semaphore to control concurrency
errChan := make(chan error) // Channel to collect errors from goroutines
// count the number of goroutines that will be running
var totalGoroutines int
for _, update := range customerUpdates {
totalGoroutines += len(customerDBConnections[update.CustomerID])
}

sem := make(chan struct{}, d.threads) // Semaphore to control concurrency
errChan := make(chan error, totalGoroutines) // Channel to collect errors from goroutines

var errs []error
var mu sync.Mutex // To protect access to errs

go func() {
defer errWg.Done()
for err := range errChan {
mu.Lock()
errs = append(errs, err)
mu.Unlock()
}
}()

for _, update := range customerUpdates {

Expand All @@ -713,6 +705,10 @@ func (d *Synchronizer) HistoricalSyncRef(customerDbUriFlag string, addresses []s
close(errChan) // Close the channel to signal that all goroutines have finished

// Check if there were any errors
for err := range errChan {
errs = append(errs, err)
}

if len(errs) > 0 {
var errMsg string
for _, e := range errs {
Expand Down

0 comments on commit 4b44b36

Please sign in to comment.