diff --git a/internal/orchestrator/poller.go b/internal/orchestrator/poller.go index 25ae3bb..031dae2 100644 --- a/internal/orchestrator/poller.go +++ b/internal/orchestrator/poller.go @@ -92,17 +92,17 @@ func (p *Poller) Start() { endBlock := blockNumbers[len(blockNumbers)-1] if endBlock != nil { p.lastPolledBlock = endBlock - if p.reachedPollLimit() { - log.Debug().Msg("Reached poll limit, exiting poller") - ticker.Stop() - return - } } log.Debug().Msgf("Polling %d blocks starting from %s to %s", len(blockNumbers), blockNumbers[0], endBlock) worker := worker.NewWorker(p.rpc) results := worker.Run(blockNumbers) p.handleWorkerResults(results) + if p.reachedPollLimit(endBlock) { + log.Debug().Msg("Reached poll limit, exiting poller") + ticker.Stop() + return + } } }() } @@ -115,28 +115,26 @@ func (p *Poller) Start() { select {} } -func (p *Poller) reachedPollLimit() bool { - return p.pollUntilBlock != nil && p.pollUntilBlock.Sign() > 0 && p.lastPolledBlock.Cmp(p.pollUntilBlock) >= 0 +func (p *Poller) reachedPollLimit(blockNumber *big.Int) bool { + return p.pollUntilBlock.Sign() > 0 && blockNumber.Cmp(p.pollUntilBlock) >= 0 } func (p *Poller) getBlockRange() ([]*big.Int, error) { - latestBlockUint64, err := p.rpc.EthClient.BlockNumber(context.Background()) + latestBlock, err := p.getLatestBlockNumber() if err != nil { - return nil, fmt.Errorf("failed to get latest block number: %v", err) + return nil, err } - latestBlock := new(big.Int).SetUint64(latestBlockUint64) - log.Debug().Msgf("Last polled block: %s", p.lastPolledBlock.String()) startBlock := new(big.Int).Add(p.lastPolledBlock, big.NewInt(1)) - endBlock := new(big.Int).Add(startBlock, big.NewInt(p.blocksPerPoll-1)) - if startBlock.Cmp(latestBlock) > 0 { log.Debug().Msgf("Start block %s is greater than latest block %s, skipping", startBlock, latestBlock) return nil, nil } - if endBlock.Cmp(latestBlock) > 0 { - endBlock = latestBlock + endBlock := p.getEndBlockForRange(startBlock, latestBlock) + if startBlock.Cmp(endBlock) > 0 { + log.Debug().Msgf("Invalid range: start block %s is greater than end block %s, skipping", startBlock, endBlock) + return nil, nil } blockCount := new(big.Int).Sub(endBlock, startBlock).Int64() + 1 @@ -148,6 +146,26 @@ func (p *Poller) getBlockRange() ([]*big.Int, error) { return blockNumbers, nil } +func (p *Poller) getLatestBlockNumber() (*big.Int, error) { + latestBlockUint64, err := p.rpc.EthClient.BlockNumber(context.Background()) + if err != nil { + return nil, fmt.Errorf("failed to get latest block number: %v", err) + } + return new(big.Int).SetUint64(latestBlockUint64), nil +} + +func (p *Poller) getEndBlockForRange(startBlock *big.Int, latestBlock *big.Int) *big.Int { + endBlock := new(big.Int).Add(startBlock, big.NewInt(p.blocksPerPoll-1)) + if endBlock.Cmp(latestBlock) > 0 { + endBlock = latestBlock + } + if p.reachedPollLimit(endBlock) { + log.Debug().Msgf("End block %s is greater than poll until block %s, setting to poll until block", endBlock, p.pollUntilBlock) + endBlock = p.pollUntilBlock + } + return endBlock +} + func (p *Poller) handleWorkerResults(results []worker.WorkerResult) { var successfulResults []worker.WorkerResult var failedResults []worker.WorkerResult