diff --git a/blockchain/arbitrum_one/arbitrum_one.go b/blockchain/arbitrum_one/arbitrum_one.go index 3b5194b..ed36ee7 100644 --- a/blockchain/arbitrum_one/arbitrum_one.go +++ b/blockchain/arbitrum_one/arbitrum_one.go @@ -225,11 +225,11 @@ func (c *Client) FetchBlocksInRange(from, to *big.Int, debug bool) ([]*seer_comm // FetchBlocksInRangeAsync fetches blocks within a specified range concurrently. func (c *Client) FetchBlocksInRangeAsync(from, to *big.Int, debug bool, maxRequests int) ([]*seer_common.BlockJson, error) { var ( - blocks []*seer_common.BlockJson - - mu sync.Mutex - wg sync.WaitGroup - ctx = context.Background() + blocks []*seer_common.BlockJson + collectedErrors []error + mu sync.Mutex + wg sync.WaitGroup + ctx = context.Background() ) var blockNumbersRange []*big.Int @@ -237,8 +237,8 @@ func (c *Client) FetchBlocksInRangeAsync(from, to *big.Int, debug bool, maxReque blockNumbersRange = append(blockNumbersRange, new(big.Int).Set(i)) } - sem := make(chan struct{}, maxRequests) // Semaphore to control concurrency - errChan := make(chan error, 1) + sem := make(chan struct{}, maxRequests) // Semaphore to control concurrency + errChan := make(chan error, len(blockNumbersRange)) // Channel to collect errors from goroutines for _, b := range blockNumbersRange { wg.Add(1) @@ -274,10 +274,17 @@ func (c *Client) FetchBlocksInRangeAsync(from, to *big.Int, debug bool, maxReque close(sem) close(errChan) - if err := <-errChan; err != nil { - return nil, err + for err := range errChan { + collectedErrors = append(collectedErrors, err) } + if len(collectedErrors) > 0 { + var errStrings []string + for _, err := range collectedErrors { + errStrings = append(errStrings, err.Error()) + } + return nil, fmt.Errorf("errors occurred during crawling: %s", strings.Join(errStrings, "; ")) + } return blocks, nil } diff --git a/blockchain/arbitrum_sepolia/arbitrum_sepolia.go b/blockchain/arbitrum_sepolia/arbitrum_sepolia.go index e33e13b..eb23bee 100644 --- a/blockchain/arbitrum_sepolia/arbitrum_sepolia.go +++ b/blockchain/arbitrum_sepolia/arbitrum_sepolia.go @@ -225,11 +225,11 @@ func (c *Client) FetchBlocksInRange(from, to *big.Int, debug bool) ([]*seer_comm // FetchBlocksInRangeAsync fetches blocks within a specified range concurrently. 
func (c *Client) FetchBlocksInRangeAsync(from, to *big.Int, debug bool, maxRequests int) ([]*seer_common.BlockJson, error) { var ( - blocks []*seer_common.BlockJson - - mu sync.Mutex - wg sync.WaitGroup - ctx = context.Background() + blocks []*seer_common.BlockJson + collectedErrors []error + mu sync.Mutex + wg sync.WaitGroup + ctx = context.Background() ) var blockNumbersRange []*big.Int @@ -237,8 +237,8 @@ func (c *Client) FetchBlocksInRangeAsync(from, to *big.Int, debug bool, maxReque blockNumbersRange = append(blockNumbersRange, new(big.Int).Set(i)) } - sem := make(chan struct{}, maxRequests) // Semaphore to control concurrency - errChan := make(chan error, 1) + sem := make(chan struct{}, maxRequests) // Semaphore to control concurrency + errChan := make(chan error, len(blockNumbersRange)) // Channel to collect errors from goroutines for _, b := range blockNumbersRange { wg.Add(1) @@ -274,10 +274,17 @@ func (c *Client) FetchBlocksInRangeAsync(from, to *big.Int, debug bool, maxReque close(sem) close(errChan) - if err := <-errChan; err != nil { - return nil, err + for err := range errChan { + collectedErrors = append(collectedErrors, err) } + if len(collectedErrors) > 0 { + var errStrings []string + for _, err := range collectedErrors { + errStrings = append(errStrings, err.Error()) + } + return nil, fmt.Errorf("errors occurred during crawling: %s", strings.Join(errStrings, "; ")) + } return blocks, nil } diff --git a/blockchain/b3/b3.go b/blockchain/b3/b3.go index 3f6b68b..81b0d88 100644 --- a/blockchain/b3/b3.go +++ b/blockchain/b3/b3.go @@ -225,11 +225,11 @@ func (c *Client) FetchBlocksInRange(from, to *big.Int, debug bool) ([]*seer_comm // FetchBlocksInRangeAsync fetches blocks within a specified range concurrently. func (c *Client) FetchBlocksInRangeAsync(from, to *big.Int, debug bool, maxRequests int) ([]*seer_common.BlockJson, error) { var ( - blocks []*seer_common.BlockJson - - mu sync.Mutex - wg sync.WaitGroup - ctx = context.Background() + blocks []*seer_common.BlockJson + collectedErrors []error + mu sync.Mutex + wg sync.WaitGroup + ctx = context.Background() ) var blockNumbersRange []*big.Int @@ -237,8 +237,8 @@ func (c *Client) FetchBlocksInRangeAsync(from, to *big.Int, debug bool, maxReque blockNumbersRange = append(blockNumbersRange, new(big.Int).Set(i)) } - sem := make(chan struct{}, maxRequests) // Semaphore to control concurrency - errChan := make(chan error, 1) + sem := make(chan struct{}, maxRequests) // Semaphore to control concurrency + errChan := make(chan error, len(blockNumbersRange)) // Channel to collect errors from goroutines for _, b := range blockNumbersRange { wg.Add(1) @@ -274,10 +274,17 @@ func (c *Client) FetchBlocksInRangeAsync(from, to *big.Int, debug bool, maxReque close(sem) close(errChan) - if err := <-errChan; err != nil { - return nil, err + for err := range errChan { + collectedErrors = append(collectedErrors, err) } + if len(collectedErrors) > 0 { + var errStrings []string + for _, err := range collectedErrors { + errStrings = append(errStrings, err.Error()) + } + return nil, fmt.Errorf("errors occurred during crawling: %s", strings.Join(errStrings, "; ")) + } return blocks, nil } diff --git a/blockchain/b3_sepolia/b3_sepolia.go b/blockchain/b3_sepolia/b3_sepolia.go index 7e10d41..12df6fc 100644 --- a/blockchain/b3_sepolia/b3_sepolia.go +++ b/blockchain/b3_sepolia/b3_sepolia.go @@ -225,11 +225,11 @@ func (c *Client) FetchBlocksInRange(from, to *big.Int, debug bool) ([]*seer_comm // FetchBlocksInRangeAsync fetches blocks within a specified range 
concurrently. func (c *Client) FetchBlocksInRangeAsync(from, to *big.Int, debug bool, maxRequests int) ([]*seer_common.BlockJson, error) { var ( - blocks []*seer_common.BlockJson - - mu sync.Mutex - wg sync.WaitGroup - ctx = context.Background() + blocks []*seer_common.BlockJson + collectedErrors []error + mu sync.Mutex + wg sync.WaitGroup + ctx = context.Background() ) var blockNumbersRange []*big.Int @@ -237,8 +237,8 @@ func (c *Client) FetchBlocksInRangeAsync(from, to *big.Int, debug bool, maxReque blockNumbersRange = append(blockNumbersRange, new(big.Int).Set(i)) } - sem := make(chan struct{}, maxRequests) // Semaphore to control concurrency - errChan := make(chan error, 1) + sem := make(chan struct{}, maxRequests) // Semaphore to control concurrency + errChan := make(chan error, len(blockNumbersRange)) // Channel to collect errors from goroutines for _, b := range blockNumbersRange { wg.Add(1) @@ -274,10 +274,17 @@ func (c *Client) FetchBlocksInRangeAsync(from, to *big.Int, debug bool, maxReque close(sem) close(errChan) - if err := <-errChan; err != nil { - return nil, err + for err := range errChan { + collectedErrors = append(collectedErrors, err) } + if len(collectedErrors) > 0 { + var errStrings []string + for _, err := range collectedErrors { + errStrings = append(errStrings, err.Error()) + } + return nil, fmt.Errorf("errors occurred during crawling: %s", strings.Join(errStrings, "; ")) + } return blocks, nil } diff --git a/blockchain/blockchain.go.tmpl b/blockchain/blockchain.go.tmpl index 8b0a4f1..e2dfcff 100644 --- a/blockchain/blockchain.go.tmpl +++ b/blockchain/blockchain.go.tmpl @@ -226,7 +226,7 @@ func (c *Client) FetchBlocksInRange(from, to *big.Int, debug bool) ([]*seer_comm func (c *Client) FetchBlocksInRangeAsync(from, to *big.Int, debug bool, maxRequests int) ([]*seer_common.BlockJson, error) { var ( blocks []*seer_common.BlockJson - + collectedErrors []error mu sync.Mutex wg sync.WaitGroup ctx = context.Background() @@ -238,7 +238,7 @@ func (c *Client) FetchBlocksInRangeAsync(from, to *big.Int, debug bool, maxReque } sem := make(chan struct{}, maxRequests) // Semaphore to control concurrency - errChan := make(chan error, 1) + errChan := make(chan error, len(blockNumbersRange)) // Channel to collect errors from goroutines for _, b := range blockNumbersRange { wg.Add(1) @@ -274,10 +274,17 @@ func (c *Client) FetchBlocksInRangeAsync(from, to *big.Int, debug bool, maxReque close(sem) close(errChan) - if err := <-errChan; err != nil { - return nil, err + for err := range errChan { + collectedErrors = append(collectedErrors, err) } + if len(collectedErrors) > 0 { + var errStrings []string + for _, err := range collectedErrors { + errStrings = append(errStrings, err.Error()) + } + return nil, fmt.Errorf("errors occurred during crawling: %s", strings.Join(errStrings, "; ")) + } return blocks, nil } diff --git a/blockchain/ethereum/ethereum.go b/blockchain/ethereum/ethereum.go index dd16978..8b8fbfd 100644 --- a/blockchain/ethereum/ethereum.go +++ b/blockchain/ethereum/ethereum.go @@ -225,11 +225,11 @@ func (c *Client) FetchBlocksInRange(from, to *big.Int, debug bool) ([]*seer_comm // FetchBlocksInRangeAsync fetches blocks within a specified range concurrently. 
func (c *Client) FetchBlocksInRangeAsync(from, to *big.Int, debug bool, maxRequests int) ([]*seer_common.BlockJson, error) { var ( - blocks []*seer_common.BlockJson - - mu sync.Mutex - wg sync.WaitGroup - ctx = context.Background() + blocks []*seer_common.BlockJson + collectedErrors []error + mu sync.Mutex + wg sync.WaitGroup + ctx = context.Background() ) var blockNumbersRange []*big.Int @@ -237,8 +237,8 @@ func (c *Client) FetchBlocksInRangeAsync(from, to *big.Int, debug bool, maxReque blockNumbersRange = append(blockNumbersRange, new(big.Int).Set(i)) } - sem := make(chan struct{}, maxRequests) // Semaphore to control concurrency - errChan := make(chan error, 1) + sem := make(chan struct{}, maxRequests) // Semaphore to control concurrency + errChan := make(chan error, len(blockNumbersRange)) // Channel to collect errors from goroutines for _, b := range blockNumbersRange { wg.Add(1) @@ -274,10 +274,17 @@ func (c *Client) FetchBlocksInRangeAsync(from, to *big.Int, debug bool, maxReque close(sem) close(errChan) - if err := <-errChan; err != nil { - return nil, err + for err := range errChan { + collectedErrors = append(collectedErrors, err) } + if len(collectedErrors) > 0 { + var errStrings []string + for _, err := range collectedErrors { + errStrings = append(errStrings, err.Error()) + } + return nil, fmt.Errorf("errors occurred during crawling: %s", strings.Join(errStrings, "; ")) + } return blocks, nil } diff --git a/blockchain/game7/game7.go b/blockchain/game7/game7.go index 394512c..e2f4eea 100644 --- a/blockchain/game7/game7.go +++ b/blockchain/game7/game7.go @@ -225,11 +225,11 @@ func (c *Client) FetchBlocksInRange(from, to *big.Int, debug bool) ([]*seer_comm // FetchBlocksInRangeAsync fetches blocks within a specified range concurrently. func (c *Client) FetchBlocksInRangeAsync(from, to *big.Int, debug bool, maxRequests int) ([]*seer_common.BlockJson, error) { var ( - blocks []*seer_common.BlockJson - - mu sync.Mutex - wg sync.WaitGroup - ctx = context.Background() + blocks []*seer_common.BlockJson + collectedErrors []error + mu sync.Mutex + wg sync.WaitGroup + ctx = context.Background() ) var blockNumbersRange []*big.Int @@ -237,8 +237,8 @@ func (c *Client) FetchBlocksInRangeAsync(from, to *big.Int, debug bool, maxReque blockNumbersRange = append(blockNumbersRange, new(big.Int).Set(i)) } - sem := make(chan struct{}, maxRequests) // Semaphore to control concurrency - errChan := make(chan error, 1) + sem := make(chan struct{}, maxRequests) // Semaphore to control concurrency + errChan := make(chan error, len(blockNumbersRange)) // Channel to collect errors from goroutines for _, b := range blockNumbersRange { wg.Add(1) @@ -274,10 +274,17 @@ func (c *Client) FetchBlocksInRangeAsync(from, to *big.Int, debug bool, maxReque close(sem) close(errChan) - if err := <-errChan; err != nil { - return nil, err + for err := range errChan { + collectedErrors = append(collectedErrors, err) } + if len(collectedErrors) > 0 { + var errStrings []string + for _, err := range collectedErrors { + errStrings = append(errStrings, err.Error()) + } + return nil, fmt.Errorf("errors occurred during crawling: %s", strings.Join(errStrings, "; ")) + } return blocks, nil } diff --git a/blockchain/game7_orbit_arbitrum_sepolia/game7_orbit_arbitrum_sepolia.go b/blockchain/game7_orbit_arbitrum_sepolia/game7_orbit_arbitrum_sepolia.go index 037cd5e..8c74484 100644 --- a/blockchain/game7_orbit_arbitrum_sepolia/game7_orbit_arbitrum_sepolia.go +++ b/blockchain/game7_orbit_arbitrum_sepolia/game7_orbit_arbitrum_sepolia.go @@ 
-225,11 +225,11 @@ func (c *Client) FetchBlocksInRange(from, to *big.Int, debug bool) ([]*seer_comm // FetchBlocksInRangeAsync fetches blocks within a specified range concurrently. func (c *Client) FetchBlocksInRangeAsync(from, to *big.Int, debug bool, maxRequests int) ([]*seer_common.BlockJson, error) { var ( - blocks []*seer_common.BlockJson - - mu sync.Mutex - wg sync.WaitGroup - ctx = context.Background() + blocks []*seer_common.BlockJson + collectedErrors []error + mu sync.Mutex + wg sync.WaitGroup + ctx = context.Background() ) var blockNumbersRange []*big.Int @@ -237,8 +237,8 @@ func (c *Client) FetchBlocksInRangeAsync(from, to *big.Int, debug bool, maxReque blockNumbersRange = append(blockNumbersRange, new(big.Int).Set(i)) } - sem := make(chan struct{}, maxRequests) // Semaphore to control concurrency - errChan := make(chan error, 1) + sem := make(chan struct{}, maxRequests) // Semaphore to control concurrency + errChan := make(chan error, len(blockNumbersRange)) // Channel to collect errors from goroutines for _, b := range blockNumbersRange { wg.Add(1) @@ -274,10 +274,17 @@ func (c *Client) FetchBlocksInRangeAsync(from, to *big.Int, debug bool, maxReque close(sem) close(errChan) - if err := <-errChan; err != nil { - return nil, err + for err := range errChan { + collectedErrors = append(collectedErrors, err) } + if len(collectedErrors) > 0 { + var errStrings []string + for _, err := range collectedErrors { + errStrings = append(errStrings, err.Error()) + } + return nil, fmt.Errorf("errors occurred during crawling: %s", strings.Join(errStrings, "; ")) + } return blocks, nil } diff --git a/blockchain/game7_testnet/game7_testnet.go b/blockchain/game7_testnet/game7_testnet.go index aa1c22c..d2f98e8 100644 --- a/blockchain/game7_testnet/game7_testnet.go +++ b/blockchain/game7_testnet/game7_testnet.go @@ -225,11 +225,11 @@ func (c *Client) FetchBlocksInRange(from, to *big.Int, debug bool) ([]*seer_comm // FetchBlocksInRangeAsync fetches blocks within a specified range concurrently. 
func (c *Client) FetchBlocksInRangeAsync(from, to *big.Int, debug bool, maxRequests int) ([]*seer_common.BlockJson, error) { var ( - blocks []*seer_common.BlockJson - - mu sync.Mutex - wg sync.WaitGroup - ctx = context.Background() + blocks []*seer_common.BlockJson + collectedErrors []error + mu sync.Mutex + wg sync.WaitGroup + ctx = context.Background() ) var blockNumbersRange []*big.Int @@ -237,8 +237,8 @@ func (c *Client) FetchBlocksInRangeAsync(from, to *big.Int, debug bool, maxReque blockNumbersRange = append(blockNumbersRange, new(big.Int).Set(i)) } - sem := make(chan struct{}, maxRequests) // Semaphore to control concurrency - errChan := make(chan error, 1) + sem := make(chan struct{}, maxRequests) // Semaphore to control concurrency + errChan := make(chan error, len(blockNumbersRange)) // Channel to collect errors from goroutines for _, b := range blockNumbersRange { wg.Add(1) @@ -274,10 +274,17 @@ func (c *Client) FetchBlocksInRangeAsync(from, to *big.Int, debug bool, maxReque close(sem) close(errChan) - if err := <-errChan; err != nil { - return nil, err + for err := range errChan { + collectedErrors = append(collectedErrors, err) } + if len(collectedErrors) > 0 { + var errStrings []string + for _, err := range collectedErrors { + errStrings = append(errStrings, err.Error()) + } + return nil, fmt.Errorf("errors occurred during crawling: %s", strings.Join(errStrings, "; ")) + } return blocks, nil } diff --git a/blockchain/imx_zkevm/imx_zkevm.go b/blockchain/imx_zkevm/imx_zkevm.go index 8dd2f0c..f9b880c 100644 --- a/blockchain/imx_zkevm/imx_zkevm.go +++ b/blockchain/imx_zkevm/imx_zkevm.go @@ -225,11 +225,11 @@ func (c *Client) FetchBlocksInRange(from, to *big.Int, debug bool) ([]*seer_comm // FetchBlocksInRangeAsync fetches blocks within a specified range concurrently. 
func (c *Client) FetchBlocksInRangeAsync(from, to *big.Int, debug bool, maxRequests int) ([]*seer_common.BlockJson, error) { var ( - blocks []*seer_common.BlockJson - - mu sync.Mutex - wg sync.WaitGroup - ctx = context.Background() + blocks []*seer_common.BlockJson + collectedErrors []error + mu sync.Mutex + wg sync.WaitGroup + ctx = context.Background() ) var blockNumbersRange []*big.Int @@ -237,8 +237,8 @@ func (c *Client) FetchBlocksInRangeAsync(from, to *big.Int, debug bool, maxReque blockNumbersRange = append(blockNumbersRange, new(big.Int).Set(i)) } - sem := make(chan struct{}, maxRequests) // Semaphore to control concurrency - errChan := make(chan error, 1) + sem := make(chan struct{}, maxRequests) // Semaphore to control concurrency + errChan := make(chan error, len(blockNumbersRange)) // Channel to collect errors from goroutines for _, b := range blockNumbersRange { wg.Add(1) @@ -274,10 +274,17 @@ func (c *Client) FetchBlocksInRangeAsync(from, to *big.Int, debug bool, maxReque close(sem) close(errChan) - if err := <-errChan; err != nil { - return nil, err + for err := range errChan { + collectedErrors = append(collectedErrors, err) } + if len(collectedErrors) > 0 { + var errStrings []string + for _, err := range collectedErrors { + errStrings = append(errStrings, err.Error()) + } + return nil, fmt.Errorf("errors occurred during crawling: %s", strings.Join(errStrings, "; ")) + } return blocks, nil } diff --git a/blockchain/imx_zkevm_sepolia/imx_zkevm_sepolia.go b/blockchain/imx_zkevm_sepolia/imx_zkevm_sepolia.go index 03b2ae9..3a3af5a 100644 --- a/blockchain/imx_zkevm_sepolia/imx_zkevm_sepolia.go +++ b/blockchain/imx_zkevm_sepolia/imx_zkevm_sepolia.go @@ -225,11 +225,11 @@ func (c *Client) FetchBlocksInRange(from, to *big.Int, debug bool) ([]*seer_comm // FetchBlocksInRangeAsync fetches blocks within a specified range concurrently. 
func (c *Client) FetchBlocksInRangeAsync(from, to *big.Int, debug bool, maxRequests int) ([]*seer_common.BlockJson, error) { var ( - blocks []*seer_common.BlockJson - - mu sync.Mutex - wg sync.WaitGroup - ctx = context.Background() + blocks []*seer_common.BlockJson + collectedErrors []error + mu sync.Mutex + wg sync.WaitGroup + ctx = context.Background() ) var blockNumbersRange []*big.Int @@ -237,8 +237,8 @@ func (c *Client) FetchBlocksInRangeAsync(from, to *big.Int, debug bool, maxReque blockNumbersRange = append(blockNumbersRange, new(big.Int).Set(i)) } - sem := make(chan struct{}, maxRequests) // Semaphore to control concurrency - errChan := make(chan error, 1) + sem := make(chan struct{}, maxRequests) // Semaphore to control concurrency + errChan := make(chan error, len(blockNumbersRange)) // Channel to collect errors from goroutines for _, b := range blockNumbersRange { wg.Add(1) @@ -274,10 +274,17 @@ func (c *Client) FetchBlocksInRangeAsync(from, to *big.Int, debug bool, maxReque close(sem) close(errChan) - if err := <-errChan; err != nil { - return nil, err + for err := range errChan { + collectedErrors = append(collectedErrors, err) } + if len(collectedErrors) > 0 { + var errStrings []string + for _, err := range collectedErrors { + errStrings = append(errStrings, err.Error()) + } + return nil, fmt.Errorf("errors occurred during crawling: %s", strings.Join(errStrings, "; ")) + } return blocks, nil } diff --git a/blockchain/mantle/mantle.go b/blockchain/mantle/mantle.go index 8636609..1222076 100644 --- a/blockchain/mantle/mantle.go +++ b/blockchain/mantle/mantle.go @@ -225,11 +225,11 @@ func (c *Client) FetchBlocksInRange(from, to *big.Int, debug bool) ([]*seer_comm // FetchBlocksInRangeAsync fetches blocks within a specified range concurrently. 
func (c *Client) FetchBlocksInRangeAsync(from, to *big.Int, debug bool, maxRequests int) ([]*seer_common.BlockJson, error) { var ( - blocks []*seer_common.BlockJson - - mu sync.Mutex - wg sync.WaitGroup - ctx = context.Background() + blocks []*seer_common.BlockJson + collectedErrors []error + mu sync.Mutex + wg sync.WaitGroup + ctx = context.Background() ) var blockNumbersRange []*big.Int @@ -237,8 +237,8 @@ func (c *Client) FetchBlocksInRangeAsync(from, to *big.Int, debug bool, maxReque blockNumbersRange = append(blockNumbersRange, new(big.Int).Set(i)) } - sem := make(chan struct{}, maxRequests) // Semaphore to control concurrency - errChan := make(chan error, 1) + sem := make(chan struct{}, maxRequests) // Semaphore to control concurrency + errChan := make(chan error, len(blockNumbersRange)) // Channel to collect errors from goroutines for _, b := range blockNumbersRange { wg.Add(1) @@ -274,10 +274,17 @@ func (c *Client) FetchBlocksInRangeAsync(from, to *big.Int, debug bool, maxReque close(sem) close(errChan) - if err := <-errChan; err != nil { - return nil, err + for err := range errChan { + collectedErrors = append(collectedErrors, err) } + if len(collectedErrors) > 0 { + var errStrings []string + for _, err := range collectedErrors { + errStrings = append(errStrings, err.Error()) + } + return nil, fmt.Errorf("errors occurred during crawling: %s", strings.Join(errStrings, "; ")) + } return blocks, nil } diff --git a/blockchain/mantle_sepolia/mantle_sepolia.go b/blockchain/mantle_sepolia/mantle_sepolia.go index 19fab7d..d82d870 100644 --- a/blockchain/mantle_sepolia/mantle_sepolia.go +++ b/blockchain/mantle_sepolia/mantle_sepolia.go @@ -225,11 +225,11 @@ func (c *Client) FetchBlocksInRange(from, to *big.Int, debug bool) ([]*seer_comm // FetchBlocksInRangeAsync fetches blocks within a specified range concurrently. 
func (c *Client) FetchBlocksInRangeAsync(from, to *big.Int, debug bool, maxRequests int) ([]*seer_common.BlockJson, error) { var ( - blocks []*seer_common.BlockJson - - mu sync.Mutex - wg sync.WaitGroup - ctx = context.Background() + blocks []*seer_common.BlockJson + collectedErrors []error + mu sync.Mutex + wg sync.WaitGroup + ctx = context.Background() ) var blockNumbersRange []*big.Int @@ -237,8 +237,8 @@ func (c *Client) FetchBlocksInRangeAsync(from, to *big.Int, debug bool, maxReque blockNumbersRange = append(blockNumbersRange, new(big.Int).Set(i)) } - sem := make(chan struct{}, maxRequests) // Semaphore to control concurrency - errChan := make(chan error, 1) + sem := make(chan struct{}, maxRequests) // Semaphore to control concurrency + errChan := make(chan error, len(blockNumbersRange)) // Channel to collect errors from goroutines for _, b := range blockNumbersRange { wg.Add(1) @@ -274,10 +274,17 @@ func (c *Client) FetchBlocksInRangeAsync(from, to *big.Int, debug bool, maxReque close(sem) close(errChan) - if err := <-errChan; err != nil { - return nil, err + for err := range errChan { + collectedErrors = append(collectedErrors, err) } + if len(collectedErrors) > 0 { + var errStrings []string + for _, err := range collectedErrors { + errStrings = append(errStrings, err.Error()) + } + return nil, fmt.Errorf("errors occurred during crawling: %s", strings.Join(errStrings, "; ")) + } return blocks, nil } diff --git a/blockchain/polygon/polygon.go b/blockchain/polygon/polygon.go index 17b39d2..69ae2a1 100644 --- a/blockchain/polygon/polygon.go +++ b/blockchain/polygon/polygon.go @@ -225,11 +225,11 @@ func (c *Client) FetchBlocksInRange(from, to *big.Int, debug bool) ([]*seer_comm // FetchBlocksInRangeAsync fetches blocks within a specified range concurrently. 
func (c *Client) FetchBlocksInRangeAsync(from, to *big.Int, debug bool, maxRequests int) ([]*seer_common.BlockJson, error) { var ( - blocks []*seer_common.BlockJson - - mu sync.Mutex - wg sync.WaitGroup - ctx = context.Background() + blocks []*seer_common.BlockJson + collectedErrors []error + mu sync.Mutex + wg sync.WaitGroup + ctx = context.Background() ) var blockNumbersRange []*big.Int @@ -237,8 +237,8 @@ func (c *Client) FetchBlocksInRangeAsync(from, to *big.Int, debug bool, maxReque blockNumbersRange = append(blockNumbersRange, new(big.Int).Set(i)) } - sem := make(chan struct{}, maxRequests) // Semaphore to control concurrency - errChan := make(chan error, 1) + sem := make(chan struct{}, maxRequests) // Semaphore to control concurrency + errChan := make(chan error, len(blockNumbersRange)) // Channel to collect errors from goroutines for _, b := range blockNumbersRange { wg.Add(1) @@ -274,10 +274,17 @@ func (c *Client) FetchBlocksInRangeAsync(from, to *big.Int, debug bool, maxReque close(sem) close(errChan) - if err := <-errChan; err != nil { - return nil, err + for err := range errChan { + collectedErrors = append(collectedErrors, err) } + if len(collectedErrors) > 0 { + var errStrings []string + for _, err := range collectedErrors { + errStrings = append(errStrings, err.Error()) + } + return nil, fmt.Errorf("errors occurred during crawling: %s", strings.Join(errStrings, "; ")) + } return blocks, nil } diff --git a/blockchain/ronin/ronin.go b/blockchain/ronin/ronin.go index de79079..8899a56 100644 --- a/blockchain/ronin/ronin.go +++ b/blockchain/ronin/ronin.go @@ -225,11 +225,11 @@ func (c *Client) FetchBlocksInRange(from, to *big.Int, debug bool) ([]*seer_comm // FetchBlocksInRangeAsync fetches blocks within a specified range concurrently. func (c *Client) FetchBlocksInRangeAsync(from, to *big.Int, debug bool, maxRequests int) ([]*seer_common.BlockJson, error) { var ( - blocks []*seer_common.BlockJson - - mu sync.Mutex - wg sync.WaitGroup - ctx = context.Background() + blocks []*seer_common.BlockJson + collectedErrors []error + mu sync.Mutex + wg sync.WaitGroup + ctx = context.Background() ) var blockNumbersRange []*big.Int @@ -237,8 +237,8 @@ func (c *Client) FetchBlocksInRangeAsync(from, to *big.Int, debug bool, maxReque blockNumbersRange = append(blockNumbersRange, new(big.Int).Set(i)) } - sem := make(chan struct{}, maxRequests) // Semaphore to control concurrency - errChan := make(chan error, 1) + sem := make(chan struct{}, maxRequests) // Semaphore to control concurrency + errChan := make(chan error, len(blockNumbersRange)) // Channel to collect errors from goroutines for _, b := range blockNumbersRange { wg.Add(1) @@ -274,10 +274,17 @@ func (c *Client) FetchBlocksInRangeAsync(from, to *big.Int, debug bool, maxReque close(sem) close(errChan) - if err := <-errChan; err != nil { - return nil, err + for err := range errChan { + collectedErrors = append(collectedErrors, err) } + if len(collectedErrors) > 0 { + var errStrings []string + for _, err := range collectedErrors { + errStrings = append(errStrings, err.Error()) + } + return nil, fmt.Errorf("errors occurred during crawling: %s", strings.Join(errStrings, "; ")) + } return blocks, nil } diff --git a/blockchain/ronin_saigon/ronin_saigon.go b/blockchain/ronin_saigon/ronin_saigon.go index 55d0ddc..f45b56f 100644 --- a/blockchain/ronin_saigon/ronin_saigon.go +++ b/blockchain/ronin_saigon/ronin_saigon.go @@ -225,11 +225,11 @@ func (c *Client) FetchBlocksInRange(from, to *big.Int, debug bool) ([]*seer_comm // FetchBlocksInRangeAsync 
fetches blocks within a specified range concurrently. func (c *Client) FetchBlocksInRangeAsync(from, to *big.Int, debug bool, maxRequests int) ([]*seer_common.BlockJson, error) { var ( - blocks []*seer_common.BlockJson - - mu sync.Mutex - wg sync.WaitGroup - ctx = context.Background() + blocks []*seer_common.BlockJson + collectedErrors []error + mu sync.Mutex + wg sync.WaitGroup + ctx = context.Background() ) var blockNumbersRange []*big.Int @@ -237,8 +237,8 @@ func (c *Client) FetchBlocksInRangeAsync(from, to *big.Int, debug bool, maxReque blockNumbersRange = append(blockNumbersRange, new(big.Int).Set(i)) } - sem := make(chan struct{}, maxRequests) // Semaphore to control concurrency - errChan := make(chan error, 1) + sem := make(chan struct{}, maxRequests) // Semaphore to control concurrency + errChan := make(chan error, len(blockNumbersRange)) // Channel to collect errors from goroutines for _, b := range blockNumbersRange { wg.Add(1) @@ -274,10 +274,17 @@ func (c *Client) FetchBlocksInRangeAsync(from, to *big.Int, debug bool, maxReque close(sem) close(errChan) - if err := <-errChan; err != nil { - return nil, err + for err := range errChan { + collectedErrors = append(collectedErrors, err) } + if len(collectedErrors) > 0 { + var errStrings []string + for _, err := range collectedErrors { + errStrings = append(errStrings, err.Error()) + } + return nil, fmt.Errorf("errors occurred during crawling: %s", strings.Join(errStrings, "; ")) + } return blocks, nil } diff --git a/blockchain/sepolia/sepolia.go b/blockchain/sepolia/sepolia.go index 52e722a..895847a 100644 --- a/blockchain/sepolia/sepolia.go +++ b/blockchain/sepolia/sepolia.go @@ -225,11 +225,11 @@ func (c *Client) FetchBlocksInRange(from, to *big.Int, debug bool) ([]*seer_comm // FetchBlocksInRangeAsync fetches blocks within a specified range concurrently. 
func (c *Client) FetchBlocksInRangeAsync(from, to *big.Int, debug bool, maxRequests int) ([]*seer_common.BlockJson, error) { var ( - blocks []*seer_common.BlockJson - - mu sync.Mutex - wg sync.WaitGroup - ctx = context.Background() + blocks []*seer_common.BlockJson + collectedErrors []error + mu sync.Mutex + wg sync.WaitGroup + ctx = context.Background() ) var blockNumbersRange []*big.Int @@ -237,8 +237,8 @@ func (c *Client) FetchBlocksInRangeAsync(from, to *big.Int, debug bool, maxReque blockNumbersRange = append(blockNumbersRange, new(big.Int).Set(i)) } - sem := make(chan struct{}, maxRequests) // Semaphore to control concurrency - errChan := make(chan error, 1) + sem := make(chan struct{}, maxRequests) // Semaphore to control concurrency + errChan := make(chan error, len(blockNumbersRange)) // Channel to collect errors from goroutines for _, b := range blockNumbersRange { wg.Add(1) @@ -274,10 +274,17 @@ func (c *Client) FetchBlocksInRangeAsync(from, to *big.Int, debug bool, maxReque close(sem) close(errChan) - if err := <-errChan; err != nil { - return nil, err + for err := range errChan { + collectedErrors = append(collectedErrors, err) } + if len(collectedErrors) > 0 { + var errStrings []string + for _, err := range collectedErrors { + errStrings = append(errStrings, err.Error()) + } + return nil, fmt.Errorf("errors occurred during crawling: %s", strings.Join(errStrings, "; ")) + } return blocks, nil } diff --git a/blockchain/xai/xai.go b/blockchain/xai/xai.go index efc1753..b2dccf4 100644 --- a/blockchain/xai/xai.go +++ b/blockchain/xai/xai.go @@ -225,11 +225,11 @@ func (c *Client) FetchBlocksInRange(from, to *big.Int, debug bool) ([]*seer_comm // FetchBlocksInRangeAsync fetches blocks within a specified range concurrently. func (c *Client) FetchBlocksInRangeAsync(from, to *big.Int, debug bool, maxRequests int) ([]*seer_common.BlockJson, error) { var ( - blocks []*seer_common.BlockJson - - mu sync.Mutex - wg sync.WaitGroup - ctx = context.Background() + blocks []*seer_common.BlockJson + collectedErrors []error + mu sync.Mutex + wg sync.WaitGroup + ctx = context.Background() ) var blockNumbersRange []*big.Int @@ -237,8 +237,8 @@ func (c *Client) FetchBlocksInRangeAsync(from, to *big.Int, debug bool, maxReque blockNumbersRange = append(blockNumbersRange, new(big.Int).Set(i)) } - sem := make(chan struct{}, maxRequests) // Semaphore to control concurrency - errChan := make(chan error, 1) + sem := make(chan struct{}, maxRequests) // Semaphore to control concurrency + errChan := make(chan error, len(blockNumbersRange)) // Channel to collect errors from goroutines for _, b := range blockNumbersRange { wg.Add(1) @@ -274,10 +274,17 @@ func (c *Client) FetchBlocksInRangeAsync(from, to *big.Int, debug bool, maxReque close(sem) close(errChan) - if err := <-errChan; err != nil { - return nil, err + for err := range errChan { + collectedErrors = append(collectedErrors, err) } + if len(collectedErrors) > 0 { + var errStrings []string + for _, err := range collectedErrors { + errStrings = append(errStrings, err.Error()) + } + return nil, fmt.Errorf("errors occurred during crawling: %s", strings.Join(errStrings, "; ")) + } return blocks, nil } diff --git a/blockchain/xai_sepolia/xai_sepolia.go b/blockchain/xai_sepolia/xai_sepolia.go index b2b6537..d2b5518 100644 --- a/blockchain/xai_sepolia/xai_sepolia.go +++ b/blockchain/xai_sepolia/xai_sepolia.go @@ -225,11 +225,11 @@ func (c *Client) FetchBlocksInRange(from, to *big.Int, debug bool) ([]*seer_comm // FetchBlocksInRangeAsync fetches blocks within a 
specified range concurrently.
 func (c *Client) FetchBlocksInRangeAsync(from, to *big.Int, debug bool, maxRequests int) ([]*seer_common.BlockJson, error) {
 	var (
-		blocks []*seer_common.BlockJson
-
-		mu  sync.Mutex
-		wg  sync.WaitGroup
-		ctx = context.Background()
+		blocks          []*seer_common.BlockJson
+		collectedErrors []error
+		mu              sync.Mutex
+		wg              sync.WaitGroup
+		ctx             = context.Background()
 	)
 
 	var blockNumbersRange []*big.Int
@@ -237,8 +237,8 @@ func (c *Client) FetchBlocksInRangeAsync(from, to *big.Int, debug bool, maxReque
 		blockNumbersRange = append(blockNumbersRange, new(big.Int).Set(i))
 	}
 
-	sem := make(chan struct{}, maxRequests) // Semaphore to control concurrency
-	errChan := make(chan error, 1)
+	sem := make(chan struct{}, maxRequests)             // Semaphore to control concurrency
+	errChan := make(chan error, len(blockNumbersRange)) // Channel to collect errors from goroutines
 
 	for _, b := range blockNumbersRange {
 		wg.Add(1)
@@ -274,10 +274,17 @@ func (c *Client) FetchBlocksInRangeAsync(from, to *big.Int, debug bool, maxReque
 	close(sem)
 	close(errChan)
 
-	if err := <-errChan; err != nil {
-		return nil, err
+	for err := range errChan {
+		collectedErrors = append(collectedErrors, err)
 	}
 
+	if len(collectedErrors) > 0 {
+		var errStrings []string
+		for _, err := range collectedErrors {
+			errStrings = append(errStrings, err.Error())
+		}
+		return nil, fmt.Errorf("errors occurred during crawling: %s", strings.Join(errStrings, "; "))
+	}
 	return blocks, nil
 }
diff --git a/cmd.go b/cmd.go
index badd1e5..4dd3744 100644
--- a/cmd.go
+++ b/cmd.go
@@ -296,7 +296,7 @@ func CreateCrawlerCommand() *cobra.Command {
 func CreateSynchronizerCommand() *cobra.Command {
 	var startBlock, endBlock, batchSize uint64
-	var timeout, threads, cycleTickerWaitTime int
+	var timeout, threads, cycleTickerWaitTime, minBlocksToSync int
 	var chain, baseDir, customerDbUriFlag string
 
 	synchronizerCmd := &cobra.Command{
@@ -337,14 +337,14 @@ func CreateSynchronizerCommand() *cobra.Command {
 		RunE: func(cmd *cobra.Command, args []string) error {
 			indexer.InitDBConnection()
 
-			newSynchronizer, synchonizerErr := synchronizer.NewSynchronizer(chain, baseDir, startBlock, endBlock, batchSize, timeout, threads)
+			newSynchronizer, synchonizerErr := synchronizer.NewSynchronizer(chain, baseDir, startBlock, endBlock, batchSize, timeout, threads, minBlocksToSync)
 			if synchonizerErr != nil {
 				return synchonizerErr
 			}
 
 			latestBlockNumber, latestErr := newSynchronizer.Client.GetLatestBlockNumber()
 			if latestErr != nil {
-				return fmt.Errorf("Failed to get latest block number: %v", latestErr)
+				return fmt.Errorf("failed to get latest block number: %v", latestErr)
 			}
 
 			if startBlock > latestBlockNumber.Uint64() {
@@ -368,6 +368,7 @@ func CreateSynchronizerCommand() *cobra.Command {
 	synchronizerCmd.Flags().StringVar(&customerDbUriFlag, "customer-db-uri", "", "Set customer database URI for development. This workflow bypass fetching customer IDs and its database URL connection strings from mdb-v3-controller API")
 	synchronizerCmd.Flags().IntVar(&threads, "threads", 5, "Number of go-routines for concurrent decoding")
 	synchronizerCmd.Flags().IntVar(&cycleTickerWaitTime, "cycle-ticker-wait-time", 10, "The wait time for the synchronizer in seconds before it try to start new cycle")
+	synchronizerCmd.Flags().IntVar(&minBlocksToSync, "min-blocks-to-sync", 10, "The minimum number of blocks to sync before the synchronizer starts decoding")
 
 	return synchronizerCmd
 }
@@ -1011,7 +1012,7 @@ func CreateHistoricalSyncCommand() *cobra.Command {
 	var chain, baseDir, customerDbUriFlag string
 	var addresses, customerIds []string
 	var startBlock, endBlock, batchSize uint64
-	var timeout, threads int
+	var timeout, threads, minBlocksToSync int
 	var auto bool
 
 	historicalSyncCmd := &cobra.Command{
@@ -1038,6 +1039,11 @@ func CreateHistoricalSyncCommand() *cobra.Command {
 				return syncErr
 			}
 
+			blockchainErr := seer_blockchain.CheckVariablesForBlockchains()
+			if blockchainErr != nil {
+				return blockchainErr
+			}
+
 			if chain == "" {
 				return fmt.Errorf("blockchain is required via --chain")
 			}
@@ -1047,7 +1053,7 @@ func CreateHistoricalSyncCommand() *cobra.Command {
 		RunE: func(cmd *cobra.Command, args []string) error {
 			indexer.InitDBConnection()
 
-			newSynchronizer, synchonizerErr := synchronizer.NewSynchronizer(chain, baseDir, startBlock, endBlock, batchSize, timeout, threads)
+			newSynchronizer, synchonizerErr := synchronizer.NewSynchronizer(chain, baseDir, startBlock, endBlock, batchSize, timeout, threads, minBlocksToSync)
 			if synchonizerErr != nil {
 				return synchonizerErr
 			}
@@ -1073,6 +1079,7 @@ func CreateHistoricalSyncCommand() *cobra.Command {
 	historicalSyncCmd.Flags().StringSliceVar(&addresses, "addresses", []string{}, "The list of addresses to sync")
 	historicalSyncCmd.Flags().BoolVar(&auto, "auto", false, "Set this flag to sync all unfinished historical crawl from the database (default: false)")
 	historicalSyncCmd.Flags().IntVar(&threads, "threads", 5, "Number of go-routines for concurrent crawling (default: 5)")
+	historicalSyncCmd.Flags().IntVar(&minBlocksToSync, "min-blocks-to-sync", 10, "The minimum number of blocks to sync before the synchronizer starts decoding")
 
 	return historicalSyncCmd
 }
diff --git a/indexer/db.go b/indexer/db.go
index 20bac76..474065d 100644
--- a/indexer/db.go
+++ b/indexer/db.go
@@ -551,14 +551,16 @@ func (p *PostgreSQLpgx) GetCustomersIDs(blockchain string) ([]string, error) {
 	return customerIds, nil
 }
 
-func (p *PostgreSQLpgx) ReadUpdates(blockchain string, fromBlock uint64, customerIds []string) (uint64, uint64, string, []CustomerUpdates, error) {
+func (p *PostgreSQLpgx) ReadUpdates(blockchain string, fromBlock uint64, customerIds []string, minBlocksToSync int) (uint64, uint64, []string, []CustomerUpdates, error) {
 	pool := p.GetPool()
 
 	conn, err := pool.Acquire(context.Background())
 
+	var paths []string
+
 	if err != nil {
-		return 0, 0, "", nil, err
+		return 0, 0, paths, nil, err
 	}
 
 	defer conn.Release()
@@ -567,20 +569,20 @@ func (p *PostgreSQLpgx) ReadUpdates(blockchain string, fromBlock uint64, custome
 	query := fmt.Sprintf(`WITH path as (
 		SELECT
-			path
+			path,
+			block_number
 		from
 			%s
 		WHERE
-			block_number = $1
+			block_number >= $1 and block_number <= $1 + $3
 	), latest_block_of_path as (
 		SELECT
-			block_number as block_number,
-			path as path
+			block_number as latest_block_number
 		from
 			%s
 		WHERE
-			path = (SELECT path from path)
+			path = (SELECT path FROM path ORDER BY block_number DESC LIMIT 1)
 		order by block_number desc
 		limit 1
 	),
@@ -627,29 +629,28 @@ func (p *PostgreSQLpgx) ReadUpdates(blockchain string, fromBlock uint64, custome
 			customer_id
 	)
 	SELECT
-		block_number,
-		path,
+		latest_block_number,
+		(SELECT array_agg(path) FROM path) as paths,
 		(SELECT json_agg(json_build_object(customer_id, abis)) FROM reformatted_jobs) as jobs
 	FROM
 		latest_block_of_path
 	`, blocksTableName, blocksTableName)
 
-	rows, err := conn.Query(context.Background(), query, fromBlock, blockchain)
+	rows, err := conn.Query(context.Background(), query, fromBlock, blockchain, minBlocksToSync)
 
 	if err != nil {
 		log.Println("Error querying abi jobs from database", err)
-		return 0, 0, "", nil, err
+		return 0, 0, paths, nil, err
 	}
 
 	var customers []map[string]map[string]map[string]*AbiEntry
-	var path string
 	var firstBlockNumber, lastBlockNumber uint64
 
 	for rows.Next() {
-		err = rows.Scan(&lastBlockNumber, &path, &customers)
+		err = rows.Scan(&lastBlockNumber, &paths, &customers)
 		if err != nil {
 			log.Println("Error scanning row:", err)
-			return 0, 0, "", nil, err
+			return 0, 0, paths, nil, err
 		}
 	}
@@ -668,7 +669,7 @@ func (p *PostgreSQLpgx) ReadUpdates(blockchain string, fromBlock uint64, custome
 
 	}
 
-	return firstBlockNumber, lastBlockNumber, path, customerUpdates, nil
+	return firstBlockNumber, lastBlockNumber, paths, customerUpdates, nil
 
 }
@@ -803,19 +804,24 @@ func (p *PostgreSQLpgx) WriteLabes(
 	}
 
 	defer func() {
-		if err := recover(); err != nil {
+		if pErr := recover(); pErr != nil {
 			tx.Rollback(context.Background())
-			panic(err)
+			panic(pErr)
 		} else if err != nil {
 			tx.Rollback(context.Background())
 		} else {
 			err = tx.Commit(context.Background())
+			if err != nil {
+				log.Println("Error committing transaction:", err)
+				panic(err)
+			}
 		}
 	}()
 
 	if len(transactions) > 0 {
 		err := p.WriteTransactions(tx, blockchain, transactions)
 		if err != nil {
+			log.Println("Error writing transactions:", err)
 			return err
 		}
 	}
@@ -823,11 +829,12 @@ func (p *PostgreSQLpgx) WriteLabes(
 	if len(events) > 0 {
 		err := p.WriteEvents(tx, blockchain, events)
 		if err != nil {
+			log.Println("Error writing events:", err)
 			return err
 		}
 	}
 
-	return nil
+	return err
 }
 
 func (p *PostgreSQLpgx) WriteEvents(tx pgx.Tx, blockchain string, events []EventLabel) error {
@@ -1418,6 +1425,70 @@ func (p *PostgreSQLpgx) FindBatchPath(blockchain string, blockNumber uint64) (st
 }
 
+func (p *PostgreSQLpgx) RetrievePathsAndBlockBounds(blockchain string, blockNumber uint64, minBlocksToSync int) ([]string, uint64, uint64, error) {
+	pool := p.GetPool()
+
+	conn, err := pool.Acquire(context.Background())
+
+	if err != nil {
+		return nil, 0, 0, err
+	}
+
+	defer conn.Release()
+
+	var paths []string
+
+	var minBlockNumber uint64
+
+	var maxBlockNumber uint64
+
+	query := fmt.Sprintf(`WITH path as (
+		SELECT
+			path,
+			block_number
+		from
+			%s
+		WHERE
+			block_number >= $2 and block_number <= $1
+	), latest_block_of_path as (
+		SELECT
+			block_number as latest_block_number
+		from
+			%s
+		WHERE
+			path = (SELECT path FROM path ORDER BY block_number DESC LIMIT 1)
+		order by block_number desc
+		limit 1
+	), earliest_block_of_path as (
+		SELECT
+			block_number as first_block_number
+		from
+			%s
+		WHERE
+			path = (SELECT path FROM path ORDER BY block_number ASC LIMIT 1)
+		order by block_number asc
+		limit 1
+	)
+	select ARRAY_AGG(path) as paths, (SELECT first_block_number FROM earliest_block_of_path) as min_block_number, (SELECT latest_block_number FROM latest_block_of_path) as max_block_number from path
+	`, BlocksTableName(blockchain), BlocksTableName(blockchain), BlocksTableName(blockchain))
+
+	err = conn.QueryRow(context.Background(), query, blockNumber, blockNumber-uint64(minBlocksToSync)).Scan(&paths, &minBlockNumber, &maxBlockNumber)
+
+	if err != nil {
+		if err == pgx.ErrNoRows {
+			// Blocks not indexed yet
+			return nil, 0, 0, nil
+		}
+		return nil,
+			0,
+			0,
+			err
+	}
+
+	return paths, minBlockNumber, maxBlockNumber, nil
+
+}
+
 func (p *PostgreSQLpgx) GetAbiJobsWithoutDeployBlocks(blockchain string) (map[string]map[string][]string, error) {
 	pool := p.GetPool()
diff --git a/storage/storage.go b/storage/storage.go
index a7874f5..4933a90 100644
--- a/storage/storage.go
+++ b/storage/storage.go
@@ -3,6 +3,9 @@ package storage
 import (
 	"bytes"
 	"context"
+	"fmt"
+	"strings"
+	"sync"
 )
 
 type ListReturnFunc func(any) string
@@ -19,3 +22,65 @@ type ReadItem struct {
 	Key    string
 	RowIds []uint64
 }
+
+func ReadFiles(keys []string, storageInstance Storer) ([]bytes.Buffer, error) {
+	var result []bytes.Buffer
+
+	for _, key := range keys {
+		buf, err := storageInstance.Read(key)
+		if err != nil {
+			return nil, fmt.Errorf("failed to read object from bucket %s: %v", key, err)
+		}
+
+		result = append(result, buf)
+	}
+
+	return result, nil
+}
+
+func ReadFilesAsync(keys []string, threads int, storageInstance Storer) ([]bytes.Buffer, error) {
+	var result []bytes.Buffer
+	var mu sync.Mutex
+	var wg sync.WaitGroup
+	errChan := make(chan error, len(keys))
+
+	// Semaphore to limit the number of concurrent reads
+	sem := make(chan struct{}, threads)
+
+	for _, key := range keys {
+		wg.Add(1)
+		sem <- struct{}{}
+		go func(k string) {
+			defer func() {
+				<-sem
+				wg.Done()
+			}()
+
+			buf, err := storageInstance.Read(k)
+			if err != nil {
+				errChan <- fmt.Errorf("failed to read object from bucket %s: %v", k, err)
+				return
+			}
+
+			mu.Lock()
+			result = append(result, buf)
+			mu.Unlock()
+		}(key)
+	}
+
+	// Wait for all goroutines to finish
+	wg.Wait()
+	close(errChan)
+
+	// Check if any errors occurred
+	if len(errChan) > 0 {
+		var errMsgs []string
+		for err := range errChan {
+			errMsgs = append(errMsgs, err.Error())
+		}
+		return result, fmt.Errorf("errors occurred during file reads:\n%s",
+			strings.Join(errMsgs, "\n"))
+	}
+
+	return result, nil
+}
diff --git a/synchronizer/synchronizer.go b/synchronizer/synchronizer.go
index 8905db8..17d0ea8 100644
--- a/synchronizer/synchronizer.go
+++ b/synchronizer/synchronizer.go
@@ -25,17 +25,18 @@ type Synchronizer struct {
 	Client          seer_blockchain.BlockchainClient
 	StorageInstance storage.Storer
 
-	blockchain string
-	startBlock uint64
-	endBlock   uint64
-	batchSize  uint64
-	baseDir    string
-	basePath   string
-	threads    int
+	blockchain      string
+	startBlock      uint64
+	endBlock        uint64
+	batchSize       uint64
+	baseDir         string
+	basePath        string
+	threads         int
+	minBlocksToSync int
 }
 
 // NewSynchronizer creates a new synchronizer instance with the given blockchain handler.
-func NewSynchronizer(blockchain, baseDir string, startBlock, endBlock, batchSize uint64, timeout int, threads int) (*Synchronizer, error) {
+func NewSynchronizer(blockchain, baseDir string, startBlock, endBlock, batchSize uint64, timeout int, threads int, minBlocksToSync int) (*Synchronizer, error) {
 	var synchronizer Synchronizer
 
 	basePath := filepath.Join(baseDir, crawler.SeerCrawlerStoragePrefix, "data", blockchain)
@@ -61,13 +62,14 @@ func NewSynchronizer(blockchain, baseDir string, startBlock, endBlock, batchSize
 		Client:          client,
 		StorageInstance: storageInstance,
 
-		blockchain: blockchain,
-		startBlock: startBlock,
-		endBlock:   endBlock,
-		batchSize:  batchSize,
-		baseDir:    baseDir,
-		basePath:   basePath,
-		threads:    threads,
+		blockchain:      blockchain,
+		startBlock:      startBlock,
+		endBlock:        endBlock,
+		batchSize:       batchSize,
+		baseDir:         baseDir,
+		basePath:        basePath,
+		threads:         threads,
+		minBlocksToSync: minBlocksToSync,
 	}
 
 	return &synchronizer, nil
@@ -452,7 +454,7 @@ func (d *Synchronizer) SyncCycle(customerDbUriFlag string) (bool, error) {
 
 	// Read updates from the indexer db
 	// This function will return a list of customer updates 1 update is 1 customer
-	_, lastBlockOfChank, path, updates, err := indexer.DBConnection.ReadUpdates(d.blockchain, d.startBlock, customerIds)
+	_, lastBlockOfChank, paths, updates, err := indexer.DBConnection.ReadUpdates(d.blockchain, d.startBlock, customerIds, d.minBlocksToSync)
 	if err != nil {
 		return isEnd, fmt.Errorf("error reading updates: %w", err)
 	}
@@ -463,22 +465,30 @@ func (d *Synchronizer) SyncCycle(customerDbUriFlag string) (bool, error) {
 	}
 
 	if crawler.SEER_CRAWLER_DEBUG {
-		log.Printf("Read batch key: %s", path)
+		log.Printf("Read batch key: %s", paths)
 	}
 
 	log.Println("Last block of current chank: ", lastBlockOfChank)
 
 	// Read the raw data from the storage for current path
-	rawData, readErr := d.StorageInstance.Read(path)
+	rawData, readErr := storage.ReadFilesAsync(paths, d.threads, d.StorageInstance)
 	if readErr != nil {
 		return isEnd, fmt.Errorf("error reading raw data: %w", readErr)
 	}
 
 	log.Printf("Read %d users updates from the indexer db in range of blocks %d-%d\n", len(updates), d.startBlock, lastBlockOfChank)
 
+	// Process customer updates in parallel
 	var wg sync.WaitGroup
 
-	sem := make(chan struct{}, 5)  // Semaphore to control concurrency
-	errChan := make(chan error, 1) // Buffered channel for error handling
+	// count the number of goroutines that will be running
+	var totalGoroutines int
+	for _, update := range updates {
+		totalGoroutines += len(customerDBConnections[update.CustomerID])
+	}
+
+	sem := make(chan struct{}, d.threads)        // Semaphore to control concurrency
+	errChan := make(chan error, totalGoroutines) // Channel to collect errors from goroutines
+	var errs []error
 
 	for _, update := range updates {
 		for instanceId := range customerDBConnections[update.CustomerID] {
 			wg.Add(1)
@@ -487,13 +497,19 @@ func (d *Synchronizer) SyncCycle(customerDbUriFlag string) (bool, error) {
 	}
 
 	wg.Wait()
-
-	close(sem)
 	close(errChan) // Close the channel to signal that all goroutines have finished
 
-	// Check for errors from goroutines
-	if err := <-errChan; err != nil {
-		return isEnd, fmt.Errorf("error processing customer updates: %w", err)
+	// Check if there were any errors
+	for err := range errChan {
+		errs = append(errs, err)
+	}
+
+	if len(errs) > 0 {
+		var errMsg string
+		for _, e := range errs {
+			errMsg += e.Error() + "\n"
+		}
+		return isEnd, fmt.Errorf("errors processing customer updates:\n%s", errMsg)
 	}
 
 	d.startBlock = lastBlockOfChank + 1
@@ -635,16 +651,16 @@ func (d *Synchronizer) HistoricalSyncRef(customerDbUriFlag string, addresses []s
 	}
 
 	// Determine the processing strategy (RPC or storage)
-	var path string
+	var paths []string
 	var firstBlockOfChunk uint64
 
 	for {
-		path, firstBlockOfChunk, _, err = indexer.DBConnection.FindBatchPath(d.blockchain, d.startBlock)
+		paths, firstBlockOfChunk, _, err = indexer.DBConnection.RetrievePathsAndBlockBounds(d.blockchain, d.startBlock, d.minBlocksToSync)
 		if err != nil {
 			return fmt.Errorf("error finding batch path: %w", err)
 		}
 
-		if path != "" {
+		if paths != nil {
 			d.endBlock = firstBlockOfChunk
 			break
 		}
@@ -654,8 +670,8 @@ func (d *Synchronizer) HistoricalSyncRef(customerDbUriFlag string, addresses []s
 	}
 
 	// Read raw data from storage or via RPC
-	var rawData bytes.Buffer
-	rawData, err = d.StorageInstance.Read(path)
+	var rawData []bytes.Buffer
+	rawData, err = storage.ReadFilesAsync(paths, d.threads, d.StorageInstance)
 	if err != nil {
 		return fmt.Errorf("error reading events from storage: %w", err)
 	}
@@ -664,8 +680,17 @@ func (d *Synchronizer) HistoricalSyncRef(customerDbUriFlag string, addresses []s
 
 	// Process customer updates in parallel
 	var wg sync.WaitGroup
-	sem := make(chan struct{}, d.threads) // Semaphore to control concurrency
-	errChan := make(chan error, 1)        // Buffered channel for error handling
+
+	// count the number of goroutines that will be running
+	var totalGoroutines int
+	for _, update := range customerUpdates {
+		totalGoroutines += len(customerDBConnections[update.CustomerID])
+	}
+
+	sem := make(chan struct{}, d.threads)        // Semaphore to control concurrency
+	errChan := make(chan error, totalGoroutines) // Channel to collect errors from goroutines
+
+	var errs []error
 
 	for _, update := range customerUpdates {
@@ -677,19 +702,25 @@ func (d *Synchronizer) HistoricalSyncRef(customerDbUriFlag string, addresses []s
 	}
 
 	wg.Wait()
-	close(sem)
 	close(errChan) // Close the channel to signal that all goroutines have finished
 
-	// Check for errors from goroutines
-	select {
-	case err := <-errChan:
-		log.Printf("Error processing customer updates: %v", err)
-		return err
-	default:
+	// Check if there were any errors
+	for err := range errChan {
+		errs = append(errs, err)
+	}
+
+	if len(errs) > 0 {
+		var errMsg string
+		for _, e := range errs {
+			errMsg += e.Error() + "\n"
+		}
+		return fmt.Errorf("errors processing customer updates:\n%s", errMsg)
 	}
 
 	d.startBlock = d.endBlock - 1
+
+	fmt.Printf("Processed %d customer updates for block range %d-%d\n", len(customerUpdates), d.startBlock, d.endBlock)
+
 	if isCycleFinished || d.startBlock == 0 {
 		if autoJobs {
 			for address, abisInfo := range addressesAbisInfo {
@@ -713,7 +744,7 @@ func (d *Synchronizer) HistoricalSyncRef(customerDbUriFlag string, addresses []s
 
 func (d *Synchronizer) processProtoCustomerUpdate(
 	update indexer.CustomerUpdates,
-	rawData bytes.Buffer,
+	rawDataList []bytes.Buffer,
 	customerDBConnections map[string]map[int]CustomerDBConnection,
 	id int,
 	sem chan struct{},
@@ -723,37 +754,49 @@ func (d *Synchronizer) processProtoCustomerUpdate(
 	// Decode input raw proto data using ABIs
 	// Write decoded data to the user Database
 
-	defer wg.Done()
-	sem <- struct{}{} // Acquire semaphore
+	defer func() {
+		if r := recover(); r != nil {
+			errChan <- fmt.Errorf("panic in goroutine for customer %s, instance %d: %v", update.CustomerID, id, r)
+		}
+		wg.Done()
+	}()
+
+	sem <- struct{}{}        // Acquire semaphore
+	defer func() { <-sem }() // Release semaphore
 
 	customer, exists := customerDBConnections[update.CustomerID][id]
 	if !exists {
 		errChan <- fmt.Errorf("no DB connection for customer %s", update.CustomerID)
-		<-sem // Release semaphore
 		return
 	}
 
 	conn, err := customer.Pgx.GetPool().Acquire(context.Background())
 	if err != nil {
 		errChan <- fmt.Errorf("error acquiring connection for customer %s: %w", update.CustomerID, err)
-		<-sem // Release semaphore
 		return
 	}
 	defer conn.Release()
 
-	decodedEvents, decodedTransactions, err := d.Client.DecodeProtoEntireBlockToLabels(&rawData, update.Abis, d.threads)
-	if err != nil {
-		errChan <- fmt.Errorf("error %s: %w", update.CustomerID, err)
-		<-sem // Release semaphore
-		return
-	}
-	err = customer.Pgx.WriteLabes(d.blockchain, decodedTransactions, decodedEvents)
+	var listDecodedEvents []indexer.EventLabel
+	var listDecodedTransactions []indexer.TransactionLabel
+
+	for _, rawData := range rawDataList {
+		// Decode the raw data to transactions
+		decodedEvents, decodedTransactions, err := d.Client.DecodeProtoEntireBlockToLabels(&rawData, update.Abis, d.threads)
+
+		listDecodedEvents = append(listDecodedEvents, decodedEvents...)
+		listDecodedTransactions = append(listDecodedTransactions, decodedTransactions...)
+
+		if err != nil {
+			errChan <- fmt.Errorf("error decoding data for customer %s: %w", update.CustomerID, err)
+			return
+		}
+
+	}
+	err = customer.Pgx.WriteLabes(d.blockchain, listDecodedTransactions, listDecodedEvents)
 	if err != nil {
 		errChan <- fmt.Errorf("error writing labels for customer %s: %w", update.CustomerID, err)
-		<-sem // Release semaphore
 		return
 	}
-
-	<-sem // Release semaphore
 }
diff --git a/version/version.go b/version/version.go
index 686f6ff..3111c63 100644
--- a/version/version.go
+++ b/version/version.go
@@ -1,3 +1,3 @@
 package version
 
-var SeerVersion string = "0.3.11"
+var SeerVersion string = "0.3.12"