From b01be85c58172c2943151ec44f989c0c386943f1 Mon Sep 17 00:00:00 2001 From: Andrey Date: Sun, 27 Oct 2024 22:10:06 +0200 Subject: [PATCH 01/12] Add batch read with min blocks read. --- indexer/db.go | 26 +++++++++++++------------- storage/aws_bucket.go | 5 +++++ storage/filesystem.go | 18 ++++++++++++++++++ storage/gcp_storage.go | 17 +++++++++++++++++ storage/storage.go | 1 + synchronizer/synchronizer.go | 6 +++--- 6 files changed, 57 insertions(+), 16 deletions(-) diff --git a/indexer/db.go b/indexer/db.go index ed91f44..1346ca2 100644 --- a/indexer/db.go +++ b/indexer/db.go @@ -545,14 +545,14 @@ func (p *PostgreSQLpgx) GetCustomersIDs(blockchain string) ([]string, error) { return customerIds, nil } -func (p *PostgreSQLpgx) ReadUpdates(blockchain string, fromBlock uint64, customerIds []string) (uint64, uint64, string, []CustomerUpdates, error) { +func (p *PostgreSQLpgx) ReadUpdates(blockchain string, fromBlock uint64, customerIds []string) (uint64, uint64, []string, []CustomerUpdates, error) { pool := p.GetPool() conn, err := pool.Acquire(context.Background()) if err != nil { - return 0, 0, "", nil, err + return 0, 0, make([]string, 0), nil, err } defer conn.Release() @@ -565,16 +565,16 @@ func (p *PostgreSQLpgx) ReadUpdates(blockchain string, fromBlock uint64, custome from %s WHERE - block_number = $1 + block_number >= $1 and block_number <= $1 + 50 ), latest_block_of_path as ( SELECT - block_number as block_number, - path as path + max(block_number) as last_block_number, + json_agg(path) as paths from %s WHERE - path = (SELECT path from path) + path = (SELECT path from path ORDER BY block_number desc limit 1) order by block_number desc limit 1 ), @@ -621,8 +621,8 @@ func (p *PostgreSQLpgx) ReadUpdates(blockchain string, fromBlock uint64, custome customer_id ) SELECT - block_number, - path, + last_block_number, + paths, (SELECT json_agg(json_build_object(customer_id, abis)) FROM reformatted_jobs) as jobs FROM latest_block_of_path @@ -632,18 +632,18 @@ func (p 
*PostgreSQLpgx) ReadUpdates(blockchain string, fromBlock uint64, custome if err != nil { log.Println("Error querying abi jobs from database", err) - return 0, 0, "", nil, err + return 0, 0, make([]string, 0), nil, err } var customers []map[string]map[string]map[string]*AbiEntry - var path string + var paths []string var firstBlockNumber, lastBlockNumber uint64 for rows.Next() { - err = rows.Scan(&lastBlockNumber, &path, &customers) + err = rows.Scan(&lastBlockNumber, &paths, &customers) if err != nil { log.Println("Error scanning row:", err) - return 0, 0, "", nil, err + return 0, 0, make([]string, 0), nil, err } } @@ -662,7 +662,7 @@ func (p *PostgreSQLpgx) ReadUpdates(blockchain string, fromBlock uint64, custome } - return firstBlockNumber, lastBlockNumber, path, customerUpdates, nil + return firstBlockNumber, lastBlockNumber, paths, customerUpdates, nil } diff --git a/storage/aws_bucket.go b/storage/aws_bucket.go index fdf5261..43ea840 100644 --- a/storage/aws_bucket.go +++ b/storage/aws_bucket.go @@ -101,3 +101,8 @@ func (s *S3) ReadBatch(readItems []ReadItem) (map[string][]string, error) { // Implement the ReadBatch method return nil, nil } + +func (s *S3) ReadFiles(keys []string) (bytes.Buffer, error) { + // Implement the ReadFiles method + return bytes.Buffer{}, nil +} diff --git a/storage/filesystem.go b/storage/filesystem.go index 463cd95..a6de3e7 100644 --- a/storage/filesystem.go +++ b/storage/filesystem.go @@ -144,3 +144,21 @@ func (fs *FileStorage) Delete(key string) error { // Implement the Delete method return nil } + +func (fs *FileStorage) ReadFiles(keys []string) (bytes.Buffer, error) { + + var data bytes.Buffer + + for _, key := range keys { + data, err := fs.Read(key) + if err != nil { + return bytes.Buffer{}, err + } + + if _, err := io.Copy(&data, &data); err != nil { + return bytes.Buffer{}, fmt.Errorf("failed to read object data: %v", err) + } + + } + return data, nil +} diff --git a/storage/gcp_storage.go b/storage/gcp_storage.go index 
1488ce1..13e0dde 100644 --- a/storage/gcp_storage.go +++ b/storage/gcp_storage.go @@ -216,3 +216,20 @@ func (g *GCS) ReadBatch(readItems []ReadItem) (map[string][]string, error) { return result, nil } + +func (g *GCS) ReadFiles(keys []string) (bytes.Buffer, error) { + var result bytes.Buffer + + for _, key := range keys { + buf, err := g.Read(key) + if err != nil { + return bytes.Buffer{}, fmt.Errorf("failed to read object from bucket: %v", err) + } + + if _, err := io.Copy(&result, &buf); err != nil { + return bytes.Buffer{}, fmt.Errorf("failed to read object data: %v", err) + } + } + + return result, nil +} diff --git a/storage/storage.go b/storage/storage.go index a7874f5..b919e40 100644 --- a/storage/storage.go +++ b/storage/storage.go @@ -11,6 +11,7 @@ type Storer interface { Save(batchDir, filename string, bf bytes.Buffer) error Read(key string) (bytes.Buffer, error) ReadBatch(readItems []ReadItem) (map[string][]string, error) + ReadFiles(keys []string) (bytes.Buffer, error) Delete(key string) error List(ctx context.Context, delim, blockBatch string, timeout int, returnFunc ListReturnFunc) ([]string, error) } diff --git a/synchronizer/synchronizer.go b/synchronizer/synchronizer.go index a0fc1eb..b443b24 100644 --- a/synchronizer/synchronizer.go +++ b/synchronizer/synchronizer.go @@ -441,7 +441,7 @@ func (d *Synchronizer) SyncCycle(customerDbUriFlag string) (bool, error) { // Read updates from the indexer db // This function will return a list of customer updates 1 update is 1 customer - _, lastBlockOfChank, path, updates, err := indexer.DBConnection.ReadUpdates(d.blockchain, d.startBlock, customerIds) + _, lastBlockOfChank, paths, updates, err := indexer.DBConnection.ReadUpdates(d.blockchain, d.startBlock, customerIds) if err != nil { return isEnd, fmt.Errorf("error reading updates: %w", err) } @@ -452,12 +452,12 @@ func (d *Synchronizer) SyncCycle(customerDbUriFlag string) (bool, error) { } if crawler.SEER_CRAWLER_DEBUG { - log.Printf("Read batch key: %s", 
path) + log.Printf("Read batch key: %s", paths) } log.Println("Last block of current chank: ", lastBlockOfChank) // Read the raw data from the storage for current path - rawData, readErr := d.StorageInstance.Read(path) + rawData, readErr := d.StorageInstance.ReadFiles(paths) if readErr != nil { return isEnd, fmt.Errorf("error reading raw data: %w", readErr) } From 7c7e2c88f1d1c0b8df43b99bfa57a8b07cecb35b Mon Sep 17 00:00:00 2001 From: Andrey Date: Mon, 28 Oct 2024 13:34:26 +0200 Subject: [PATCH 02/12] Add min blocks sync parameter. --- cmd.go | 12 +++++++----- indexer/db.go | 6 +++--- synchronizer/synchronizer.go | 34 ++++++++++++++++++---------------- 3 files changed, 28 insertions(+), 24 deletions(-) diff --git a/cmd.go b/cmd.go index badd1e5..4c199e1 100644 --- a/cmd.go +++ b/cmd.go @@ -296,7 +296,7 @@ func CreateCrawlerCommand() *cobra.Command { func CreateSynchronizerCommand() *cobra.Command { var startBlock, endBlock, batchSize uint64 - var timeout, threads, cycleTickerWaitTime int + var timeout, threads, cycleTickerWaitTime, minBlocksToSync int var chain, baseDir, customerDbUriFlag string synchronizerCmd := &cobra.Command{ @@ -337,14 +337,14 @@ func CreateSynchronizerCommand() *cobra.Command { RunE: func(cmd *cobra.Command, args []string) error { indexer.InitDBConnection() - newSynchronizer, synchonizerErr := synchronizer.NewSynchronizer(chain, baseDir, startBlock, endBlock, batchSize, timeout, threads) + newSynchronizer, synchonizerErr := synchronizer.NewSynchronizer(chain, baseDir, startBlock, endBlock, batchSize, timeout, threads, minBlocksToSync) if synchonizerErr != nil { return synchonizerErr } latestBlockNumber, latestErr := newSynchronizer.Client.GetLatestBlockNumber() if latestErr != nil { - return fmt.Errorf("Failed to get latest block number: %v", latestErr) + return fmt.Errorf("failed to get latest block number: %v", latestErr) } if startBlock > latestBlockNumber.Uint64() { @@ -368,6 +368,7 @@ func CreateSynchronizerCommand() *cobra.Command { 
synchronizerCmd.Flags().StringVar(&customerDbUriFlag, "customer-db-uri", "", "Set customer database URI for development. This workflow bypass fetching customer IDs and its database URL connection strings from mdb-v3-controller API") synchronizerCmd.Flags().IntVar(&threads, "threads", 5, "Number of go-routines for concurrent decoding") synchronizerCmd.Flags().IntVar(&cycleTickerWaitTime, "cycle-ticker-wait-time", 10, "The wait time for the synchronizer in seconds before it try to start new cycle") + synchronizerCmd.Flags().IntVar(&minBlocksToSync, "min-blocks-to-sync", 10, "The minimum number of blocks to sync before the synchronizer starts decoding") return synchronizerCmd } @@ -1011,7 +1012,7 @@ func CreateHistoricalSyncCommand() *cobra.Command { var chain, baseDir, customerDbUriFlag string var addresses, customerIds []string var startBlock, endBlock, batchSize uint64 - var timeout, threads int + var timeout, threads, minBlocksToSync int var auto bool historicalSyncCmd := &cobra.Command{ @@ -1047,7 +1048,7 @@ func CreateHistoricalSyncCommand() *cobra.Command { RunE: func(cmd *cobra.Command, args []string) error { indexer.InitDBConnection() - newSynchronizer, synchonizerErr := synchronizer.NewSynchronizer(chain, baseDir, startBlock, endBlock, batchSize, timeout, threads) + newSynchronizer, synchonizerErr := synchronizer.NewSynchronizer(chain, baseDir, startBlock, endBlock, batchSize, timeout, threads, minBlocksToSync) if synchonizerErr != nil { return synchonizerErr } @@ -1073,6 +1074,7 @@ func CreateHistoricalSyncCommand() *cobra.Command { historicalSyncCmd.Flags().StringSliceVar(&addresses, "addresses", []string{}, "The list of addresses to sync") historicalSyncCmd.Flags().BoolVar(&auto, "auto", false, "Set this flag to sync all unfinished historical crawl from the database (default: false)") historicalSyncCmd.Flags().IntVar(&threads, "threads", 5, "Number of go-routines for concurrent crawling (default: 5)") + historicalSyncCmd.Flags().IntVar(&minBlocksToSync, 
"min-blocks-to-sync", 50, "The minimum number of blocks to sync before the synchronizer starts decoding") return historicalSyncCmd } diff --git a/indexer/db.go b/indexer/db.go index 1346ca2..b39ed2f 100644 --- a/indexer/db.go +++ b/indexer/db.go @@ -545,7 +545,7 @@ func (p *PostgreSQLpgx) GetCustomersIDs(blockchain string) ([]string, error) { return customerIds, nil } -func (p *PostgreSQLpgx) ReadUpdates(blockchain string, fromBlock uint64, customerIds []string) (uint64, uint64, []string, []CustomerUpdates, error) { +func (p *PostgreSQLpgx) ReadUpdates(blockchain string, fromBlock uint64, customerIds []string, minBlocksToSync int) (uint64, uint64, []string, []CustomerUpdates, error) { pool := p.GetPool() @@ -565,7 +565,7 @@ func (p *PostgreSQLpgx) ReadUpdates(blockchain string, fromBlock uint64, custome from %s WHERE - block_number >= $1 and block_number <= $1 + 50 + block_number >= $1 and block_number <= $1 + $3 ), latest_block_of_path as ( SELECT @@ -628,7 +628,7 @@ func (p *PostgreSQLpgx) ReadUpdates(blockchain string, fromBlock uint64, custome latest_block_of_path `, blocksTableName, blocksTableName) - rows, err := conn.Query(context.Background(), query, fromBlock, blockchain) + rows, err := conn.Query(context.Background(), query, fromBlock, blockchain, minBlocksToSync) if err != nil { log.Println("Error querying abi jobs from database", err) diff --git a/synchronizer/synchronizer.go b/synchronizer/synchronizer.go index b443b24..a5b96e3 100644 --- a/synchronizer/synchronizer.go +++ b/synchronizer/synchronizer.go @@ -25,17 +25,18 @@ type Synchronizer struct { Client seer_blockchain.BlockchainClient StorageInstance storage.Storer - blockchain string - startBlock uint64 - endBlock uint64 - batchSize uint64 - baseDir string - basePath string - threads int + blockchain string + startBlock uint64 + endBlock uint64 + batchSize uint64 + baseDir string + basePath string + threads int + minBlocksToSync int } // NewSynchronizer creates a new synchronizer instance with the 
given blockchain handler. -func NewSynchronizer(blockchain, baseDir string, startBlock, endBlock, batchSize uint64, timeout int, threads int) (*Synchronizer, error) { +func NewSynchronizer(blockchain, baseDir string, startBlock, endBlock, batchSize uint64, timeout int, threads int, minBlocksToSync int) (*Synchronizer, error) { var synchronizer Synchronizer basePath := filepath.Join(baseDir, crawler.SeerCrawlerStoragePrefix, "data", blockchain) @@ -61,13 +62,14 @@ func NewSynchronizer(blockchain, baseDir string, startBlock, endBlock, batchSize Client: client, StorageInstance: storageInstance, - blockchain: blockchain, - startBlock: startBlock, - endBlock: endBlock, - batchSize: batchSize, - baseDir: baseDir, - basePath: basePath, - threads: threads, + blockchain: blockchain, + startBlock: startBlock, + endBlock: endBlock, + batchSize: batchSize, + baseDir: baseDir, + basePath: basePath, + threads: threads, + minBlocksToSync: minBlocksToSync, } return &synchronizer, nil @@ -441,7 +443,7 @@ func (d *Synchronizer) SyncCycle(customerDbUriFlag string) (bool, error) { // Read updates from the indexer db // This function will return a list of customer updates 1 update is 1 customer - _, lastBlockOfChank, paths, updates, err := indexer.DBConnection.ReadUpdates(d.blockchain, d.startBlock, customerIds) + _, lastBlockOfChank, paths, updates, err := indexer.DBConnection.ReadUpdates(d.blockchain, d.startBlock, customerIds, d.minBlocksToSync) if err != nil { return isEnd, fmt.Errorf("error reading updates: %w", err) } From 3ec6d8965933d66513dde1c2dd94235570496f3b Mon Sep 17 00:00:00 2001 From: Andrey Date: Tue, 29 Oct 2024 17:42:16 +0200 Subject: [PATCH 03/12] Update Async Batch read. 
--- indexer/db.go | 12 ++++----- storage/aws_bucket.go | 5 ++++ storage/filesystem.go | 51 +++++++++++++++++++++++++++++++++++ storage/gcp_storage.go | 52 ++++++++++++++++++++++++++++++++++++ storage/storage.go | 1 + synchronizer/synchronizer.go | 2 +- 6 files changed, 116 insertions(+), 7 deletions(-) diff --git a/indexer/db.go b/indexer/db.go index b39ed2f..c9182b9 100644 --- a/indexer/db.go +++ b/indexer/db.go @@ -561,20 +561,20 @@ func (p *PostgreSQLpgx) ReadUpdates(blockchain string, fromBlock uint64, custome query := fmt.Sprintf(`WITH path as ( SELECT - path + path, + block_number from %s WHERE - block_number >= $1 and block_number <= $1 + $3 + block_number >= $1 and block_number <= $1 + $3 ), latest_block_of_path as ( SELECT - max(block_number) as last_block_number, - json_agg(path) as paths + block_number as last_block_number from %s WHERE - path = (SELECT path from path ORDER BY block_number desc limit 1) + path = (SELECT path FROM path ORDER BY block_number DESC LIMIT 1) order by block_number desc limit 1 ), @@ -622,7 +622,7 @@ func (p *PostgreSQLpgx) ReadUpdates(blockchain string, fromBlock uint64, custome ) SELECT last_block_number, - paths, + (SELECT array_agg(path) FROM path) as paths, (SELECT json_agg(json_build_object(customer_id, abis)) FROM reformatted_jobs) as jobs FROM latest_block_of_path diff --git a/storage/aws_bucket.go b/storage/aws_bucket.go index 43ea840..60f31ae 100644 --- a/storage/aws_bucket.go +++ b/storage/aws_bucket.go @@ -106,3 +106,8 @@ func (s *S3) ReadFiles(keys []string) (bytes.Buffer, error) { // Implement the ReadFiles method return bytes.Buffer{}, nil } + +func (s *S3) ReadFilesAsync(keys []string, threads int) ([]bytes.Buffer, error) { + // Implement the ReadFilesAsync method + return []bytes.Buffer{}, nil +} diff --git a/storage/filesystem.go b/storage/filesystem.go index a6de3e7..7662eb1 100644 --- a/storage/filesystem.go +++ b/storage/filesystem.go @@ -9,6 +9,8 @@ import ( "log" "os" "path/filepath" + "strings" + "sync" 
) type FileStorage struct { @@ -162,3 +164,52 @@ func (fs *FileStorage) ReadFiles(keys []string) (bytes.Buffer, error) { } return data, nil } + +func (fs *FileStorage) ReadFilesAsync(keys []string, threads int) (bytes.Buffer, error) { + var data bytes.Buffer + var mu sync.Mutex + var wg sync.WaitGroup + errChan := make(chan error, len(keys)) + + // Semaphore to limit the number of concurrent reads + sem := make(chan struct{}, threads) + + for _, key := range keys { + wg.Add(1) + sem <- struct{}{} + go func(k string) { + defer func() { + <-sem + wg.Done() + }() + bf, err := fs.Read(k) + if err != nil { + errChan <- fmt.Errorf("failed to read file %s: %v", k, err) + return + } + mu.Lock() + _, writeErr := data.Write(bf.Bytes()) + mu.Unlock() + if writeErr != nil { + errChan <- fmt.Errorf("failed to write data for file %s: %v", k, writeErr) + } + }(key) + } + + // Wait for all goroutines to finish + wg.Wait() + close(errChan) + + // Check if any errors occurred + if len(errChan) > 0 { + // Collect all errors + var errMsgs []string + for err := range errChan { + errMsgs = append(errMsgs, err.Error()) + } + return data, fmt.Errorf("errors occurred during file reads:\n%s", + strings.Join(errMsgs, "\n")) + } + + return data, nil +} diff --git a/storage/gcp_storage.go b/storage/gcp_storage.go index 13e0dde..46c0ffb 100644 --- a/storage/gcp_storage.go +++ b/storage/gcp_storage.go @@ -9,6 +9,7 @@ import ( "log" "path/filepath" "strings" + "sync" "time" "cloud.google.com/go/storage" @@ -233,3 +234,54 @@ func (g *GCS) ReadFiles(keys []string) (bytes.Buffer, error) { return result, nil } + +func (g *GCS) ReadFilesAsync(keys []string, threads int) (bytes.Buffer, error) { + var result bytes.Buffer + var mu sync.Mutex + var wg sync.WaitGroup + errChan := make(chan error, len(keys)) + + // Semaphore to limit the number of concurrent reads + sem := make(chan struct{}, threads) + + for _, key := range keys { + wg.Add(1) + sem <- struct{}{} + go func(k string) { + defer func() { + 
<-sem + wg.Done() + }() + + buf, err := g.Read(k) + if err != nil { + errChan <- fmt.Errorf("failed to read object from bucket %s: %v", k, err) + return + } + + mu.Lock() + if _, err := io.Copy(&result, &buf); err != nil { + mu.Unlock() + errChan <- fmt.Errorf("failed to copy data for key %s: %v", k, err) + return + } + mu.Unlock() + }(key) + } + + // Wait for all goroutines to finish + wg.Wait() + close(errChan) + + // Check if any errors occurred + if len(errChan) > 0 { + var errMsgs []string + for err := range errChan { + errMsgs = append(errMsgs, err.Error()) + } + return result, fmt.Errorf("errors occurred during file reads:\n%s", + strings.Join(errMsgs, "\n")) + } + + return result, nil +} diff --git a/storage/storage.go b/storage/storage.go index b919e40..01bf7f8 100644 --- a/storage/storage.go +++ b/storage/storage.go @@ -12,6 +12,7 @@ type Storer interface { Read(key string) (bytes.Buffer, error) ReadBatch(readItems []ReadItem) (map[string][]string, error) ReadFiles(keys []string) (bytes.Buffer, error) + ReadFilesAsync(keys []string, threads int) (bytes.Buffer, error) Delete(key string) error List(ctx context.Context, delim, blockBatch string, timeout int, returnFunc ListReturnFunc) ([]string, error) } diff --git a/synchronizer/synchronizer.go b/synchronizer/synchronizer.go index a5b96e3..6bfe5d6 100644 --- a/synchronizer/synchronizer.go +++ b/synchronizer/synchronizer.go @@ -459,7 +459,7 @@ func (d *Synchronizer) SyncCycle(customerDbUriFlag string) (bool, error) { log.Println("Last block of current chank: ", lastBlockOfChank) // Read the raw data from the storage for current path - rawData, readErr := d.StorageInstance.ReadFiles(paths) + rawData, readErr := d.StorageInstance.ReadFilesAsync(paths, d.threads) if readErr != nil { return isEnd, fmt.Errorf("error reading raw data: %w", readErr) } From c15bae455606b73512f35b169e7090c4ffe5749b Mon Sep 17 00:00:00 2001 From: Andrey Date: Wed, 30 Oct 2024 16:32:17 +0200 Subject: [PATCH 04/12] Update batch reading. 
--- storage/aws_bucket.go | 4 ++-- storage/filesystem.go | 35 +++++++++++------------------------ storage/gcp_storage.go | 20 +++++++------------- storage/storage.go | 4 ++-- synchronizer/synchronizer.go | 27 +++++++++++++++++++-------- 5 files changed, 41 insertions(+), 49 deletions(-) diff --git a/storage/aws_bucket.go b/storage/aws_bucket.go index 60f31ae..244f7ef 100644 --- a/storage/aws_bucket.go +++ b/storage/aws_bucket.go @@ -102,9 +102,9 @@ func (s *S3) ReadBatch(readItems []ReadItem) (map[string][]string, error) { return nil, nil } -func (s *S3) ReadFiles(keys []string) (bytes.Buffer, error) { +func (s *S3) ReadFiles(keys []string) ([]bytes.Buffer, error) { // Implement the ReadFiles method - return bytes.Buffer{}, nil + return []bytes.Buffer{}, nil } func (s *S3) ReadFilesAsync(keys []string, threads int) ([]bytes.Buffer, error) { diff --git a/storage/filesystem.go b/storage/filesystem.go index 7662eb1..3187d1b 100644 --- a/storage/filesystem.go +++ b/storage/filesystem.go @@ -9,7 +9,6 @@ import ( "log" "os" "path/filepath" - "strings" "sync" ) @@ -147,26 +146,24 @@ func (fs *FileStorage) Delete(key string) error { return nil } -func (fs *FileStorage) ReadFiles(keys []string) (bytes.Buffer, error) { +func (fs *FileStorage) ReadFiles(keys []string) ([]bytes.Buffer, error) { - var data bytes.Buffer + var data []bytes.Buffer for _, key := range keys { - data, err := fs.Read(key) - if err != nil { - return bytes.Buffer{}, err - } + dataBlock, err := fs.Read(key) - if _, err := io.Copy(&data, &data); err != nil { - return bytes.Buffer{}, fmt.Errorf("failed to read object data: %v", err) + if err != nil { + return nil, err } + data = append(data, dataBlock) } return data, nil } -func (fs *FileStorage) ReadFilesAsync(keys []string, threads int) (bytes.Buffer, error) { - var data bytes.Buffer +func (fs *FileStorage) ReadFilesAsync(keys []string, threads int) ([]bytes.Buffer, error) { + var data []bytes.Buffer var mu sync.Mutex var wg sync.WaitGroup errChan := 
make(chan error, len(keys)) @@ -187,28 +184,18 @@ func (fs *FileStorage) ReadFilesAsync(keys []string, threads int) (bytes.Buffer, errChan <- fmt.Errorf("failed to read file %s: %v", k, err) return } + mu.Lock() - _, writeErr := data.Write(bf.Bytes()) + data = append(data, bf) mu.Unlock() - if writeErr != nil { - errChan <- fmt.Errorf("failed to write data for file %s: %v", k, writeErr) - } }(key) } - // Wait for all goroutines to finish wg.Wait() close(errChan) - // Check if any errors occurred if len(errChan) > 0 { - // Collect all errors - var errMsgs []string - for err := range errChan { - errMsgs = append(errMsgs, err.Error()) - } - return data, fmt.Errorf("errors occurred during file reads:\n%s", - strings.Join(errMsgs, "\n")) + return nil, fmt.Errorf("failed to read files: %v", <-errChan) } return data, nil diff --git a/storage/gcp_storage.go b/storage/gcp_storage.go index 46c0ffb..352bfc8 100644 --- a/storage/gcp_storage.go +++ b/storage/gcp_storage.go @@ -218,25 +218,23 @@ func (g *GCS) ReadBatch(readItems []ReadItem) (map[string][]string, error) { return result, nil } -func (g *GCS) ReadFiles(keys []string) (bytes.Buffer, error) { - var result bytes.Buffer +func (g *GCS) ReadFiles(keys []string) ([]bytes.Buffer, error) { + var result []bytes.Buffer for _, key := range keys { buf, err := g.Read(key) if err != nil { - return bytes.Buffer{}, fmt.Errorf("failed to read object from bucket: %v", err) + return nil, fmt.Errorf("failed to read object from bucket %s: %v", key, err) } - if _, err := io.Copy(&result, &buf); err != nil { - return bytes.Buffer{}, fmt.Errorf("failed to read object data: %v", err) - } + result = append(result, buf) } return result, nil } -func (g *GCS) ReadFilesAsync(keys []string, threads int) (bytes.Buffer, error) { - var result bytes.Buffer +func (g *GCS) ReadFilesAsync(keys []string, threads int) ([]bytes.Buffer, error) { + var result []bytes.Buffer var mu sync.Mutex var wg sync.WaitGroup errChan := make(chan error, len(keys)) @@ 
-260,11 +258,7 @@ func (g *GCS) ReadFilesAsync(keys []string, threads int) (bytes.Buffer, error) { } mu.Lock() - if _, err := io.Copy(&result, &buf); err != nil { - mu.Unlock() - errChan <- fmt.Errorf("failed to copy data for key %s: %v", k, err) - return - } + result = append(result, buf) mu.Unlock() }(key) } diff --git a/storage/storage.go b/storage/storage.go index 01bf7f8..a45795c 100644 --- a/storage/storage.go +++ b/storage/storage.go @@ -11,8 +11,8 @@ type Storer interface { Save(batchDir, filename string, bf bytes.Buffer) error Read(key string) (bytes.Buffer, error) ReadBatch(readItems []ReadItem) (map[string][]string, error) - ReadFiles(keys []string) (bytes.Buffer, error) - ReadFilesAsync(keys []string, threads int) (bytes.Buffer, error) + ReadFiles(keys []string) ([]bytes.Buffer, error) + ReadFilesAsync(keys []string, threads int) ([]bytes.Buffer, error) Delete(key string) error List(ctx context.Context, delim, blockBatch string, timeout int, returnFunc ListReturnFunc) ([]string, error) } diff --git a/synchronizer/synchronizer.go b/synchronizer/synchronizer.go index 6bfe5d6..f7bf96c 100644 --- a/synchronizer/synchronizer.go +++ b/synchronizer/synchronizer.go @@ -704,7 +704,7 @@ func (d *Synchronizer) HistoricalSyncRef(customerDbUriFlag string, addresses []s func (d *Synchronizer) processProtoCustomerUpdate( update indexer.CustomerUpdates, - rawData bytes.Buffer, + rawDataList []bytes.Buffer, customerDBConnections map[string]map[int]CustomerDBConnection, id int, sem chan struct{}, @@ -731,14 +731,25 @@ func (d *Synchronizer) processProtoCustomerUpdate( return } defer conn.Release() - decodedEvents, decodedTransactions, err := d.Client.DecodeProtoEntireBlockToLabels(&rawData, update.Abis, d.threads) - if err != nil { - errChan <- fmt.Errorf("error %s: %w", update.CustomerID, err) - <-sem // Release semaphore - return - } - err = customer.Pgx.WriteLabes(d.blockchain, decodedTransactions, decodedEvents) + var listDecodedEvents []indexer.EventLabel + var 
listDecodedTransactions []indexer.TransactionLabel + + for _, rawData := range rawDataList { + // Decode the raw data to transactions + decodedEvents, decodedTransactions, err := d.Client.DecodeProtoEntireBlockToLabels(&rawData, update.Abis, d.threads) + + listDecodedEvents = append(listDecodedEvents, decodedEvents...) + listDecodedTransactions = append(listDecodedTransactions, decodedTransactions...) + + if err != nil { + errChan <- fmt.Errorf("error decoding data for customer %s: %w", update.CustomerID, err) + <-sem // Release semaphore + return + } + + } + err = customer.Pgx.WriteLabes(d.blockchain, listDecodedTransactions, listDecodedEvents) if err != nil { errChan <- fmt.Errorf("error writing labels for customer %s: %w", update.CustomerID, err) From a8bf6ff777b48ff388c53e33ea7d3e2e1539819e Mon Sep 17 00:00:00 2001 From: Andrey Date: Mon, 4 Nov 2024 17:55:47 +0200 Subject: [PATCH 05/12] Add batch reading. --- indexer/db.go | 63 ++++++++++++++++++++++++++++++++++++ synchronizer/synchronizer.go | 10 +++--- 2 files changed, 68 insertions(+), 5 deletions(-) diff --git a/indexer/db.go b/indexer/db.go index c9182b9..a9fb6d7 100644 --- a/indexer/db.go +++ b/indexer/db.go @@ -1412,6 +1412,69 @@ func (p *PostgreSQLpgx) FindBatchPath(blockchain string, blockNumber uint64) (st } +func (p *PostgreSQLpgx) RetrievePathsAndBlockBounds(blockchain string, blockNumber uint64, minBlocksToSync int) ([]string, uint64, uint64, error) { + pool := p.GetPool() + + conn, err := pool.Acquire(context.Background()) + + if err != nil { + return nil, 0, 0, err + } + + defer conn.Release() + + var paths []string + + var minBlockNumber uint64 + + var maxBlockNumber uint64 + query := fmt.Sprintf(`WITH path as ( + SELECT + path, + block_number + from + %s + WHERE + block_number >= $1 and block_number <= $1 + $3 + ), latest_block_of_path as ( + SELECT + block_number as last_block_number + from + %s + WHERE + path = (SELECT path FROM path ORDER BY block_number DESC LIMIT 1) + order by block_number 
desc + limit 1 + ), earliest_block_of_path as ( + SELECT + block_number as first_block_number + from + %s + WHERE + path = (SELECT path FROM path ORDER BY block_number ASC LIMIT 1) + order by block_number asc + limit 1 + ) + select arr_agg(select path from path) as paths, (SELECT first_block_number FROM earliest_block_of_path) as min_block_number, (SELECT last_block_number FROM latest_block_of_path) as max_block_number + `, BlocksTableName(blockchain), BlocksTableName(blockchain), BlocksTableName(blockchain)) + + err = conn.QueryRow(context.Background(), query, blockNumber).Scan(&paths, &minBlockNumber, &maxBlockNumber) + + if err != nil { + if err == pgx.ErrNoRows { + // Blocks not indexed yet + return nil, 0, 0, nil + } + return nil, + 0, + 0, + err + } + + return paths, minBlockNumber, maxBlockNumber, nil + +} + func (p *PostgreSQLpgx) GetAbiJobsWithoutDeployBlocks(blockchain string) (map[string]map[string][]string, error) { pool := p.GetPool() diff --git a/synchronizer/synchronizer.go b/synchronizer/synchronizer.go index f7bf96c..b664760 100644 --- a/synchronizer/synchronizer.go +++ b/synchronizer/synchronizer.go @@ -626,16 +626,16 @@ func (d *Synchronizer) HistoricalSyncRef(customerDbUriFlag string, addresses []s } // Determine the processing strategy (RPC or storage) - var path string + var paths []string var firstBlockOfChunk uint64 for { - path, firstBlockOfChunk, _, err = indexer.DBConnection.FindBatchPath(d.blockchain, d.startBlock) + paths, firstBlockOfChunk, _, err = indexer.DBConnection.RetrievePathsAndBlockBounds(d.blockchain, d.startBlock, d.minBlocksToSync) if err != nil { return fmt.Errorf("error finding batch path: %w", err) } - if path != "" { + if paths != nil { d.endBlock = firstBlockOfChunk break } @@ -645,8 +645,8 @@ func (d *Synchronizer) HistoricalSyncRef(customerDbUriFlag string, addresses []s } // Read raw data from storage or via RPC - var rawData bytes.Buffer - rawData, err = d.StorageInstance.Read(path) + var rawData []bytes.Buffer + 
rawData, err = d.StorageInstance.ReadFilesAsync(paths, d.threads) if err != nil { return fmt.Errorf("error reading events from storage: %w", err) } From 744b70e84ea91d1d6257180481246ce8855e9ba9 Mon Sep 17 00:00:00 2001 From: Andrey Date: Thu, 14 Nov 2024 12:10:36 +0200 Subject: [PATCH 06/12] Update historical sync. --- cmd.go | 5 +++++ indexer/db.go | 11 ++++++++--- synchronizer/synchronizer.go | 10 +++++----- 3 files changed, 18 insertions(+), 8 deletions(-) diff --git a/cmd.go b/cmd.go index 4c199e1..cd389f0 100644 --- a/cmd.go +++ b/cmd.go @@ -1039,6 +1039,11 @@ func CreateHistoricalSyncCommand() *cobra.Command { return syncErr } + blockchainErr := seer_blockchain.CheckVariablesForBlockchains() + if blockchainErr != nil { + return blockchainErr + } + if chain == "" { return fmt.Errorf("blockchain is required via --chain") } diff --git a/indexer/db.go b/indexer/db.go index 669acbd..e06e265 100644 --- a/indexer/db.go +++ b/indexer/db.go @@ -791,6 +791,7 @@ func (p *PostgreSQLpgx) WriteLabes( conn, err := pool.Acquire(context.Background()) if err != nil { + fmt.Println("Error acquiring connection:", err) return err } @@ -799,6 +800,7 @@ func (p *PostgreSQLpgx) WriteLabes( tx, err := conn.Begin(context.Background()) if err != nil { + fmt.Println("Error beginning transaction:", err) return err } @@ -816,6 +818,7 @@ func (p *PostgreSQLpgx) WriteLabes( if len(transactions) > 0 { err := p.WriteTransactions(tx, blockchain, transactions) if err != nil { + fmt.Println("Error writing transactions:", err) return err } } @@ -823,6 +826,7 @@ func (p *PostgreSQLpgx) WriteLabes( if len(events) > 0 { err := p.WriteEvents(tx, blockchain, events) if err != nil { + fmt.Println("Error writing events:", err) return err } } @@ -1434,6 +1438,7 @@ func (p *PostgreSQLpgx) RetrievePathsAndBlockBounds(blockchain string, blockNumb var minBlockNumber uint64 var maxBlockNumber uint64 + query := fmt.Sprintf(`WITH path as ( SELECT path, @@ -1441,7 +1446,7 @@ func (p *PostgreSQLpgx) 
RetrievePathsAndBlockBounds(blockchain string, blockNumb from %s WHERE - block_number >= $1 and block_number <= $1 + $3 + block_number >= $2 and block_number <= $1 ), latest_block_of_path as ( SELECT block_number as last_block_number @@ -1461,10 +1466,10 @@ func (p *PostgreSQLpgx) RetrievePathsAndBlockBounds(blockchain string, blockNumb order by block_number asc limit 1 ) - select arr_agg(select path from path) as paths, (SELECT first_block_number FROM earliest_block_of_path) as min_block_number, (SELECT last_block_number FROM latest_block_of_path) as max_block_number + select ARRAY_AGG(path) as paths, (SELECT first_block_number FROM earliest_block_of_path) as min_block_number, (SELECT last_block_number FROM latest_block_of_path) as max_block_number from path `, BlocksTableName(blockchain), BlocksTableName(blockchain), BlocksTableName(blockchain)) - err = conn.QueryRow(context.Background(), query, blockNumber).Scan(&paths, &minBlockNumber, &maxBlockNumber) + err = conn.QueryRow(context.Background(), query, blockNumber, blockNumber-uint64(minBlocksToSync)).Scan(&paths, &minBlockNumber, &maxBlockNumber) if err != nil { if err == pgx.ErrNoRows { diff --git a/synchronizer/synchronizer.go b/synchronizer/synchronizer.go index a27fa18..35ae4a1 100644 --- a/synchronizer/synchronizer.go +++ b/synchronizer/synchronizer.go @@ -683,15 +683,15 @@ func (d *Synchronizer) HistoricalSyncRef(customerDbUriFlag string, addresses []s close(errChan) // Close the channel to signal that all goroutines have finished // Check for errors from goroutines - select { - case err := <-errChan: - log.Printf("Error processing customer updates: %v", err) - return err - default: + // Check for errors from goroutines + if err := <-errChan; err != nil { + return fmt.Errorf("error processing customer updates: %w", err) } d.startBlock = d.endBlock - 1 + fmt.Printf("Processed %d customer updates for block range %d-%d\n", len(customerUpdates), d.startBlock, d.endBlock) + if isCycleFinished || d.startBlock 
== 0 { if autoJobs { for address, abisInfo := range addressesAbisInfo { From 525469a915edb7f22a38eea100db23e85a3b86bb Mon Sep 17 00:00:00 2001 From: Andrey Date: Wed, 20 Nov 2024 17:20:20 +0200 Subject: [PATCH 07/12] Update errors processing. --- indexer/db.go | 9 +++-- synchronizer/synchronizer.go | 76 ++++++++++++++++++++++++++---------- 2 files changed, 61 insertions(+), 24 deletions(-) diff --git a/indexer/db.go b/indexer/db.go index e06e265..23c55b5 100644 --- a/indexer/db.go +++ b/indexer/db.go @@ -805,13 +805,16 @@ func (p *PostgreSQLpgx) WriteLabes( } defer func() { - if err := recover(); err != nil { + if pErr := recover(); pErr != nil { tx.Rollback(context.Background()) - panic(err) + panic(pErr) } else if err != nil { tx.Rollback(context.Background()) } else { err = tx.Commit(context.Background()) + if err != nil { + fmt.Println("Error committing transaction:", err) + } } }() @@ -831,7 +834,7 @@ func (p *PostgreSQLpgx) WriteLabes( } } - return nil + return err } func (p *PostgreSQLpgx) WriteEvents(tx pgx.Tx, blockchain string, events []EventLabel) error { diff --git a/synchronizer/synchronizer.go b/synchronizer/synchronizer.go index 35ae4a1..4d35f87 100644 --- a/synchronizer/synchronizer.go +++ b/synchronizer/synchronizer.go @@ -477,9 +477,23 @@ func (d *Synchronizer) SyncCycle(customerDbUriFlag string) (bool, error) { log.Printf("Read %d users updates from the indexer db in range of blocks %d-%d\n", len(updates), d.startBlock, lastBlockOfChank) var wg sync.WaitGroup + var errWg sync.WaitGroup + errWg.Add(1) - sem := make(chan struct{}, 5) // Semaphore to control concurrency - errChan := make(chan error, 1) // Buffered channel for error handling + sem := make(chan struct{}, d.threads) // Semaphore to limit the number of concurrent goroutines + errChan := make(chan error) // Channel to collect errors from goroutines + + var errs []error + var mu sync.Mutex // To protect access to errs + + go func() { + defer errWg.Done() + for err := range errChan { +
mu.Lock() + errs = append(errs, err) + mu.Unlock() + } + }() for _, update := range updates { for instanceId := range customerDBConnections[update.CustomerID] { @@ -489,13 +503,15 @@ func (d *Synchronizer) SyncCycle(customerDbUriFlag string) (bool, error) { } wg.Wait() - - close(sem) close(errChan) // Close the channel to signal that all goroutines have finished - // Check for errors from goroutines - if err := <-errChan; err != nil { - return isEnd, fmt.Errorf("error processing customer updates: %w", err) + // Check if there were any errors + if len(errs) > 0 { + var errMsg string + for _, e := range errs { + errMsg += e.Error() + "\n" + } + return isEnd, fmt.Errorf("errors processing customer updates:\n%s", errMsg) } d.startBlock = lastBlockOfChank + 1 @@ -666,8 +682,23 @@ func (d *Synchronizer) HistoricalSyncRef(customerDbUriFlag string, addresses []s // Process customer updates in parallel var wg sync.WaitGroup + var errWg sync.WaitGroup + errWg.Add(1) + sem := make(chan struct{}, d.threads) // Semaphore to control concurrency - errChan := make(chan error, 1) // Buffered channel for error handling + errChan := make(chan error) // Channel to collect errors from goroutines + + var errs []error + var mu sync.Mutex // To protect access to errs + + go func() { + defer errWg.Done() + for err := range errChan { + mu.Lock() + errs = append(errs, err) + mu.Unlock() + } + }() for _, update := range customerUpdates { @@ -679,13 +710,15 @@ func (d *Synchronizer) HistoricalSyncRef(customerDbUriFlag string, addresses []s } wg.Wait() - close(sem) close(errChan) // Close the channel to signal that all goroutines have finished - // Check for errors from goroutines - // Check for errors from goroutines - if err := <-errChan; err != nil { - return fmt.Errorf("error processing customer updates: %w", err) + // Check if there were any errors + if len(errs) > 0 { + var errMsg string + for _, e := range errs { + errMsg += e.Error() + "\n" + } + return fmt.Errorf("errors processing 
customer updates:\n%s", errMsg) } d.startBlock = d.endBlock - 1 @@ -725,20 +758,25 @@ func (d *Synchronizer) processProtoCustomerUpdate( // Decode input raw proto data using ABIs // Write decoded data to the user Database - defer wg.Done() - sem <- struct{}{} // Acquire semaphore + defer func() { + if r := recover(); r != nil { + errChan <- fmt.Errorf("panic in goroutine for customer %s, instance %d: %v", update.CustomerID, id, r) + } + wg.Done() + }() + + sem <- struct{}{} // Acquire semaphore + defer func() { <-sem }() // Release semaphore customer, exists := customerDBConnections[update.CustomerID][id] if !exists { errChan <- fmt.Errorf("no DB connection for customer %s", update.CustomerID) - <-sem // Release semaphore return } conn, err := customer.Pgx.GetPool().Acquire(context.Background()) if err != nil { errChan <- fmt.Errorf("error acquiring connection for customer %s: %w", update.CustomerID, err) - <-sem // Release semaphore return } defer conn.Release() @@ -755,7 +793,6 @@ func (d *Synchronizer) processProtoCustomerUpdate( if err != nil { errChan <- fmt.Errorf("error decoding data for customer %s: %w", update.CustomerID, err) - <-sem // Release semaphore return } @@ -764,9 +801,6 @@ func (d *Synchronizer) processProtoCustomerUpdate( if err != nil { errChan <- fmt.Errorf("error writing labels for customer %s: %w", update.CustomerID, err) - <-sem // Release semaphore return } - - <-sem // Release semaphore } From 4b44b36c65e2a8c001cf295cc5e2f2c5a3d1c25d Mon Sep 17 00:00:00 2001 From: Andrey Date: Thu, 21 Nov 2024 11:36:54 +0200 Subject: [PATCH 08/12] Update goroutine channels handling.
--- blockchain/blockchain.go.tmpl | 15 +++++++--- indexer/db.go | 26 ++++++++--------- synchronizer/synchronizer.go | 54 ++++++++++++++++------------------- 3 files changed, 49 insertions(+), 46 deletions(-) diff --git a/blockchain/blockchain.go.tmpl b/blockchain/blockchain.go.tmpl index 8b0a4f1..e2dfcff 100644 --- a/blockchain/blockchain.go.tmpl +++ b/blockchain/blockchain.go.tmpl @@ -226,7 +226,7 @@ func (c *Client) FetchBlocksInRange(from, to *big.Int, debug bool) ([]*seer_comm func (c *Client) FetchBlocksInRangeAsync(from, to *big.Int, debug bool, maxRequests int) ([]*seer_common.BlockJson, error) { var ( blocks []*seer_common.BlockJson - + collectedErrors []error mu sync.Mutex wg sync.WaitGroup ctx = context.Background() @@ -238,7 +238,7 @@ func (c *Client) FetchBlocksInRangeAsync(from, to *big.Int, debug bool, maxReque } sem := make(chan struct{}, maxRequests) // Semaphore to control concurrency - errChan := make(chan error, 1) + errChan := make(chan error, len(blockNumbersRange)) // Channel to collect errors from goroutines for _, b := range blockNumbersRange { wg.Add(1) @@ -274,10 +274,17 @@ func (c *Client) FetchBlocksInRangeAsync(from, to *big.Int, debug bool, maxReque close(sem) close(errChan) - if err := <-errChan; err != nil { - return nil, err + for err := range errChan { + collectedErrors = append(collectedErrors, err) } + if len(collectedErrors) > 0 { + var errStrings []string + for _, err := range collectedErrors { + errStrings = append(errStrings, err.Error()) + } + return nil, fmt.Errorf("errors occurred during crawling: %s", strings.Join(errStrings, "; ")) + } return blocks, nil } diff --git a/indexer/db.go b/indexer/db.go index 23c55b5..474065d 100644 --- a/indexer/db.go +++ b/indexer/db.go @@ -557,8 +557,10 @@ func (p *PostgreSQLpgx) ReadUpdates(blockchain string, fromBlock uint64, custome conn, err := pool.Acquire(context.Background()) + var paths []string + if err != nil { - return 0, 0, make([]string, 0), nil, err + return 0, 0, paths, nil, 
err } defer conn.Release() @@ -576,7 +578,7 @@ func (p *PostgreSQLpgx) ReadUpdates(blockchain string, fromBlock uint64, custome ), latest_block_of_path as ( SELECT - block_number as last_block_number + block_number as latest_block_number from %s WHERE @@ -627,7 +629,7 @@ func (p *PostgreSQLpgx) ReadUpdates(blockchain string, fromBlock uint64, custome customer_id ) SELECT - last_block_number, + latest_block_number, (SELECT array_agg(path) FROM path) as paths, (SELECT json_agg(json_build_object(customer_id, abis)) FROM reformatted_jobs) as jobs FROM @@ -638,18 +640,17 @@ func (p *PostgreSQLpgx) ReadUpdates(blockchain string, fromBlock uint64, custome if err != nil { log.Println("Error querying abi jobs from database", err) - return 0, 0, make([]string, 0), nil, err + return 0, 0, paths, nil, err } var customers []map[string]map[string]map[string]*AbiEntry - var paths []string var firstBlockNumber, lastBlockNumber uint64 for rows.Next() { err = rows.Scan(&lastBlockNumber, &paths, &customers) if err != nil { log.Println("Error scanning row:", err) - return 0, 0, make([]string, 0), nil, err + return 0, 0, paths, nil, err } } @@ -791,7 +792,6 @@ func (p *PostgreSQLpgx) WriteLabes( conn, err := pool.Acquire(context.Background()) if err != nil { - fmt.Println("Error acquiring connection:", err) return err } @@ -800,7 +800,6 @@ func (p *PostgreSQLpgx) WriteLabes( tx, err := conn.Begin(context.Background()) if err != nil { - fmt.Println("Error beginning transaction:", err) return err } @@ -813,7 +812,8 @@ func (p *PostgreSQLpgx) WriteLabes( } else { err = tx.Commit(context.Background()) if err != nil { - fmt.Println("Error committing transaction:", err) + log.Println("Error committing transaction:", err) + panic(err) } } }() @@ -821,7 +821,7 @@ func (p *PostgreSQLpgx) WriteLabes( if len(transactions) > 0 { err := p.WriteTransactions(tx, blockchain, transactions) if err != nil { - fmt.Println("Error writing transactions:", err) + log.Println("Error writing transactions:", 
err) return err } } @@ -829,7 +829,7 @@ func (p *PostgreSQLpgx) WriteLabes( if len(events) > 0 { err := p.WriteEvents(tx, blockchain, events) if err != nil { - fmt.Println("Error writing events:", err) + log.Println("Error writing events:", err) return err } } @@ -1452,7 +1452,7 @@ func (p *PostgreSQLpgx) RetrievePathsAndBlockBounds(blockchain string, blockNumb block_number >= $2 and block_number <= $1 ), latest_block_of_path as ( SELECT - block_number as last_block_number + block_number as latest_block_number from %s WHERE @@ -1469,7 +1469,7 @@ func (p *PostgreSQLpgx) RetrievePathsAndBlockBounds(blockchain string, blockNumb order by block_number asc limit 1 ) - select ARRAY_AGG(path) as paths, (SELECT first_block_number FROM earliest_block_of_path) as min_block_number, (SELECT last_block_number FROM latest_block_of_path) as max_block_number from path + select ARRAY_AGG(path) as paths, (SELECT first_block_number FROM earliest_block_of_path) as min_block_number, (SELECT latest_block_number FROM latest_block_of_path) as max_block_number from path `, BlocksTableName(blockchain), BlocksTableName(blockchain), BlocksTableName(blockchain)) err = conn.QueryRow(context.Background(), query, blockNumber, blockNumber-uint64(minBlocksToSync)).Scan(&paths, &minBlockNumber, &maxBlockNumber) diff --git a/synchronizer/synchronizer.go b/synchronizer/synchronizer.go index 4d35f87..b8d67fb 100644 --- a/synchronizer/synchronizer.go +++ b/synchronizer/synchronizer.go @@ -476,25 +476,19 @@ func (d *Synchronizer) SyncCycle(customerDbUriFlag string) (bool, error) { } log.Printf("Read %d users updates from the indexer db in range of blocks %d-%d\n", len(updates), d.startBlock, lastBlockOfChank) + // Process customer updates in parallel var wg sync.WaitGroup - var errWg sync.WaitGroup - errWg.Add(1) - sem := make(chan struct{}, d.threads) // Semaphore to limit the number of concurrent goroutines - errChan := make(chan error) // Channel to collect errors from goroutines + // count the number 
of goroutines that will be running + var totalGoroutines int + for _, update := range updates { + totalGoroutines += len(customerDBConnections[update.CustomerID]) + } - var errs []error - var mu sync.Mutex // To protect access to errs - - go func() { - defer errWg.Done() - for err := range errChan { - mu.Lock() - errs = append(errs, err) - mu.Unlock() - } - }() + sem := make(chan struct{}, d.threads) // Semaphore to control concurrency + errChan := make(chan error, totalGoroutines) // Channel to collect errors from goroutines + var errs []error for _, update := range updates { for instanceId := range customerDBConnections[update.CustomerID] { wg.Add(1) @@ -506,6 +500,10 @@ func (d *Synchronizer) SyncCycle(customerDbUriFlag string) (bool, error) { close(errChan) // Close the channel to signal that all goroutines have finished // Check if there were any errors + for err := range errChan { + errs = append(errs, err) + } + if len(errs) > 0 { var errMsg string for _, e := range errs { @@ -682,23 +680,17 @@ func (d *Synchronizer) HistoricalSyncRef(customerDbUriFlag string, addresses []s // Process customer updates in parallel var wg sync.WaitGroup - var errWg sync.WaitGroup - errWg.Add(1) - sem := make(chan struct{}, d.threads) // Semaphore to control concurrency - errChan := make(chan error) // Channel to collect errors from goroutines + // count the number of goroutines that will be running + var totalGoroutines int + for _, update := range customerUpdates { + totalGoroutines += len(customerDBConnections[update.CustomerID]) + } + + sem := make(chan struct{}, d.threads) // Semaphore to control concurrency + errChan := make(chan error, totalGoroutines) // Channel to collect errors from goroutines var errs []error - var mu sync.Mutex // To protect access to errs - - go func() { - defer errWg.Done() - for err := range errChan { - mu.Lock() - errs = append(errs, err) - mu.Unlock() - } - }() for _, update := range customerUpdates { @@ -713,6 +705,10 @@ func (d *Synchronizer) 
HistoricalSyncRef(customerDbUriFlag string, addresses []s close(errChan) // Close the channel to signal that all goroutines have finished // Check if there were any errors + for err := range errChan { + errs = append(errs, err) + } + if len(errs) > 0 { var errMsg string for _, e := range errs { From 047bc2bac2e32bc8e5cf6f05f70dc527f9b2edc4 Mon Sep 17 00:00:00 2001 From: Andrey Date: Thu, 21 Nov 2024 12:07:46 +0200 Subject: [PATCH 09/12] Reduce code duplication. --- storage/aws_bucket.go | 10 ------ storage/filesystem.go | 56 ------------------------------ storage/gcp_storage.go | 63 --------------------------------- storage/storage.go | 67 ++++++++++++++++++++++++++++++++++-- synchronizer/synchronizer.go | 4 +-- 5 files changed, 67 insertions(+), 133 deletions(-) diff --git a/storage/aws_bucket.go b/storage/aws_bucket.go index 244f7ef..fdf5261 100644 --- a/storage/aws_bucket.go +++ b/storage/aws_bucket.go @@ -101,13 +101,3 @@ func (s *S3) ReadBatch(readItems []ReadItem) (map[string][]string, error) { // Implement the ReadBatch method return nil, nil } - -func (s *S3) ReadFiles(keys []string) ([]bytes.Buffer, error) { - // Implement the ReadFiles method - return []bytes.Buffer{}, nil -} - -func (s *S3) ReadFilesAsync(keys []string, threads int) ([]bytes.Buffer, error) { - // Implement the ReadFilesAsync method - return []bytes.Buffer{}, nil -} diff --git a/storage/filesystem.go b/storage/filesystem.go index 3187d1b..463cd95 100644 --- a/storage/filesystem.go +++ b/storage/filesystem.go @@ -9,7 +9,6 @@ import ( "log" "os" "path/filepath" - "sync" ) type FileStorage struct { @@ -145,58 +144,3 @@ func (fs *FileStorage) Delete(key string) error { // Implement the Delete method return nil } - -func (fs *FileStorage) ReadFiles(keys []string) ([]bytes.Buffer, error) { - - var data []bytes.Buffer - - for _, key := range keys { - dataBlock, err := fs.Read(key) - - if err != nil { - return nil, err - } - data = append(data, dataBlock) - - } - return data, nil -} - -func (fs
*FileStorage) ReadFilesAsync(keys []string, threads int) ([]bytes.Buffer, error) { - var data []bytes.Buffer - var mu sync.Mutex - var wg sync.WaitGroup - errChan := make(chan error, len(keys)) - - // Semaphore to limit the number of concurrent reads - sem := make(chan struct{}, threads) - - for _, key := range keys { - wg.Add(1) - sem <- struct{}{} - go func(k string) { - defer func() { - <-sem - wg.Done() - }() - bf, err := fs.Read(k) - if err != nil { - errChan <- fmt.Errorf("failed to read file %s: %v", k, err) - return - } - - mu.Lock() - data = append(data, bf) - mu.Unlock() - }(key) - } - - wg.Wait() - close(errChan) - - if len(errChan) > 0 { - return nil, fmt.Errorf("failed to read files: %v", <-errChan) - } - - return data, nil -} diff --git a/storage/gcp_storage.go b/storage/gcp_storage.go index 352bfc8..1488ce1 100644 --- a/storage/gcp_storage.go +++ b/storage/gcp_storage.go @@ -9,7 +9,6 @@ import ( "log" "path/filepath" "strings" - "sync" "time" "cloud.google.com/go/storage" @@ -217,65 +216,3 @@ func (g *GCS) ReadBatch(readItems []ReadItem) (map[string][]string, error) { return result, nil } - -func (g *GCS) ReadFiles(keys []string) ([]bytes.Buffer, error) { - var result []bytes.Buffer - - for _, key := range keys { - buf, err := g.Read(key) - if err != nil { - return nil, fmt.Errorf("failed to read object from bucket %s: %v", key, err) - } - - result = append(result, buf) - } - - return result, nil -} - -func (g *GCS) ReadFilesAsync(keys []string, threads int) ([]bytes.Buffer, error) { - var result []bytes.Buffer - var mu sync.Mutex - var wg sync.WaitGroup - errChan := make(chan error, len(keys)) - - // Semaphore to limit the number of concurrent reads - sem := make(chan struct{}, threads) - - for _, key := range keys { - wg.Add(1) - sem <- struct{}{} - go func(k string) { - defer func() { - <-sem - wg.Done() - }() - - buf, err := g.Read(k) - if err != nil { - errChan <- fmt.Errorf("failed to read object from bucket %s: %v", k, err) - return - } - - 
mu.Lock() - result = append(result, buf) - mu.Unlock() - }(key) - } - - // Wait for all goroutines to finish - wg.Wait() - close(errChan) - - // Check if any errors occurred - if len(errChan) > 0 { - var errMsgs []string - for err := range errChan { - errMsgs = append(errMsgs, err.Error()) - } - return result, fmt.Errorf("errors occurred during file reads:\n%s", - strings.Join(errMsgs, "\n")) - } - - return result, nil -} diff --git a/storage/storage.go b/storage/storage.go index a45795c..4933a90 100644 --- a/storage/storage.go +++ b/storage/storage.go @@ -3,6 +3,9 @@ package storage import ( "bytes" "context" + "fmt" + "strings" + "sync" ) type ListReturnFunc func(any) string @@ -11,8 +14,6 @@ type Storer interface { Save(batchDir, filename string, bf bytes.Buffer) error Read(key string) (bytes.Buffer, error) ReadBatch(readItems []ReadItem) (map[string][]string, error) - ReadFiles(keys []string) ([]bytes.Buffer, error) - ReadFilesAsync(keys []string, threads int) ([]bytes.Buffer, error) Delete(key string) error List(ctx context.Context, delim, blockBatch string, timeout int, returnFunc ListReturnFunc) ([]string, error) } @@ -21,3 +22,65 @@ type ReadItem struct { Key string RowIds []uint64 } + +func ReadFiles(keys []string, storageInstance Storer) ([]bytes.Buffer, error) { + var result []bytes.Buffer + + for _, key := range keys { + buf, err := storageInstance.Read(key) + if err != nil { + return nil, fmt.Errorf("failed to read object from bucket %s: %v", key, err) + } + + result = append(result, buf) + } + + return result, nil +} + +func ReadFilesAsync(keys []string, threads int, storageInstance Storer) ([]bytes.Buffer, error) { + var result []bytes.Buffer + var mu sync.Mutex + var wg sync.WaitGroup + errChan := make(chan error, len(keys)) + + // Semaphore to limit the number of concurrent reads + sem := make(chan struct{}, threads) + + for _, key := range keys { + wg.Add(1) + sem <- struct{}{} + go func(k string) { + defer func() { + <-sem + wg.Done() + }() + + 
buf, err := storageInstance.Read(k) + if err != nil { + errChan <- fmt.Errorf("failed to read object from bucket %s: %v", k, err) + return + } + + mu.Lock() + result = append(result, buf) + mu.Unlock() + }(key) + } + + // Wait for all goroutines to finish + wg.Wait() + close(errChan) + + // Check if any errors occurred + if len(errChan) > 0 { + var errMsgs []string + for err := range errChan { + errMsgs = append(errMsgs, err.Error()) + } + return result, fmt.Errorf("errors occurred during file reads:\n%s", + strings.Join(errMsgs, "\n")) + } + + return result, nil +} diff --git a/synchronizer/synchronizer.go b/synchronizer/synchronizer.go index b8d67fb..17d0ea8 100644 --- a/synchronizer/synchronizer.go +++ b/synchronizer/synchronizer.go @@ -470,7 +470,7 @@ func (d *Synchronizer) SyncCycle(customerDbUriFlag string) (bool, error) { log.Println("Last block of current chank: ", lastBlockOfChank) // Read the raw data from the storage for current path - rawData, readErr := d.StorageInstance.ReadFilesAsync(paths, d.threads) + rawData, readErr := storage.ReadFilesAsync(paths, d.threads, d.StorageInstance) if readErr != nil { return isEnd, fmt.Errorf("error reading raw data: %w", readErr) } @@ -671,7 +671,7 @@ func (d *Synchronizer) HistoricalSyncRef(customerDbUriFlag string, addresses []s // Read raw data from storage or via RPC var rawData []bytes.Buffer - rawData, err = d.StorageInstance.ReadFilesAsync(paths, d.threads) + rawData, err = storage.ReadFilesAsync(paths, d.threads, d.StorageInstance) if err != nil { return fmt.Errorf("error reading events from storage: %w", err) } From 56d386fba29cdf16b47915c71ccbe377a3758e6b Mon Sep 17 00:00:00 2001 From: Andrey Date: Thu, 21 Nov 2024 12:24:22 +0200 Subject: [PATCH 10/12] Update clients goroutines.
--- blockchain/arbitrum_one/arbitrum_one.go | 25 ++++++++++++------- .../arbitrum_sepolia/arbitrum_sepolia.go | 25 ++++++++++++------- blockchain/b3/b3.go | 25 ++++++++++++------- blockchain/b3_sepolia/b3_sepolia.go | 25 ++++++++++++------- blockchain/ethereum/ethereum.go | 25 ++++++++++++------- blockchain/game7/game7.go | 25 ++++++++++++------- .../game7_orbit_arbitrum_sepolia.go | 25 ++++++++++++------- blockchain/game7_testnet/game7_testnet.go | 25 ++++++++++++------- blockchain/imx_zkevm/imx_zkevm.go | 25 ++++++++++++------- .../imx_zkevm_sepolia/imx_zkevm_sepolia.go | 25 ++++++++++++------- blockchain/mantle/mantle.go | 25 ++++++++++++------- blockchain/mantle_sepolia/mantle_sepolia.go | 25 ++++++++++++------- blockchain/polygon/polygon.go | 25 ++++++++++++------- blockchain/ronin/ronin.go | 25 ++++++++++++------- blockchain/ronin_saigon/ronin_saigon.go | 25 ++++++++++++------- blockchain/sepolia/sepolia.go | 25 ++++++++++++------- blockchain/xai/xai.go | 25 ++++++++++++------- blockchain/xai_sepolia/xai_sepolia.go | 25 ++++++++++++------- 18 files changed, 288 insertions(+), 162 deletions(-) diff --git a/blockchain/arbitrum_one/arbitrum_one.go b/blockchain/arbitrum_one/arbitrum_one.go index 3b5194b..ed36ee7 100644 --- a/blockchain/arbitrum_one/arbitrum_one.go +++ b/blockchain/arbitrum_one/arbitrum_one.go @@ -225,11 +225,11 @@ func (c *Client) FetchBlocksInRange(from, to *big.Int, debug bool) ([]*seer_comm // FetchBlocksInRangeAsync fetches blocks within a specified range concurrently. 
func (c *Client) FetchBlocksInRangeAsync(from, to *big.Int, debug bool, maxRequests int) ([]*seer_common.BlockJson, error) { var ( - blocks []*seer_common.BlockJson - - mu sync.Mutex - wg sync.WaitGroup - ctx = context.Background() + blocks []*seer_common.BlockJson + collectedErrors []error + mu sync.Mutex + wg sync.WaitGroup + ctx = context.Background() ) var blockNumbersRange []*big.Int @@ -237,8 +237,8 @@ func (c *Client) FetchBlocksInRangeAsync(from, to *big.Int, debug bool, maxReque blockNumbersRange = append(blockNumbersRange, new(big.Int).Set(i)) } - sem := make(chan struct{}, maxRequests) // Semaphore to control concurrency - errChan := make(chan error, 1) + sem := make(chan struct{}, maxRequests) // Semaphore to control concurrency + errChan := make(chan error, len(blockNumbersRange)) // Channel to collect errors from goroutines for _, b := range blockNumbersRange { wg.Add(1) @@ -274,10 +274,17 @@ func (c *Client) FetchBlocksInRangeAsync(from, to *big.Int, debug bool, maxReque close(sem) close(errChan) - if err := <-errChan; err != nil { - return nil, err + for err := range errChan { + collectedErrors = append(collectedErrors, err) } + if len(collectedErrors) > 0 { + var errStrings []string + for _, err := range collectedErrors { + errStrings = append(errStrings, err.Error()) + } + return nil, fmt.Errorf("errors occurred during crawling: %s", strings.Join(errStrings, "; ")) + } return blocks, nil } diff --git a/blockchain/arbitrum_sepolia/arbitrum_sepolia.go b/blockchain/arbitrum_sepolia/arbitrum_sepolia.go index e33e13b..eb23bee 100644 --- a/blockchain/arbitrum_sepolia/arbitrum_sepolia.go +++ b/blockchain/arbitrum_sepolia/arbitrum_sepolia.go @@ -225,11 +225,11 @@ func (c *Client) FetchBlocksInRange(from, to *big.Int, debug bool) ([]*seer_comm // FetchBlocksInRangeAsync fetches blocks within a specified range concurrently. 
func (c *Client) FetchBlocksInRangeAsync(from, to *big.Int, debug bool, maxRequests int) ([]*seer_common.BlockJson, error) { var ( - blocks []*seer_common.BlockJson - - mu sync.Mutex - wg sync.WaitGroup - ctx = context.Background() + blocks []*seer_common.BlockJson + collectedErrors []error + mu sync.Mutex + wg sync.WaitGroup + ctx = context.Background() ) var blockNumbersRange []*big.Int @@ -237,8 +237,8 @@ func (c *Client) FetchBlocksInRangeAsync(from, to *big.Int, debug bool, maxReque blockNumbersRange = append(blockNumbersRange, new(big.Int).Set(i)) } - sem := make(chan struct{}, maxRequests) // Semaphore to control concurrency - errChan := make(chan error, 1) + sem := make(chan struct{}, maxRequests) // Semaphore to control concurrency + errChan := make(chan error, len(blockNumbersRange)) // Channel to collect errors from goroutines for _, b := range blockNumbersRange { wg.Add(1) @@ -274,10 +274,17 @@ func (c *Client) FetchBlocksInRangeAsync(from, to *big.Int, debug bool, maxReque close(sem) close(errChan) - if err := <-errChan; err != nil { - return nil, err + for err := range errChan { + collectedErrors = append(collectedErrors, err) } + if len(collectedErrors) > 0 { + var errStrings []string + for _, err := range collectedErrors { + errStrings = append(errStrings, err.Error()) + } + return nil, fmt.Errorf("errors occurred during crawling: %s", strings.Join(errStrings, "; ")) + } return blocks, nil } diff --git a/blockchain/b3/b3.go b/blockchain/b3/b3.go index 3f6b68b..81b0d88 100644 --- a/blockchain/b3/b3.go +++ b/blockchain/b3/b3.go @@ -225,11 +225,11 @@ func (c *Client) FetchBlocksInRange(from, to *big.Int, debug bool) ([]*seer_comm // FetchBlocksInRangeAsync fetches blocks within a specified range concurrently. 
func (c *Client) FetchBlocksInRangeAsync(from, to *big.Int, debug bool, maxRequests int) ([]*seer_common.BlockJson, error) { var ( - blocks []*seer_common.BlockJson - - mu sync.Mutex - wg sync.WaitGroup - ctx = context.Background() + blocks []*seer_common.BlockJson + collectedErrors []error + mu sync.Mutex + wg sync.WaitGroup + ctx = context.Background() ) var blockNumbersRange []*big.Int @@ -237,8 +237,8 @@ func (c *Client) FetchBlocksInRangeAsync(from, to *big.Int, debug bool, maxReque blockNumbersRange = append(blockNumbersRange, new(big.Int).Set(i)) } - sem := make(chan struct{}, maxRequests) // Semaphore to control concurrency - errChan := make(chan error, 1) + sem := make(chan struct{}, maxRequests) // Semaphore to control concurrency + errChan := make(chan error, len(blockNumbersRange)) // Channel to collect errors from goroutines for _, b := range blockNumbersRange { wg.Add(1) @@ -274,10 +274,17 @@ func (c *Client) FetchBlocksInRangeAsync(from, to *big.Int, debug bool, maxReque close(sem) close(errChan) - if err := <-errChan; err != nil { - return nil, err + for err := range errChan { + collectedErrors = append(collectedErrors, err) } + if len(collectedErrors) > 0 { + var errStrings []string + for _, err := range collectedErrors { + errStrings = append(errStrings, err.Error()) + } + return nil, fmt.Errorf("errors occurred during crawling: %s", strings.Join(errStrings, "; ")) + } return blocks, nil } diff --git a/blockchain/b3_sepolia/b3_sepolia.go b/blockchain/b3_sepolia/b3_sepolia.go index 7e10d41..12df6fc 100644 --- a/blockchain/b3_sepolia/b3_sepolia.go +++ b/blockchain/b3_sepolia/b3_sepolia.go @@ -225,11 +225,11 @@ func (c *Client) FetchBlocksInRange(from, to *big.Int, debug bool) ([]*seer_comm // FetchBlocksInRangeAsync fetches blocks within a specified range concurrently. 
func (c *Client) FetchBlocksInRangeAsync(from, to *big.Int, debug bool, maxRequests int) ([]*seer_common.BlockJson, error) { var ( - blocks []*seer_common.BlockJson - - mu sync.Mutex - wg sync.WaitGroup - ctx = context.Background() + blocks []*seer_common.BlockJson + collectedErrors []error + mu sync.Mutex + wg sync.WaitGroup + ctx = context.Background() ) var blockNumbersRange []*big.Int @@ -237,8 +237,8 @@ func (c *Client) FetchBlocksInRangeAsync(from, to *big.Int, debug bool, maxReque blockNumbersRange = append(blockNumbersRange, new(big.Int).Set(i)) } - sem := make(chan struct{}, maxRequests) // Semaphore to control concurrency - errChan := make(chan error, 1) + sem := make(chan struct{}, maxRequests) // Semaphore to control concurrency + errChan := make(chan error, len(blockNumbersRange)) // Channel to collect errors from goroutines for _, b := range blockNumbersRange { wg.Add(1) @@ -274,10 +274,17 @@ func (c *Client) FetchBlocksInRangeAsync(from, to *big.Int, debug bool, maxReque close(sem) close(errChan) - if err := <-errChan; err != nil { - return nil, err + for err := range errChan { + collectedErrors = append(collectedErrors, err) } + if len(collectedErrors) > 0 { + var errStrings []string + for _, err := range collectedErrors { + errStrings = append(errStrings, err.Error()) + } + return nil, fmt.Errorf("errors occurred during crawling: %s", strings.Join(errStrings, "; ")) + } return blocks, nil } diff --git a/blockchain/ethereum/ethereum.go b/blockchain/ethereum/ethereum.go index dd16978..8b8fbfd 100644 --- a/blockchain/ethereum/ethereum.go +++ b/blockchain/ethereum/ethereum.go @@ -225,11 +225,11 @@ func (c *Client) FetchBlocksInRange(from, to *big.Int, debug bool) ([]*seer_comm // FetchBlocksInRangeAsync fetches blocks within a specified range concurrently. 
func (c *Client) FetchBlocksInRangeAsync(from, to *big.Int, debug bool, maxRequests int) ([]*seer_common.BlockJson, error) { var ( - blocks []*seer_common.BlockJson - - mu sync.Mutex - wg sync.WaitGroup - ctx = context.Background() + blocks []*seer_common.BlockJson + collectedErrors []error + mu sync.Mutex + wg sync.WaitGroup + ctx = context.Background() ) var blockNumbersRange []*big.Int @@ -237,8 +237,8 @@ func (c *Client) FetchBlocksInRangeAsync(from, to *big.Int, debug bool, maxReque blockNumbersRange = append(blockNumbersRange, new(big.Int).Set(i)) } - sem := make(chan struct{}, maxRequests) // Semaphore to control concurrency - errChan := make(chan error, 1) + sem := make(chan struct{}, maxRequests) // Semaphore to control concurrency + errChan := make(chan error, len(blockNumbersRange)) // Channel to collect errors from goroutines for _, b := range blockNumbersRange { wg.Add(1) @@ -274,10 +274,17 @@ func (c *Client) FetchBlocksInRangeAsync(from, to *big.Int, debug bool, maxReque close(sem) close(errChan) - if err := <-errChan; err != nil { - return nil, err + for err := range errChan { + collectedErrors = append(collectedErrors, err) } + if len(collectedErrors) > 0 { + var errStrings []string + for _, err := range collectedErrors { + errStrings = append(errStrings, err.Error()) + } + return nil, fmt.Errorf("errors occurred during crawling: %s", strings.Join(errStrings, "; ")) + } return blocks, nil } diff --git a/blockchain/game7/game7.go b/blockchain/game7/game7.go index 394512c..e2f4eea 100644 --- a/blockchain/game7/game7.go +++ b/blockchain/game7/game7.go @@ -225,11 +225,11 @@ func (c *Client) FetchBlocksInRange(from, to *big.Int, debug bool) ([]*seer_comm // FetchBlocksInRangeAsync fetches blocks within a specified range concurrently. 
func (c *Client) FetchBlocksInRangeAsync(from, to *big.Int, debug bool, maxRequests int) ([]*seer_common.BlockJson, error) { var ( - blocks []*seer_common.BlockJson - - mu sync.Mutex - wg sync.WaitGroup - ctx = context.Background() + blocks []*seer_common.BlockJson + collectedErrors []error + mu sync.Mutex + wg sync.WaitGroup + ctx = context.Background() ) var blockNumbersRange []*big.Int @@ -237,8 +237,8 @@ func (c *Client) FetchBlocksInRangeAsync(from, to *big.Int, debug bool, maxReque blockNumbersRange = append(blockNumbersRange, new(big.Int).Set(i)) } - sem := make(chan struct{}, maxRequests) // Semaphore to control concurrency - errChan := make(chan error, 1) + sem := make(chan struct{}, maxRequests) // Semaphore to control concurrency + errChan := make(chan error, len(blockNumbersRange)) // Channel to collect errors from goroutines for _, b := range blockNumbersRange { wg.Add(1) @@ -274,10 +274,17 @@ func (c *Client) FetchBlocksInRangeAsync(from, to *big.Int, debug bool, maxReque close(sem) close(errChan) - if err := <-errChan; err != nil { - return nil, err + for err := range errChan { + collectedErrors = append(collectedErrors, err) } + if len(collectedErrors) > 0 { + var errStrings []string + for _, err := range collectedErrors { + errStrings = append(errStrings, err.Error()) + } + return nil, fmt.Errorf("errors occurred during crawling: %s", strings.Join(errStrings, "; ")) + } return blocks, nil } diff --git a/blockchain/game7_orbit_arbitrum_sepolia/game7_orbit_arbitrum_sepolia.go b/blockchain/game7_orbit_arbitrum_sepolia/game7_orbit_arbitrum_sepolia.go index 037cd5e..8c74484 100644 --- a/blockchain/game7_orbit_arbitrum_sepolia/game7_orbit_arbitrum_sepolia.go +++ b/blockchain/game7_orbit_arbitrum_sepolia/game7_orbit_arbitrum_sepolia.go @@ -225,11 +225,11 @@ func (c *Client) FetchBlocksInRange(from, to *big.Int, debug bool) ([]*seer_comm // FetchBlocksInRangeAsync fetches blocks within a specified range concurrently. 
func (c *Client) FetchBlocksInRangeAsync(from, to *big.Int, debug bool, maxRequests int) ([]*seer_common.BlockJson, error) { var ( - blocks []*seer_common.BlockJson - - mu sync.Mutex - wg sync.WaitGroup - ctx = context.Background() + blocks []*seer_common.BlockJson + collectedErrors []error + mu sync.Mutex + wg sync.WaitGroup + ctx = context.Background() ) var blockNumbersRange []*big.Int @@ -237,8 +237,8 @@ func (c *Client) FetchBlocksInRangeAsync(from, to *big.Int, debug bool, maxReque blockNumbersRange = append(blockNumbersRange, new(big.Int).Set(i)) } - sem := make(chan struct{}, maxRequests) // Semaphore to control concurrency - errChan := make(chan error, 1) + sem := make(chan struct{}, maxRequests) // Semaphore to control concurrency + errChan := make(chan error, len(blockNumbersRange)) // Channel to collect errors from goroutines for _, b := range blockNumbersRange { wg.Add(1) @@ -274,10 +274,17 @@ func (c *Client) FetchBlocksInRangeAsync(from, to *big.Int, debug bool, maxReque close(sem) close(errChan) - if err := <-errChan; err != nil { - return nil, err + for err := range errChan { + collectedErrors = append(collectedErrors, err) } + if len(collectedErrors) > 0 { + var errStrings []string + for _, err := range collectedErrors { + errStrings = append(errStrings, err.Error()) + } + return nil, fmt.Errorf("errors occurred during crawling: %s", strings.Join(errStrings, "; ")) + } return blocks, nil } diff --git a/blockchain/game7_testnet/game7_testnet.go b/blockchain/game7_testnet/game7_testnet.go index aa1c22c..d2f98e8 100644 --- a/blockchain/game7_testnet/game7_testnet.go +++ b/blockchain/game7_testnet/game7_testnet.go @@ -225,11 +225,11 @@ func (c *Client) FetchBlocksInRange(from, to *big.Int, debug bool) ([]*seer_comm // FetchBlocksInRangeAsync fetches blocks within a specified range concurrently. 
func (c *Client) FetchBlocksInRangeAsync(from, to *big.Int, debug bool, maxRequests int) ([]*seer_common.BlockJson, error) { var ( - blocks []*seer_common.BlockJson - - mu sync.Mutex - wg sync.WaitGroup - ctx = context.Background() + blocks []*seer_common.BlockJson + collectedErrors []error + mu sync.Mutex + wg sync.WaitGroup + ctx = context.Background() ) var blockNumbersRange []*big.Int @@ -237,8 +237,8 @@ func (c *Client) FetchBlocksInRangeAsync(from, to *big.Int, debug bool, maxReque blockNumbersRange = append(blockNumbersRange, new(big.Int).Set(i)) } - sem := make(chan struct{}, maxRequests) // Semaphore to control concurrency - errChan := make(chan error, 1) + sem := make(chan struct{}, maxRequests) // Semaphore to control concurrency + errChan := make(chan error, len(blockNumbersRange)) // Channel to collect errors from goroutines for _, b := range blockNumbersRange { wg.Add(1) @@ -274,10 +274,17 @@ func (c *Client) FetchBlocksInRangeAsync(from, to *big.Int, debug bool, maxReque close(sem) close(errChan) - if err := <-errChan; err != nil { - return nil, err + for err := range errChan { + collectedErrors = append(collectedErrors, err) } + if len(collectedErrors) > 0 { + var errStrings []string + for _, err := range collectedErrors { + errStrings = append(errStrings, err.Error()) + } + return nil, fmt.Errorf("errors occurred during crawling: %s", strings.Join(errStrings, "; ")) + } return blocks, nil } diff --git a/blockchain/imx_zkevm/imx_zkevm.go b/blockchain/imx_zkevm/imx_zkevm.go index 8dd2f0c..f9b880c 100644 --- a/blockchain/imx_zkevm/imx_zkevm.go +++ b/blockchain/imx_zkevm/imx_zkevm.go @@ -225,11 +225,11 @@ func (c *Client) FetchBlocksInRange(from, to *big.Int, debug bool) ([]*seer_comm // FetchBlocksInRangeAsync fetches blocks within a specified range concurrently. 
func (c *Client) FetchBlocksInRangeAsync(from, to *big.Int, debug bool, maxRequests int) ([]*seer_common.BlockJson, error) { var ( - blocks []*seer_common.BlockJson - - mu sync.Mutex - wg sync.WaitGroup - ctx = context.Background() + blocks []*seer_common.BlockJson + collectedErrors []error + mu sync.Mutex + wg sync.WaitGroup + ctx = context.Background() ) var blockNumbersRange []*big.Int @@ -237,8 +237,8 @@ func (c *Client) FetchBlocksInRangeAsync(from, to *big.Int, debug bool, maxReque blockNumbersRange = append(blockNumbersRange, new(big.Int).Set(i)) } - sem := make(chan struct{}, maxRequests) // Semaphore to control concurrency - errChan := make(chan error, 1) + sem := make(chan struct{}, maxRequests) // Semaphore to control concurrency + errChan := make(chan error, len(blockNumbersRange)) // Channel to collect errors from goroutines for _, b := range blockNumbersRange { wg.Add(1) @@ -274,10 +274,17 @@ func (c *Client) FetchBlocksInRangeAsync(from, to *big.Int, debug bool, maxReque close(sem) close(errChan) - if err := <-errChan; err != nil { - return nil, err + for err := range errChan { + collectedErrors = append(collectedErrors, err) } + if len(collectedErrors) > 0 { + var errStrings []string + for _, err := range collectedErrors { + errStrings = append(errStrings, err.Error()) + } + return nil, fmt.Errorf("errors occurred during crawling: %s", strings.Join(errStrings, "; ")) + } return blocks, nil } diff --git a/blockchain/imx_zkevm_sepolia/imx_zkevm_sepolia.go b/blockchain/imx_zkevm_sepolia/imx_zkevm_sepolia.go index 03b2ae9..3a3af5a 100644 --- a/blockchain/imx_zkevm_sepolia/imx_zkevm_sepolia.go +++ b/blockchain/imx_zkevm_sepolia/imx_zkevm_sepolia.go @@ -225,11 +225,11 @@ func (c *Client) FetchBlocksInRange(from, to *big.Int, debug bool) ([]*seer_comm // FetchBlocksInRangeAsync fetches blocks within a specified range concurrently. 
func (c *Client) FetchBlocksInRangeAsync(from, to *big.Int, debug bool, maxRequests int) ([]*seer_common.BlockJson, error) { var ( - blocks []*seer_common.BlockJson - - mu sync.Mutex - wg sync.WaitGroup - ctx = context.Background() + blocks []*seer_common.BlockJson + collectedErrors []error + mu sync.Mutex + wg sync.WaitGroup + ctx = context.Background() ) var blockNumbersRange []*big.Int @@ -237,8 +237,8 @@ func (c *Client) FetchBlocksInRangeAsync(from, to *big.Int, debug bool, maxReque blockNumbersRange = append(blockNumbersRange, new(big.Int).Set(i)) } - sem := make(chan struct{}, maxRequests) // Semaphore to control concurrency - errChan := make(chan error, 1) + sem := make(chan struct{}, maxRequests) // Semaphore to control concurrency + errChan := make(chan error, len(blockNumbersRange)) // Channel to collect errors from goroutines for _, b := range blockNumbersRange { wg.Add(1) @@ -274,10 +274,17 @@ func (c *Client) FetchBlocksInRangeAsync(from, to *big.Int, debug bool, maxReque close(sem) close(errChan) - if err := <-errChan; err != nil { - return nil, err + for err := range errChan { + collectedErrors = append(collectedErrors, err) } + if len(collectedErrors) > 0 { + var errStrings []string + for _, err := range collectedErrors { + errStrings = append(errStrings, err.Error()) + } + return nil, fmt.Errorf("errors occurred during crawling: %s", strings.Join(errStrings, "; ")) + } return blocks, nil } diff --git a/blockchain/mantle/mantle.go b/blockchain/mantle/mantle.go index 8636609..1222076 100644 --- a/blockchain/mantle/mantle.go +++ b/blockchain/mantle/mantle.go @@ -225,11 +225,11 @@ func (c *Client) FetchBlocksInRange(from, to *big.Int, debug bool) ([]*seer_comm // FetchBlocksInRangeAsync fetches blocks within a specified range concurrently. 
func (c *Client) FetchBlocksInRangeAsync(from, to *big.Int, debug bool, maxRequests int) ([]*seer_common.BlockJson, error) { var ( - blocks []*seer_common.BlockJson - - mu sync.Mutex - wg sync.WaitGroup - ctx = context.Background() + blocks []*seer_common.BlockJson + collectedErrors []error + mu sync.Mutex + wg sync.WaitGroup + ctx = context.Background() ) var blockNumbersRange []*big.Int @@ -237,8 +237,8 @@ func (c *Client) FetchBlocksInRangeAsync(from, to *big.Int, debug bool, maxReque blockNumbersRange = append(blockNumbersRange, new(big.Int).Set(i)) } - sem := make(chan struct{}, maxRequests) // Semaphore to control concurrency - errChan := make(chan error, 1) + sem := make(chan struct{}, maxRequests) // Semaphore to control concurrency + errChan := make(chan error, len(blockNumbersRange)) // Channel to collect errors from goroutines for _, b := range blockNumbersRange { wg.Add(1) @@ -274,10 +274,17 @@ func (c *Client) FetchBlocksInRangeAsync(from, to *big.Int, debug bool, maxReque close(sem) close(errChan) - if err := <-errChan; err != nil { - return nil, err + for err := range errChan { + collectedErrors = append(collectedErrors, err) } + if len(collectedErrors) > 0 { + var errStrings []string + for _, err := range collectedErrors { + errStrings = append(errStrings, err.Error()) + } + return nil, fmt.Errorf("errors occurred during crawling: %s", strings.Join(errStrings, "; ")) + } return blocks, nil } diff --git a/blockchain/mantle_sepolia/mantle_sepolia.go b/blockchain/mantle_sepolia/mantle_sepolia.go index 19fab7d..d82d870 100644 --- a/blockchain/mantle_sepolia/mantle_sepolia.go +++ b/blockchain/mantle_sepolia/mantle_sepolia.go @@ -225,11 +225,11 @@ func (c *Client) FetchBlocksInRange(from, to *big.Int, debug bool) ([]*seer_comm // FetchBlocksInRangeAsync fetches blocks within a specified range concurrently. 
func (c *Client) FetchBlocksInRangeAsync(from, to *big.Int, debug bool, maxRequests int) ([]*seer_common.BlockJson, error) { var ( - blocks []*seer_common.BlockJson - - mu sync.Mutex - wg sync.WaitGroup - ctx = context.Background() + blocks []*seer_common.BlockJson + collectedErrors []error + mu sync.Mutex + wg sync.WaitGroup + ctx = context.Background() ) var blockNumbersRange []*big.Int @@ -237,8 +237,8 @@ func (c *Client) FetchBlocksInRangeAsync(from, to *big.Int, debug bool, maxReque blockNumbersRange = append(blockNumbersRange, new(big.Int).Set(i)) } - sem := make(chan struct{}, maxRequests) // Semaphore to control concurrency - errChan := make(chan error, 1) + sem := make(chan struct{}, maxRequests) // Semaphore to control concurrency + errChan := make(chan error, len(blockNumbersRange)) // Channel to collect errors from goroutines for _, b := range blockNumbersRange { wg.Add(1) @@ -274,10 +274,17 @@ func (c *Client) FetchBlocksInRangeAsync(from, to *big.Int, debug bool, maxReque close(sem) close(errChan) - if err := <-errChan; err != nil { - return nil, err + for err := range errChan { + collectedErrors = append(collectedErrors, err) } + if len(collectedErrors) > 0 { + var errStrings []string + for _, err := range collectedErrors { + errStrings = append(errStrings, err.Error()) + } + return nil, fmt.Errorf("errors occurred during crawling: %s", strings.Join(errStrings, "; ")) + } return blocks, nil } diff --git a/blockchain/polygon/polygon.go b/blockchain/polygon/polygon.go index 17b39d2..69ae2a1 100644 --- a/blockchain/polygon/polygon.go +++ b/blockchain/polygon/polygon.go @@ -225,11 +225,11 @@ func (c *Client) FetchBlocksInRange(from, to *big.Int, debug bool) ([]*seer_comm // FetchBlocksInRangeAsync fetches blocks within a specified range concurrently. 
func (c *Client) FetchBlocksInRangeAsync(from, to *big.Int, debug bool, maxRequests int) ([]*seer_common.BlockJson, error) { var ( - blocks []*seer_common.BlockJson - - mu sync.Mutex - wg sync.WaitGroup - ctx = context.Background() + blocks []*seer_common.BlockJson + collectedErrors []error + mu sync.Mutex + wg sync.WaitGroup + ctx = context.Background() ) var blockNumbersRange []*big.Int @@ -237,8 +237,8 @@ func (c *Client) FetchBlocksInRangeAsync(from, to *big.Int, debug bool, maxReque blockNumbersRange = append(blockNumbersRange, new(big.Int).Set(i)) } - sem := make(chan struct{}, maxRequests) // Semaphore to control concurrency - errChan := make(chan error, 1) + sem := make(chan struct{}, maxRequests) // Semaphore to control concurrency + errChan := make(chan error, len(blockNumbersRange)) // Channel to collect errors from goroutines for _, b := range blockNumbersRange { wg.Add(1) @@ -274,10 +274,17 @@ func (c *Client) FetchBlocksInRangeAsync(from, to *big.Int, debug bool, maxReque close(sem) close(errChan) - if err := <-errChan; err != nil { - return nil, err + for err := range errChan { + collectedErrors = append(collectedErrors, err) } + if len(collectedErrors) > 0 { + var errStrings []string + for _, err := range collectedErrors { + errStrings = append(errStrings, err.Error()) + } + return nil, fmt.Errorf("errors occurred during crawling: %s", strings.Join(errStrings, "; ")) + } return blocks, nil } diff --git a/blockchain/ronin/ronin.go b/blockchain/ronin/ronin.go index de79079..8899a56 100644 --- a/blockchain/ronin/ronin.go +++ b/blockchain/ronin/ronin.go @@ -225,11 +225,11 @@ func (c *Client) FetchBlocksInRange(from, to *big.Int, debug bool) ([]*seer_comm // FetchBlocksInRangeAsync fetches blocks within a specified range concurrently. 
func (c *Client) FetchBlocksInRangeAsync(from, to *big.Int, debug bool, maxRequests int) ([]*seer_common.BlockJson, error) { var ( - blocks []*seer_common.BlockJson - - mu sync.Mutex - wg sync.WaitGroup - ctx = context.Background() + blocks []*seer_common.BlockJson + collectedErrors []error + mu sync.Mutex + wg sync.WaitGroup + ctx = context.Background() ) var blockNumbersRange []*big.Int @@ -237,8 +237,8 @@ func (c *Client) FetchBlocksInRangeAsync(from, to *big.Int, debug bool, maxReque blockNumbersRange = append(blockNumbersRange, new(big.Int).Set(i)) } - sem := make(chan struct{}, maxRequests) // Semaphore to control concurrency - errChan := make(chan error, 1) + sem := make(chan struct{}, maxRequests) // Semaphore to control concurrency + errChan := make(chan error, len(blockNumbersRange)) // Channel to collect errors from goroutines for _, b := range blockNumbersRange { wg.Add(1) @@ -274,10 +274,17 @@ func (c *Client) FetchBlocksInRangeAsync(from, to *big.Int, debug bool, maxReque close(sem) close(errChan) - if err := <-errChan; err != nil { - return nil, err + for err := range errChan { + collectedErrors = append(collectedErrors, err) } + if len(collectedErrors) > 0 { + var errStrings []string + for _, err := range collectedErrors { + errStrings = append(errStrings, err.Error()) + } + return nil, fmt.Errorf("errors occurred during crawling: %s", strings.Join(errStrings, "; ")) + } return blocks, nil } diff --git a/blockchain/ronin_saigon/ronin_saigon.go b/blockchain/ronin_saigon/ronin_saigon.go index 55d0ddc..f45b56f 100644 --- a/blockchain/ronin_saigon/ronin_saigon.go +++ b/blockchain/ronin_saigon/ronin_saigon.go @@ -225,11 +225,11 @@ func (c *Client) FetchBlocksInRange(from, to *big.Int, debug bool) ([]*seer_comm // FetchBlocksInRangeAsync fetches blocks within a specified range concurrently. 
func (c *Client) FetchBlocksInRangeAsync(from, to *big.Int, debug bool, maxRequests int) ([]*seer_common.BlockJson, error) { var ( - blocks []*seer_common.BlockJson - - mu sync.Mutex - wg sync.WaitGroup - ctx = context.Background() + blocks []*seer_common.BlockJson + collectedErrors []error + mu sync.Mutex + wg sync.WaitGroup + ctx = context.Background() ) var blockNumbersRange []*big.Int @@ -237,8 +237,8 @@ func (c *Client) FetchBlocksInRangeAsync(from, to *big.Int, debug bool, maxReque blockNumbersRange = append(blockNumbersRange, new(big.Int).Set(i)) } - sem := make(chan struct{}, maxRequests) // Semaphore to control concurrency - errChan := make(chan error, 1) + sem := make(chan struct{}, maxRequests) // Semaphore to control concurrency + errChan := make(chan error, len(blockNumbersRange)) // Channel to collect errors from goroutines for _, b := range blockNumbersRange { wg.Add(1) @@ -274,10 +274,17 @@ func (c *Client) FetchBlocksInRangeAsync(from, to *big.Int, debug bool, maxReque close(sem) close(errChan) - if err := <-errChan; err != nil { - return nil, err + for err := range errChan { + collectedErrors = append(collectedErrors, err) } + if len(collectedErrors) > 0 { + var errStrings []string + for _, err := range collectedErrors { + errStrings = append(errStrings, err.Error()) + } + return nil, fmt.Errorf("errors occurred during crawling: %s", strings.Join(errStrings, "; ")) + } return blocks, nil } diff --git a/blockchain/sepolia/sepolia.go b/blockchain/sepolia/sepolia.go index 52e722a..895847a 100644 --- a/blockchain/sepolia/sepolia.go +++ b/blockchain/sepolia/sepolia.go @@ -225,11 +225,11 @@ func (c *Client) FetchBlocksInRange(from, to *big.Int, debug bool) ([]*seer_comm // FetchBlocksInRangeAsync fetches blocks within a specified range concurrently. 
func (c *Client) FetchBlocksInRangeAsync(from, to *big.Int, debug bool, maxRequests int) ([]*seer_common.BlockJson, error) { var ( - blocks []*seer_common.BlockJson - - mu sync.Mutex - wg sync.WaitGroup - ctx = context.Background() + blocks []*seer_common.BlockJson + collectedErrors []error + mu sync.Mutex + wg sync.WaitGroup + ctx = context.Background() ) var blockNumbersRange []*big.Int @@ -237,8 +237,8 @@ func (c *Client) FetchBlocksInRangeAsync(from, to *big.Int, debug bool, maxReque blockNumbersRange = append(blockNumbersRange, new(big.Int).Set(i)) } - sem := make(chan struct{}, maxRequests) // Semaphore to control concurrency - errChan := make(chan error, 1) + sem := make(chan struct{}, maxRequests) // Semaphore to control concurrency + errChan := make(chan error, len(blockNumbersRange)) // Channel to collect errors from goroutines for _, b := range blockNumbersRange { wg.Add(1) @@ -274,10 +274,17 @@ func (c *Client) FetchBlocksInRangeAsync(from, to *big.Int, debug bool, maxReque close(sem) close(errChan) - if err := <-errChan; err != nil { - return nil, err + for err := range errChan { + collectedErrors = append(collectedErrors, err) } + if len(collectedErrors) > 0 { + var errStrings []string + for _, err := range collectedErrors { + errStrings = append(errStrings, err.Error()) + } + return nil, fmt.Errorf("errors occurred during crawling: %s", strings.Join(errStrings, "; ")) + } return blocks, nil } diff --git a/blockchain/xai/xai.go b/blockchain/xai/xai.go index efc1753..b2dccf4 100644 --- a/blockchain/xai/xai.go +++ b/blockchain/xai/xai.go @@ -225,11 +225,11 @@ func (c *Client) FetchBlocksInRange(from, to *big.Int, debug bool) ([]*seer_comm // FetchBlocksInRangeAsync fetches blocks within a specified range concurrently. 
func (c *Client) FetchBlocksInRangeAsync(from, to *big.Int, debug bool, maxRequests int) ([]*seer_common.BlockJson, error) { var ( - blocks []*seer_common.BlockJson - - mu sync.Mutex - wg sync.WaitGroup - ctx = context.Background() + blocks []*seer_common.BlockJson + collectedErrors []error + mu sync.Mutex + wg sync.WaitGroup + ctx = context.Background() ) var blockNumbersRange []*big.Int @@ -237,8 +237,8 @@ func (c *Client) FetchBlocksInRangeAsync(from, to *big.Int, debug bool, maxReque blockNumbersRange = append(blockNumbersRange, new(big.Int).Set(i)) } - sem := make(chan struct{}, maxRequests) // Semaphore to control concurrency - errChan := make(chan error, 1) + sem := make(chan struct{}, maxRequests) // Semaphore to control concurrency + errChan := make(chan error, len(blockNumbersRange)) // Channel to collect errors from goroutines for _, b := range blockNumbersRange { wg.Add(1) @@ -274,10 +274,17 @@ func (c *Client) FetchBlocksInRangeAsync(from, to *big.Int, debug bool, maxReque close(sem) close(errChan) - if err := <-errChan; err != nil { - return nil, err + for err := range errChan { + collectedErrors = append(collectedErrors, err) } + if len(collectedErrors) > 0 { + var errStrings []string + for _, err := range collectedErrors { + errStrings = append(errStrings, err.Error()) + } + return nil, fmt.Errorf("errors occurred during crawling: %s", strings.Join(errStrings, "; ")) + } return blocks, nil } diff --git a/blockchain/xai_sepolia/xai_sepolia.go b/blockchain/xai_sepolia/xai_sepolia.go index b2b6537..d2b5518 100644 --- a/blockchain/xai_sepolia/xai_sepolia.go +++ b/blockchain/xai_sepolia/xai_sepolia.go @@ -225,11 +225,11 @@ func (c *Client) FetchBlocksInRange(from, to *big.Int, debug bool) ([]*seer_comm // FetchBlocksInRangeAsync fetches blocks within a specified range concurrently. 
func (c *Client) FetchBlocksInRangeAsync(from, to *big.Int, debug bool, maxRequests int) ([]*seer_common.BlockJson, error) { var ( - blocks []*seer_common.BlockJson - - mu sync.Mutex - wg sync.WaitGroup - ctx = context.Background() + blocks []*seer_common.BlockJson + collectedErrors []error + mu sync.Mutex + wg sync.WaitGroup + ctx = context.Background() ) var blockNumbersRange []*big.Int @@ -237,8 +237,8 @@ func (c *Client) FetchBlocksInRangeAsync(from, to *big.Int, debug bool, maxReque blockNumbersRange = append(blockNumbersRange, new(big.Int).Set(i)) } - sem := make(chan struct{}, maxRequests) // Semaphore to control concurrency - errChan := make(chan error, 1) + sem := make(chan struct{}, maxRequests) // Semaphore to control concurrency + errChan := make(chan error, len(blockNumbersRange)) // Channel to collect errors from goroutines for _, b := range blockNumbersRange { wg.Add(1) @@ -274,10 +274,17 @@ func (c *Client) FetchBlocksInRangeAsync(from, to *big.Int, debug bool, maxReque close(sem) close(errChan) - if err := <-errChan; err != nil { - return nil, err + for err := range errChan { + collectedErrors = append(collectedErrors, err) } + if len(collectedErrors) > 0 { + var errStrings []string + for _, err := range collectedErrors { + errStrings = append(errStrings, err.Error()) + } + return nil, fmt.Errorf("errors occurred during crawling: %s", strings.Join(errStrings, "; ")) + } return blocks, nil } From 6479c5f4d41225f1b6d82d56342d5908c050b5df Mon Sep 17 00:00:00 2001 From: Andrey Date: Thu, 21 Nov 2024 12:25:14 +0200 Subject: [PATCH 11/12] Bump version. 
--- version/version.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/version/version.go b/version/version.go index 686f6ff..3111c63 100644 --- a/version/version.go +++ b/version/version.go @@ -1,3 +1,3 @@ package version -var SeerVersion string = "0.3.11" +var SeerVersion string = "0.3.12" From 7f80736b39df7f9d489af379cf2a70e71e65ec75 Mon Sep 17 00:00:00 2001 From: Andrey Date: Thu, 21 Nov 2024 13:03:35 +0200 Subject: [PATCH 12/12] Decrease min block to sync. --- cmd.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd.go b/cmd.go index cd389f0..4dd3744 100644 --- a/cmd.go +++ b/cmd.go @@ -1079,7 +1079,7 @@ func CreateHistoricalSyncCommand() *cobra.Command { historicalSyncCmd.Flags().StringSliceVar(&addresses, "addresses", []string{}, "The list of addresses to sync") historicalSyncCmd.Flags().BoolVar(&auto, "auto", false, "Set this flag to sync all unfinished historical crawl from the database (default: false)") historicalSyncCmd.Flags().IntVar(&threads, "threads", 5, "Number of go-routines for concurrent crawling (default: 5)") - historicalSyncCmd.Flags().IntVar(&minBlocksToSync, "min-blocks-to-sync", 50, "The minimum number of blocks to sync before the synchronizer starts decoding") + historicalSyncCmd.Flags().IntVar(&minBlocksToSync, "min-blocks-to-sync", 10, "The minimum number of blocks to sync before the synchronizer starts decoding") return historicalSyncCmd }