Skip to content

Commit

Permalink
fix poller leaving gaps if until block is configured
Browse files Browse the repository at this point in the history
  • Loading branch information
iuwqyir committed Oct 2, 2024
1 parent 621d0c1 commit d75b7fd
Showing 1 changed file with 33 additions and 15 deletions.
48 changes: 33 additions & 15 deletions internal/orchestrator/poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,17 +92,17 @@ func (p *Poller) Start() {
endBlock := blockNumbers[len(blockNumbers)-1]
if endBlock != nil {
p.lastPolledBlock = endBlock
if p.reachedPollLimit() {
log.Debug().Msg("Reached poll limit, exiting poller")
ticker.Stop()
return
}
}
log.Debug().Msgf("Polling %d blocks starting from %s to %s", len(blockNumbers), blockNumbers[0], endBlock)

worker := worker.NewWorker(p.rpc)
results := worker.Run(blockNumbers)
p.handleWorkerResults(results)
if p.reachedPollLimit(endBlock) {
log.Debug().Msg("Reached poll limit, exiting poller")
ticker.Stop()
return
}
}
}()
}
Expand All @@ -115,28 +115,26 @@ func (p *Poller) Start() {
select {}
}

func (p *Poller) reachedPollLimit() bool {
return p.pollUntilBlock != nil && p.pollUntilBlock.Sign() > 0 && p.lastPolledBlock.Cmp(p.pollUntilBlock) >= 0
func (p *Poller) reachedPollLimit(blockNumber *big.Int) bool {
return p.pollUntilBlock.Sign() > 0 && blockNumber.Cmp(p.pollUntilBlock) >= 0
}

func (p *Poller) getBlockRange() ([]*big.Int, error) {
latestBlockUint64, err := p.rpc.EthClient.BlockNumber(context.Background())
latestBlock, err := p.getLatestBlockNumber()
if err != nil {
return nil, fmt.Errorf("failed to get latest block number: %v", err)
return nil, err
}
latestBlock := new(big.Int).SetUint64(latestBlockUint64)

log.Debug().Msgf("Last polled block: %s", p.lastPolledBlock.String())

startBlock := new(big.Int).Add(p.lastPolledBlock, big.NewInt(1))
endBlock := new(big.Int).Add(startBlock, big.NewInt(p.blocksPerPoll-1))

if startBlock.Cmp(latestBlock) > 0 {
log.Debug().Msgf("Start block %s is greater than latest block %s, skipping", startBlock, latestBlock)
return nil, nil
}
if endBlock.Cmp(latestBlock) > 0 {
endBlock = latestBlock
endBlock := p.getEndBlockForRange(startBlock, latestBlock)
if startBlock.Cmp(endBlock) > 0 {
log.Debug().Msgf("Invalid range: start block %s is greater than end block %s, skipping", startBlock, endBlock)
return nil, nil
}

blockCount := new(big.Int).Sub(endBlock, startBlock).Int64() + 1
Expand All @@ -148,6 +146,26 @@ func (p *Poller) getBlockRange() ([]*big.Int, error) {
return blockNumbers, nil
}

func (p *Poller) getLatestBlockNumber() (*big.Int, error) {
latestBlockUint64, err := p.rpc.EthClient.BlockNumber(context.Background())
if err != nil {
return nil, fmt.Errorf("failed to get latest block number: %v", err)
}
return new(big.Int).SetUint64(latestBlockUint64), nil
}

func (p *Poller) getEndBlockForRange(startBlock *big.Int, latestBlock *big.Int) *big.Int {
endBlock := new(big.Int).Add(startBlock, big.NewInt(p.blocksPerPoll-1))
if endBlock.Cmp(latestBlock) > 0 {
endBlock = latestBlock
}
if p.reachedPollLimit(endBlock) {
log.Debug().Msgf("End block %s is greater than poll until block %s, setting to poll until block", endBlock, p.pollUntilBlock)
endBlock = p.pollUntilBlock
}
return endBlock
}

func (p *Poller) handleWorkerResults(results []worker.WorkerResult) {
var successfulResults []worker.WorkerResult
var failedResults []worker.WorkerResult
Expand Down

0 comments on commit d75b7fd

Please sign in to comment.