diff --git a/internal/orchestrator/committer.go b/internal/orchestrator/committer.go index 601fd23..9549068 100644 --- a/internal/orchestrator/committer.go +++ b/internal/orchestrator/committer.go @@ -118,12 +118,7 @@ func (c *Committer) getSequentialBlockDataToCommit() ([]common.BlockData, error) }) if blocksData[0].Block.Number.Cmp(blocksToCommit[0]) != 0 { - // Note: we are missing block(s) in the beginning of the batch in staging, The Failure Recoverer will handle this - // increment the a gap counter in prometheus - metrics.GapCounter.Inc() - // record the first missed block number in prometheus - metrics.MissedBlockNumbers.Set(float64(blocksData[0].Block.Number.Int64())) - return nil, fmt.Errorf("first block number (%s) in commit batch does not match expected (%s)", blocksData[0].Block.Number.String(), blocksToCommit[0].String()) + return nil, c.handleGap(blocksToCommit[0], blocksData[0].Block) } var sequentialBlockData []common.BlockData @@ -133,7 +128,7 @@ func (c *Committer) getSequentialBlockDataToCommit() ([]common.BlockData, error) for i := 1; i < len(blocksData); i++ { if blocksData[i].Block.Number.Cmp(expectedBlockNumber) != 0 { // Note: Gap detected, stop here - log.Warn().Msgf("Gap detected at block %s, stopping commit", expectedBlockNumber.String()) + log.Warn().Msgf("Gap detected at block %s, committing until %s", expectedBlockNumber.String(), blocksData[i-1].Block.Number.String()) // increment the a gap counter in prometheus metrics.GapCounter.Inc() // record the first missed block number in prometheus @@ -234,3 +229,48 @@ func (c *Committer) saveDataToMainStorage(blockData []common.BlockData) error { return nil } + +func (c *Committer) handleGap(expectedStartBlockNumber *big.Int, actualFirstBlock common.Block) error { + // increment the a gap counter in prometheus + metrics.GapCounter.Inc() + // record the first missed block number in prometheus + metrics.MissedBlockNumbers.Set(float64(expectedStartBlockNumber.Int64())) + + missingBlockCount := new(big.Int).Sub(actualFirstBlock.Number, expectedStartBlockNumber).Int64() + missingBlockNumbers := make([]*big.Int, missingBlockCount) + for i := int64(0); i < missingBlockCount; i++ { + missingBlockNumber := new(big.Int).Add(expectedStartBlockNumber, big.NewInt(i)) + missingBlockNumbers[i] = missingBlockNumber + } + log.Debug().Msgf("Detected %d missing blocks between blocks %s and %s", missingBlockCount, expectedStartBlockNumber.String(), actualFirstBlock.Number.String()) + + existingBlockFailures, err := c.storage.OrchestratorStorage.GetBlockFailures(storage.QueryFilter{BlockNumbers: missingBlockNumbers, ChainId: c.rpc.ChainID}) + if err != nil { + return fmt.Errorf("error getting block failures while handling gap: %v", err) + } + + existingBlockFailuresMap := make(map[string]*common.BlockFailure) + for _, failure := range existingBlockFailures { + blockNumberStr := failure.BlockNumber.String() + existingBlockFailuresMap[blockNumberStr] = &failure + } + + blockFailures := make([]common.BlockFailure, 0) + for _, blockNumber := range missingBlockNumbers { + blockNumberStr := blockNumber.String() + if _, ok := existingBlockFailuresMap[blockNumberStr]; !ok { + blockFailures = append(blockFailures, common.BlockFailure{ + BlockNumber: blockNumber, + ChainId: c.rpc.ChainID, + FailureTime: time.Now(), + FailureCount: 1, + FailureReason: "Gap detected for this block", + }) + } + } + log.Debug().Msgf("Storing %d block failures while handling gap", len(blockFailures)) + if err := c.storage.OrchestratorStorage.StoreBlockFailures(blockFailures); err != nil { + return fmt.Errorf("error storing block failures while handling gap: %v", err) + } + return fmt.Errorf("first block number (%s) in commit batch does not match expected (%s)", actualFirstBlock.Number.String(), expectedStartBlockNumber.String()) +}