Skip to content

Commit

Permalink
Update clients gorutines.
Browse files Browse the repository at this point in the history
  • Loading branch information
Andrey committed Nov 21, 2024
1 parent 047bc2b commit 56d386f
Show file tree
Hide file tree
Showing 18 changed files with 288 additions and 162 deletions.
25 changes: 16 additions & 9 deletions blockchain/arbitrum_one/arbitrum_one.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,20 +225,20 @@ func (c *Client) FetchBlocksInRange(from, to *big.Int, debug bool) ([]*seer_comm
// FetchBlocksInRangeAsync fetches blocks within a specified range concurrently.
func (c *Client) FetchBlocksInRangeAsync(from, to *big.Int, debug bool, maxRequests int) ([]*seer_common.BlockJson, error) {
var (
blocks []*seer_common.BlockJson

mu sync.Mutex
wg sync.WaitGroup
ctx = context.Background()
blocks []*seer_common.BlockJson
collectedErrors []error
mu sync.Mutex
wg sync.WaitGroup
ctx = context.Background()
)

var blockNumbersRange []*big.Int
for i := new(big.Int).Set(from); i.Cmp(to) <= 0; i.Add(i, big.NewInt(1)) {
blockNumbersRange = append(blockNumbersRange, new(big.Int).Set(i))
}

sem := make(chan struct{}, maxRequests) // Semaphore to control concurrency
errChan := make(chan error, 1)
sem := make(chan struct{}, maxRequests) // Semaphore to control concurrency
errChan := make(chan error, len(blockNumbersRange)) // Channel to collect errors from goroutines

for _, b := range blockNumbersRange {
wg.Add(1)
Expand Down Expand Up @@ -274,10 +274,17 @@ func (c *Client) FetchBlocksInRangeAsync(from, to *big.Int, debug bool, maxReque
close(sem)
close(errChan)

if err := <-errChan; err != nil {
return nil, err
for err := range errChan {
collectedErrors = append(collectedErrors, err)
}

if len(collectedErrors) > 0 {
var errStrings []string
for _, err := range collectedErrors {
errStrings = append(errStrings, err.Error())
}
return nil, fmt.Errorf("errors occurred during crawling: %s", strings.Join(errStrings, "; "))
}
return blocks, nil
}

Expand Down
25 changes: 16 additions & 9 deletions blockchain/arbitrum_sepolia/arbitrum_sepolia.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,20 +225,20 @@ func (c *Client) FetchBlocksInRange(from, to *big.Int, debug bool) ([]*seer_comm
// FetchBlocksInRangeAsync fetches blocks within a specified range concurrently.
func (c *Client) FetchBlocksInRangeAsync(from, to *big.Int, debug bool, maxRequests int) ([]*seer_common.BlockJson, error) {
var (
blocks []*seer_common.BlockJson

mu sync.Mutex
wg sync.WaitGroup
ctx = context.Background()
blocks []*seer_common.BlockJson
collectedErrors []error
mu sync.Mutex
wg sync.WaitGroup
ctx = context.Background()
)

var blockNumbersRange []*big.Int
for i := new(big.Int).Set(from); i.Cmp(to) <= 0; i.Add(i, big.NewInt(1)) {
blockNumbersRange = append(blockNumbersRange, new(big.Int).Set(i))
}

sem := make(chan struct{}, maxRequests) // Semaphore to control concurrency
errChan := make(chan error, 1)
sem := make(chan struct{}, maxRequests) // Semaphore to control concurrency
errChan := make(chan error, len(blockNumbersRange)) // Channel to collect errors from goroutines

for _, b := range blockNumbersRange {
wg.Add(1)
Expand Down Expand Up @@ -274,10 +274,17 @@ func (c *Client) FetchBlocksInRangeAsync(from, to *big.Int, debug bool, maxReque
close(sem)
close(errChan)

if err := <-errChan; err != nil {
return nil, err
for err := range errChan {
collectedErrors = append(collectedErrors, err)
}

if len(collectedErrors) > 0 {
var errStrings []string
for _, err := range collectedErrors {
errStrings = append(errStrings, err.Error())
}
return nil, fmt.Errorf("errors occurred during crawling: %s", strings.Join(errStrings, "; "))
}
return blocks, nil
}

Expand Down
25 changes: 16 additions & 9 deletions blockchain/b3/b3.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,20 +225,20 @@ func (c *Client) FetchBlocksInRange(from, to *big.Int, debug bool) ([]*seer_comm
// FetchBlocksInRangeAsync fetches blocks within a specified range concurrently.
func (c *Client) FetchBlocksInRangeAsync(from, to *big.Int, debug bool, maxRequests int) ([]*seer_common.BlockJson, error) {
var (
blocks []*seer_common.BlockJson

mu sync.Mutex
wg sync.WaitGroup
ctx = context.Background()
blocks []*seer_common.BlockJson
collectedErrors []error
mu sync.Mutex
wg sync.WaitGroup
ctx = context.Background()
)

var blockNumbersRange []*big.Int
for i := new(big.Int).Set(from); i.Cmp(to) <= 0; i.Add(i, big.NewInt(1)) {
blockNumbersRange = append(blockNumbersRange, new(big.Int).Set(i))
}

sem := make(chan struct{}, maxRequests) // Semaphore to control concurrency
errChan := make(chan error, 1)
sem := make(chan struct{}, maxRequests) // Semaphore to control concurrency
errChan := make(chan error, len(blockNumbersRange)) // Channel to collect errors from goroutines

for _, b := range blockNumbersRange {
wg.Add(1)
Expand Down Expand Up @@ -274,10 +274,17 @@ func (c *Client) FetchBlocksInRangeAsync(from, to *big.Int, debug bool, maxReque
close(sem)
close(errChan)

if err := <-errChan; err != nil {
return nil, err
for err := range errChan {
collectedErrors = append(collectedErrors, err)
}

if len(collectedErrors) > 0 {
var errStrings []string
for _, err := range collectedErrors {
errStrings = append(errStrings, err.Error())
}
return nil, fmt.Errorf("errors occurred during crawling: %s", strings.Join(errStrings, "; "))
}
return blocks, nil
}

Expand Down
25 changes: 16 additions & 9 deletions blockchain/b3_sepolia/b3_sepolia.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,20 +225,20 @@ func (c *Client) FetchBlocksInRange(from, to *big.Int, debug bool) ([]*seer_comm
// FetchBlocksInRangeAsync fetches blocks within a specified range concurrently.
func (c *Client) FetchBlocksInRangeAsync(from, to *big.Int, debug bool, maxRequests int) ([]*seer_common.BlockJson, error) {
var (
blocks []*seer_common.BlockJson

mu sync.Mutex
wg sync.WaitGroup
ctx = context.Background()
blocks []*seer_common.BlockJson
collectedErrors []error
mu sync.Mutex
wg sync.WaitGroup
ctx = context.Background()
)

var blockNumbersRange []*big.Int
for i := new(big.Int).Set(from); i.Cmp(to) <= 0; i.Add(i, big.NewInt(1)) {
blockNumbersRange = append(blockNumbersRange, new(big.Int).Set(i))
}

sem := make(chan struct{}, maxRequests) // Semaphore to control concurrency
errChan := make(chan error, 1)
sem := make(chan struct{}, maxRequests) // Semaphore to control concurrency
errChan := make(chan error, len(blockNumbersRange)) // Channel to collect errors from goroutines

for _, b := range blockNumbersRange {
wg.Add(1)
Expand Down Expand Up @@ -274,10 +274,17 @@ func (c *Client) FetchBlocksInRangeAsync(from, to *big.Int, debug bool, maxReque
close(sem)
close(errChan)

if err := <-errChan; err != nil {
return nil, err
for err := range errChan {
collectedErrors = append(collectedErrors, err)
}

if len(collectedErrors) > 0 {
var errStrings []string
for _, err := range collectedErrors {
errStrings = append(errStrings, err.Error())
}
return nil, fmt.Errorf("errors occurred during crawling: %s", strings.Join(errStrings, "; "))
}
return blocks, nil
}

Expand Down
25 changes: 16 additions & 9 deletions blockchain/ethereum/ethereum.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,20 +225,20 @@ func (c *Client) FetchBlocksInRange(from, to *big.Int, debug bool) ([]*seer_comm
// FetchBlocksInRangeAsync fetches blocks within a specified range concurrently.
func (c *Client) FetchBlocksInRangeAsync(from, to *big.Int, debug bool, maxRequests int) ([]*seer_common.BlockJson, error) {
var (
blocks []*seer_common.BlockJson

mu sync.Mutex
wg sync.WaitGroup
ctx = context.Background()
blocks []*seer_common.BlockJson
collectedErrors []error
mu sync.Mutex
wg sync.WaitGroup
ctx = context.Background()
)

var blockNumbersRange []*big.Int
for i := new(big.Int).Set(from); i.Cmp(to) <= 0; i.Add(i, big.NewInt(1)) {
blockNumbersRange = append(blockNumbersRange, new(big.Int).Set(i))
}

sem := make(chan struct{}, maxRequests) // Semaphore to control concurrency
errChan := make(chan error, 1)
sem := make(chan struct{}, maxRequests) // Semaphore to control concurrency
errChan := make(chan error, len(blockNumbersRange)) // Channel to collect errors from goroutines

for _, b := range blockNumbersRange {
wg.Add(1)
Expand Down Expand Up @@ -274,10 +274,17 @@ func (c *Client) FetchBlocksInRangeAsync(from, to *big.Int, debug bool, maxReque
close(sem)
close(errChan)

if err := <-errChan; err != nil {
return nil, err
for err := range errChan {
collectedErrors = append(collectedErrors, err)
}

if len(collectedErrors) > 0 {
var errStrings []string
for _, err := range collectedErrors {
errStrings = append(errStrings, err.Error())
}
return nil, fmt.Errorf("errors occurred during crawling: %s", strings.Join(errStrings, "; "))
}
return blocks, nil
}

Expand Down
25 changes: 16 additions & 9 deletions blockchain/game7/game7.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,20 +225,20 @@ func (c *Client) FetchBlocksInRange(from, to *big.Int, debug bool) ([]*seer_comm
// FetchBlocksInRangeAsync fetches blocks within a specified range concurrently.
func (c *Client) FetchBlocksInRangeAsync(from, to *big.Int, debug bool, maxRequests int) ([]*seer_common.BlockJson, error) {
var (
blocks []*seer_common.BlockJson

mu sync.Mutex
wg sync.WaitGroup
ctx = context.Background()
blocks []*seer_common.BlockJson
collectedErrors []error
mu sync.Mutex
wg sync.WaitGroup
ctx = context.Background()
)

var blockNumbersRange []*big.Int
for i := new(big.Int).Set(from); i.Cmp(to) <= 0; i.Add(i, big.NewInt(1)) {
blockNumbersRange = append(blockNumbersRange, new(big.Int).Set(i))
}

sem := make(chan struct{}, maxRequests) // Semaphore to control concurrency
errChan := make(chan error, 1)
sem := make(chan struct{}, maxRequests) // Semaphore to control concurrency
errChan := make(chan error, len(blockNumbersRange)) // Channel to collect errors from goroutines

for _, b := range blockNumbersRange {
wg.Add(1)
Expand Down Expand Up @@ -274,10 +274,17 @@ func (c *Client) FetchBlocksInRangeAsync(from, to *big.Int, debug bool, maxReque
close(sem)
close(errChan)

if err := <-errChan; err != nil {
return nil, err
for err := range errChan {
collectedErrors = append(collectedErrors, err)
}

if len(collectedErrors) > 0 {
var errStrings []string
for _, err := range collectedErrors {
errStrings = append(errStrings, err.Error())
}
return nil, fmt.Errorf("errors occurred during crawling: %s", strings.Join(errStrings, "; "))
}
return blocks, nil
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,20 +225,20 @@ func (c *Client) FetchBlocksInRange(from, to *big.Int, debug bool) ([]*seer_comm
// FetchBlocksInRangeAsync fetches blocks within a specified range concurrently.
func (c *Client) FetchBlocksInRangeAsync(from, to *big.Int, debug bool, maxRequests int) ([]*seer_common.BlockJson, error) {
var (
blocks []*seer_common.BlockJson

mu sync.Mutex
wg sync.WaitGroup
ctx = context.Background()
blocks []*seer_common.BlockJson
collectedErrors []error
mu sync.Mutex
wg sync.WaitGroup
ctx = context.Background()
)

var blockNumbersRange []*big.Int
for i := new(big.Int).Set(from); i.Cmp(to) <= 0; i.Add(i, big.NewInt(1)) {
blockNumbersRange = append(blockNumbersRange, new(big.Int).Set(i))
}

sem := make(chan struct{}, maxRequests) // Semaphore to control concurrency
errChan := make(chan error, 1)
sem := make(chan struct{}, maxRequests) // Semaphore to control concurrency
errChan := make(chan error, len(blockNumbersRange)) // Channel to collect errors from goroutines

for _, b := range blockNumbersRange {
wg.Add(1)
Expand Down Expand Up @@ -274,10 +274,17 @@ func (c *Client) FetchBlocksInRangeAsync(from, to *big.Int, debug bool, maxReque
close(sem)
close(errChan)

if err := <-errChan; err != nil {
return nil, err
for err := range errChan {
collectedErrors = append(collectedErrors, err)
}

if len(collectedErrors) > 0 {
var errStrings []string
for _, err := range collectedErrors {
errStrings = append(errStrings, err.Error())
}
return nil, fmt.Errorf("errors occurred during crawling: %s", strings.Join(errStrings, "; "))
}
return blocks, nil
}

Expand Down
25 changes: 16 additions & 9 deletions blockchain/game7_testnet/game7_testnet.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,20 +225,20 @@ func (c *Client) FetchBlocksInRange(from, to *big.Int, debug bool) ([]*seer_comm
// FetchBlocksInRangeAsync fetches blocks within a specified range concurrently.
func (c *Client) FetchBlocksInRangeAsync(from, to *big.Int, debug bool, maxRequests int) ([]*seer_common.BlockJson, error) {
var (
blocks []*seer_common.BlockJson

mu sync.Mutex
wg sync.WaitGroup
ctx = context.Background()
blocks []*seer_common.BlockJson
collectedErrors []error
mu sync.Mutex
wg sync.WaitGroup
ctx = context.Background()
)

var blockNumbersRange []*big.Int
for i := new(big.Int).Set(from); i.Cmp(to) <= 0; i.Add(i, big.NewInt(1)) {
blockNumbersRange = append(blockNumbersRange, new(big.Int).Set(i))
}

sem := make(chan struct{}, maxRequests) // Semaphore to control concurrency
errChan := make(chan error, 1)
sem := make(chan struct{}, maxRequests) // Semaphore to control concurrency
errChan := make(chan error, len(blockNumbersRange)) // Channel to collect errors from goroutines

for _, b := range blockNumbersRange {
wg.Add(1)
Expand Down Expand Up @@ -274,10 +274,17 @@ func (c *Client) FetchBlocksInRangeAsync(from, to *big.Int, debug bool, maxReque
close(sem)
close(errChan)

if err := <-errChan; err != nil {
return nil, err
for err := range errChan {
collectedErrors = append(collectedErrors, err)
}

if len(collectedErrors) > 0 {
var errStrings []string
for _, err := range collectedErrors {
errStrings = append(errStrings, err.Error())
}
return nil, fmt.Errorf("errors occurred during crawling: %s", strings.Join(errStrings, "; "))
}
return blocks, nil
}

Expand Down
Loading

0 comments on commit 56d386f

Please sign in to comment.