From 621d0c178c8e4a255baa13c283feab035f5a7f0b Mon Sep 17 00:00:00 2001 From: iuwqyir Date: Wed, 2 Oct 2024 15:18:21 +0300 Subject: [PATCH] poller fixes --- internal/orchestrator/poller.go | 8 ++++++-- internal/worker/serializer.go | 15 +++++++++++---- 2 files changed, 17 insertions(+), 6 deletions(-) diff --git a/internal/orchestrator/poller.go b/internal/orchestrator/poller.go index 7eed04e..25ae3bb 100644 --- a/internal/orchestrator/poller.go +++ b/internal/orchestrator/poller.go @@ -98,7 +98,7 @@ func (p *Poller) Start() { return } } - log.Debug().Msgf("Polling blocks %s to %s", blockNumbers[0], endBlock) + log.Debug().Msgf("Polling %d blocks starting from %s to %s", len(blockNumbers), blockNumbers[0], endBlock) worker := worker.NewWorker(p.rpc) results := worker.Run(blockNumbers) @@ -131,11 +131,15 @@ func (p *Poller) getBlockRange() ([]*big.Int, error) { startBlock := new(big.Int).Add(p.lastPolledBlock, big.NewInt(1)) endBlock := new(big.Int).Add(startBlock, big.NewInt(p.blocksPerPoll-1)) + if startBlock.Cmp(latestBlock) > 0 { + log.Debug().Msgf("Start block %s is greater than latest block %s, skipping", startBlock, latestBlock) + return nil, nil + } if endBlock.Cmp(latestBlock) > 0 { endBlock = latestBlock } - blockCount := endBlock.Sub(endBlock, startBlock).Int64() + 1 + blockCount := new(big.Int).Sub(endBlock, startBlock).Int64() + 1 blockNumbers := make([]*big.Int, blockCount) for i := int64(0); i < blockCount; i++ { blockNumbers[i] = new(big.Int).Add(startBlock, big.NewInt(i)) diff --git a/internal/worker/serializer.go b/internal/worker/serializer.go index 208fe96..e4f9d9d 100644 --- a/internal/worker/serializer.go +++ b/internal/worker/serializer.go @@ -2,6 +2,7 @@ package worker import ( "encoding/json" + "fmt" "math/big" "strconv" @@ -10,7 +11,7 @@ import ( ) func SerializeWorkerResults(chainId *big.Int, blocks []BatchFetchResult[RawBlock], logs []BatchFetchResult[RawLogs], traces []BatchFetchResult[RawTraces]) []WorkerResult { - results := make([]WorkerResult, len(blocks)) + results := make([]WorkerResult, 0, len(blocks)) rawLogsMap := make(map[string]BatchFetchResult[RawLogs]) for _, rawLogs := range logs { @@ -22,14 +23,20 @@ func SerializeWorkerResults(chainId *big.Int, blocks []BatchFetchResult[RawBlock rawTracesMap[rawTraces.BlockNumber.String()] = rawTraces } - for i, rawBlock := range blocks { + for _, rawBlock := range blocks { result := WorkerResult{ BlockNumber: rawBlock.BlockNumber, } + if rawBlock.Result == nil { + log.Warn().Msgf("Received a nil block result for block %s.", rawBlock.BlockNumber.String()) + result.Error = fmt.Errorf("received a nil block result from RPC") + results = append(results, result) + continue + } if rawBlock.Error != nil { result.Error = rawBlock.Error - results[i] = result + results = append(results, result) continue } @@ -55,7 +62,7 @@ func SerializeWorkerResults(chainId *big.Int, blocks []BatchFetchResult[RawBlock } } - results[i] = result + results = append(results, result) } return results