Skip to content

Commit

Permalink
Merge pull request #103 from G7DAO/batch-reading
Browse files Browse the repository at this point in the history
Batch reading
  • Loading branch information
Andrei-Dolgolev authored Nov 21, 2024
2 parents 2548772 + 7f80736 commit 52e0735
Show file tree
Hide file tree
Showing 24 changed files with 563 additions and 244 deletions.
25 changes: 16 additions & 9 deletions blockchain/arbitrum_one/arbitrum_one.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,20 +225,20 @@ func (c *Client) FetchBlocksInRange(from, to *big.Int, debug bool) ([]*seer_comm
// FetchBlocksInRangeAsync fetches blocks within a specified range concurrently.
func (c *Client) FetchBlocksInRangeAsync(from, to *big.Int, debug bool, maxRequests int) ([]*seer_common.BlockJson, error) {
var (
blocks []*seer_common.BlockJson

mu sync.Mutex
wg sync.WaitGroup
ctx = context.Background()
blocks []*seer_common.BlockJson
collectedErrors []error
mu sync.Mutex
wg sync.WaitGroup
ctx = context.Background()
)

var blockNumbersRange []*big.Int
for i := new(big.Int).Set(from); i.Cmp(to) <= 0; i.Add(i, big.NewInt(1)) {
blockNumbersRange = append(blockNumbersRange, new(big.Int).Set(i))
}

sem := make(chan struct{}, maxRequests) // Semaphore to control concurrency
errChan := make(chan error, 1)
sem := make(chan struct{}, maxRequests) // Semaphore to control concurrency
errChan := make(chan error, len(blockNumbersRange)) // Channel to collect errors from goroutines

for _, b := range blockNumbersRange {
wg.Add(1)
Expand Down Expand Up @@ -274,10 +274,17 @@ func (c *Client) FetchBlocksInRangeAsync(from, to *big.Int, debug bool, maxReque
close(sem)
close(errChan)

if err := <-errChan; err != nil {
return nil, err
for err := range errChan {
collectedErrors = append(collectedErrors, err)
}

if len(collectedErrors) > 0 {
var errStrings []string
for _, err := range collectedErrors {
errStrings = append(errStrings, err.Error())
}
return nil, fmt.Errorf("errors occurred during crawling: %s", strings.Join(errStrings, "; "))
}
return blocks, nil
}

Expand Down
25 changes: 16 additions & 9 deletions blockchain/arbitrum_sepolia/arbitrum_sepolia.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,20 +225,20 @@ func (c *Client) FetchBlocksInRange(from, to *big.Int, debug bool) ([]*seer_comm
// FetchBlocksInRangeAsync fetches blocks within a specified range concurrently.
func (c *Client) FetchBlocksInRangeAsync(from, to *big.Int, debug bool, maxRequests int) ([]*seer_common.BlockJson, error) {
var (
blocks []*seer_common.BlockJson

mu sync.Mutex
wg sync.WaitGroup
ctx = context.Background()
blocks []*seer_common.BlockJson
collectedErrors []error
mu sync.Mutex
wg sync.WaitGroup
ctx = context.Background()
)

var blockNumbersRange []*big.Int
for i := new(big.Int).Set(from); i.Cmp(to) <= 0; i.Add(i, big.NewInt(1)) {
blockNumbersRange = append(blockNumbersRange, new(big.Int).Set(i))
}

sem := make(chan struct{}, maxRequests) // Semaphore to control concurrency
errChan := make(chan error, 1)
sem := make(chan struct{}, maxRequests) // Semaphore to control concurrency
errChan := make(chan error, len(blockNumbersRange)) // Channel to collect errors from goroutines

for _, b := range blockNumbersRange {
wg.Add(1)
Expand Down Expand Up @@ -274,10 +274,17 @@ func (c *Client) FetchBlocksInRangeAsync(from, to *big.Int, debug bool, maxReque
close(sem)
close(errChan)

if err := <-errChan; err != nil {
return nil, err
for err := range errChan {
collectedErrors = append(collectedErrors, err)
}

if len(collectedErrors) > 0 {
var errStrings []string
for _, err := range collectedErrors {
errStrings = append(errStrings, err.Error())
}
return nil, fmt.Errorf("errors occurred during crawling: %s", strings.Join(errStrings, "; "))
}
return blocks, nil
}

Expand Down
25 changes: 16 additions & 9 deletions blockchain/b3/b3.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,20 +225,20 @@ func (c *Client) FetchBlocksInRange(from, to *big.Int, debug bool) ([]*seer_comm
// FetchBlocksInRangeAsync fetches blocks within a specified range concurrently.
func (c *Client) FetchBlocksInRangeAsync(from, to *big.Int, debug bool, maxRequests int) ([]*seer_common.BlockJson, error) {
var (
blocks []*seer_common.BlockJson

mu sync.Mutex
wg sync.WaitGroup
ctx = context.Background()
blocks []*seer_common.BlockJson
collectedErrors []error
mu sync.Mutex
wg sync.WaitGroup
ctx = context.Background()
)

var blockNumbersRange []*big.Int
for i := new(big.Int).Set(from); i.Cmp(to) <= 0; i.Add(i, big.NewInt(1)) {
blockNumbersRange = append(blockNumbersRange, new(big.Int).Set(i))
}

sem := make(chan struct{}, maxRequests) // Semaphore to control concurrency
errChan := make(chan error, 1)
sem := make(chan struct{}, maxRequests) // Semaphore to control concurrency
errChan := make(chan error, len(blockNumbersRange)) // Channel to collect errors from goroutines

for _, b := range blockNumbersRange {
wg.Add(1)
Expand Down Expand Up @@ -274,10 +274,17 @@ func (c *Client) FetchBlocksInRangeAsync(from, to *big.Int, debug bool, maxReque
close(sem)
close(errChan)

if err := <-errChan; err != nil {
return nil, err
for err := range errChan {
collectedErrors = append(collectedErrors, err)
}

if len(collectedErrors) > 0 {
var errStrings []string
for _, err := range collectedErrors {
errStrings = append(errStrings, err.Error())
}
return nil, fmt.Errorf("errors occurred during crawling: %s", strings.Join(errStrings, "; "))
}
return blocks, nil
}

Expand Down
25 changes: 16 additions & 9 deletions blockchain/b3_sepolia/b3_sepolia.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,20 +225,20 @@ func (c *Client) FetchBlocksInRange(from, to *big.Int, debug bool) ([]*seer_comm
// FetchBlocksInRangeAsync fetches blocks within a specified range concurrently.
func (c *Client) FetchBlocksInRangeAsync(from, to *big.Int, debug bool, maxRequests int) ([]*seer_common.BlockJson, error) {
var (
blocks []*seer_common.BlockJson

mu sync.Mutex
wg sync.WaitGroup
ctx = context.Background()
blocks []*seer_common.BlockJson
collectedErrors []error
mu sync.Mutex
wg sync.WaitGroup
ctx = context.Background()
)

var blockNumbersRange []*big.Int
for i := new(big.Int).Set(from); i.Cmp(to) <= 0; i.Add(i, big.NewInt(1)) {
blockNumbersRange = append(blockNumbersRange, new(big.Int).Set(i))
}

sem := make(chan struct{}, maxRequests) // Semaphore to control concurrency
errChan := make(chan error, 1)
sem := make(chan struct{}, maxRequests) // Semaphore to control concurrency
errChan := make(chan error, len(blockNumbersRange)) // Channel to collect errors from goroutines

for _, b := range blockNumbersRange {
wg.Add(1)
Expand Down Expand Up @@ -274,10 +274,17 @@ func (c *Client) FetchBlocksInRangeAsync(from, to *big.Int, debug bool, maxReque
close(sem)
close(errChan)

if err := <-errChan; err != nil {
return nil, err
for err := range errChan {
collectedErrors = append(collectedErrors, err)
}

if len(collectedErrors) > 0 {
var errStrings []string
for _, err := range collectedErrors {
errStrings = append(errStrings, err.Error())
}
return nil, fmt.Errorf("errors occurred during crawling: %s", strings.Join(errStrings, "; "))
}
return blocks, nil
}

Expand Down
15 changes: 11 additions & 4 deletions blockchain/blockchain.go.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ func (c *Client) FetchBlocksInRange(from, to *big.Int, debug bool) ([]*seer_comm
func (c *Client) FetchBlocksInRangeAsync(from, to *big.Int, debug bool, maxRequests int) ([]*seer_common.BlockJson, error) {
var (
blocks []*seer_common.BlockJson

collectedErrors []error
mu sync.Mutex
wg sync.WaitGroup
ctx = context.Background()
Expand All @@ -238,7 +238,7 @@ func (c *Client) FetchBlocksInRangeAsync(from, to *big.Int, debug bool, maxReque
}

sem := make(chan struct{}, maxRequests) // Semaphore to control concurrency
errChan := make(chan error, 1)
errChan := make(chan error, len(blockNumbersRange)) // Channel to collect errors from goroutines

for _, b := range blockNumbersRange {
wg.Add(1)
Expand Down Expand Up @@ -274,10 +274,17 @@ func (c *Client) FetchBlocksInRangeAsync(from, to *big.Int, debug bool, maxReque
close(sem)
close(errChan)

if err := <-errChan; err != nil {
return nil, err
for err := range errChan {
collectedErrors = append(collectedErrors, err)
}

if len(collectedErrors) > 0 {
var errStrings []string
for _, err := range collectedErrors {
errStrings = append(errStrings, err.Error())
}
return nil, fmt.Errorf("errors occurred during crawling: %s", strings.Join(errStrings, "; "))
}
return blocks, nil
}

Expand Down
25 changes: 16 additions & 9 deletions blockchain/ethereum/ethereum.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,20 +225,20 @@ func (c *Client) FetchBlocksInRange(from, to *big.Int, debug bool) ([]*seer_comm
// FetchBlocksInRangeAsync fetches blocks within a specified range concurrently.
func (c *Client) FetchBlocksInRangeAsync(from, to *big.Int, debug bool, maxRequests int) ([]*seer_common.BlockJson, error) {
var (
blocks []*seer_common.BlockJson

mu sync.Mutex
wg sync.WaitGroup
ctx = context.Background()
blocks []*seer_common.BlockJson
collectedErrors []error
mu sync.Mutex
wg sync.WaitGroup
ctx = context.Background()
)

var blockNumbersRange []*big.Int
for i := new(big.Int).Set(from); i.Cmp(to) <= 0; i.Add(i, big.NewInt(1)) {
blockNumbersRange = append(blockNumbersRange, new(big.Int).Set(i))
}

sem := make(chan struct{}, maxRequests) // Semaphore to control concurrency
errChan := make(chan error, 1)
sem := make(chan struct{}, maxRequests) // Semaphore to control concurrency
errChan := make(chan error, len(blockNumbersRange)) // Channel to collect errors from goroutines

for _, b := range blockNumbersRange {
wg.Add(1)
Expand Down Expand Up @@ -274,10 +274,17 @@ func (c *Client) FetchBlocksInRangeAsync(from, to *big.Int, debug bool, maxReque
close(sem)
close(errChan)

if err := <-errChan; err != nil {
return nil, err
for err := range errChan {
collectedErrors = append(collectedErrors, err)
}

if len(collectedErrors) > 0 {
var errStrings []string
for _, err := range collectedErrors {
errStrings = append(errStrings, err.Error())
}
return nil, fmt.Errorf("errors occurred during crawling: %s", strings.Join(errStrings, "; "))
}
return blocks, nil
}

Expand Down
25 changes: 16 additions & 9 deletions blockchain/game7/game7.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,20 +225,20 @@ func (c *Client) FetchBlocksInRange(from, to *big.Int, debug bool) ([]*seer_comm
// FetchBlocksInRangeAsync fetches blocks within a specified range concurrently.
func (c *Client) FetchBlocksInRangeAsync(from, to *big.Int, debug bool, maxRequests int) ([]*seer_common.BlockJson, error) {
var (
blocks []*seer_common.BlockJson

mu sync.Mutex
wg sync.WaitGroup
ctx = context.Background()
blocks []*seer_common.BlockJson
collectedErrors []error
mu sync.Mutex
wg sync.WaitGroup
ctx = context.Background()
)

var blockNumbersRange []*big.Int
for i := new(big.Int).Set(from); i.Cmp(to) <= 0; i.Add(i, big.NewInt(1)) {
blockNumbersRange = append(blockNumbersRange, new(big.Int).Set(i))
}

sem := make(chan struct{}, maxRequests) // Semaphore to control concurrency
errChan := make(chan error, 1)
sem := make(chan struct{}, maxRequests) // Semaphore to control concurrency
errChan := make(chan error, len(blockNumbersRange)) // Channel to collect errors from goroutines

for _, b := range blockNumbersRange {
wg.Add(1)
Expand Down Expand Up @@ -274,10 +274,17 @@ func (c *Client) FetchBlocksInRangeAsync(from, to *big.Int, debug bool, maxReque
close(sem)
close(errChan)

if err := <-errChan; err != nil {
return nil, err
for err := range errChan {
collectedErrors = append(collectedErrors, err)
}

if len(collectedErrors) > 0 {
var errStrings []string
for _, err := range collectedErrors {
errStrings = append(errStrings, err.Error())
}
return nil, fmt.Errorf("errors occurred during crawling: %s", strings.Join(errStrings, "; "))
}
return blocks, nil
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,20 +225,20 @@ func (c *Client) FetchBlocksInRange(from, to *big.Int, debug bool) ([]*seer_comm
// FetchBlocksInRangeAsync fetches blocks within a specified range concurrently.
func (c *Client) FetchBlocksInRangeAsync(from, to *big.Int, debug bool, maxRequests int) ([]*seer_common.BlockJson, error) {
var (
blocks []*seer_common.BlockJson

mu sync.Mutex
wg sync.WaitGroup
ctx = context.Background()
blocks []*seer_common.BlockJson
collectedErrors []error
mu sync.Mutex
wg sync.WaitGroup
ctx = context.Background()
)

var blockNumbersRange []*big.Int
for i := new(big.Int).Set(from); i.Cmp(to) <= 0; i.Add(i, big.NewInt(1)) {
blockNumbersRange = append(blockNumbersRange, new(big.Int).Set(i))
}

sem := make(chan struct{}, maxRequests) // Semaphore to control concurrency
errChan := make(chan error, 1)
sem := make(chan struct{}, maxRequests) // Semaphore to control concurrency
errChan := make(chan error, len(blockNumbersRange)) // Channel to collect errors from goroutines

for _, b := range blockNumbersRange {
wg.Add(1)
Expand Down Expand Up @@ -274,10 +274,17 @@ func (c *Client) FetchBlocksInRangeAsync(from, to *big.Int, debug bool, maxReque
close(sem)
close(errChan)

if err := <-errChan; err != nil {
return nil, err
for err := range errChan {
collectedErrors = append(collectedErrors, err)
}

if len(collectedErrors) > 0 {
var errStrings []string
for _, err := range collectedErrors {
errStrings = append(errStrings, err.Error())
}
return nil, fmt.Errorf("errors occurred during crawling: %s", strings.Join(errStrings, "; "))
}
return blocks, nil
}

Expand Down
Loading

0 comments on commit 52e0735

Please sign in to comment.