Skip to content

Commit

Permalink
make committer handle gaps itself
Browse files Browse the repository at this point in the history
  • Loading branch information
iuwqyir committed Oct 1, 2024
1 parent 3fc7b21 commit 884586f
Showing 1 changed file with 47 additions and 7 deletions.
54 changes: 47 additions & 7 deletions internal/orchestrator/committer.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,12 +118,7 @@ func (c *Committer) getSequentialBlockDataToCommit() ([]common.BlockData, error)
})

if blocksData[0].Block.Number.Cmp(blocksToCommit[0]) != 0 {
// Note: we are missing block(s) in the beginning of the batch in staging, The Failure Recoverer will handle this
// increment the gap counter in prometheus
metrics.GapCounter.Inc()
// record the first missed block number in prometheus
metrics.MissedBlockNumbers.Set(float64(blocksData[0].Block.Number.Int64()))
return nil, fmt.Errorf("first block number (%s) in commit batch does not match expected (%s)", blocksData[0].Block.Number.String(), blocksToCommit[0].String())
return nil, c.handleGap(blocksToCommit[0], blocksData[0].Block)
}

var sequentialBlockData []common.BlockData
Expand All @@ -133,7 +128,7 @@ func (c *Committer) getSequentialBlockDataToCommit() ([]common.BlockData, error)
for i := 1; i < len(blocksData); i++ {
if blocksData[i].Block.Number.Cmp(expectedBlockNumber) != 0 {
// Note: Gap detected, stop here
log.Warn().Msgf("Gap detected at block %s, stopping commit", expectedBlockNumber.String())
log.Warn().Msgf("Gap detected at block %s, committing until %s", expectedBlockNumber.String(), blocksData[i-1].Block.Number.String())
// increment the gap counter in prometheus
metrics.GapCounter.Inc()
// record the first missed block number in prometheus
Expand Down Expand Up @@ -234,3 +229,48 @@ func (c *Committer) saveDataToMainStorage(blockData []common.BlockData) error {

return nil
}

// handleGap is called when the first staged block of a commit batch is not the
// block the committer expected to commit next. It updates the gap metrics,
// registers a block failure for every block in
// [expectedStartBlockNumber, actualFirstBlock.Number) that does not already
// have one in orchestrator storage, and then reports the mismatch.
//
// It always returns a non-nil error: either a wrapped storage error, or the
// "first block number ... does not match expected" error describing the gap.
func (c *Committer) handleGap(expectedStartBlockNumber *big.Int, actualFirstBlock common.Block) error {
	// increment the gap counter in prometheus
	metrics.GapCounter.Inc()
	// record the first missed block number in prometheus
	metrics.MissedBlockNumbers.Set(float64(expectedStartBlockNumber.Int64()))

	mismatchErr := fmt.Errorf("first block number (%s) in commit batch does not match expected (%s)", actualFirstBlock.Number.String(), expectedStartBlockNumber.String())

	gap := new(big.Int).Sub(actualFirstBlock.Number, expectedStartBlockNumber)
	if gap.Sign() <= 0 {
		// The first staged block is not ahead of the expected one, so there is
		// no range of missing blocks to register. Without this guard the
		// negative count would make the slice allocation below panic.
		return mismatchErr
	}

	missingBlockCount := gap.Int64()
	missingBlockNumbers := make([]*big.Int, missingBlockCount)
	for i := int64(0); i < missingBlockCount; i++ {
		missingBlockNumbers[i] = new(big.Int).Add(expectedStartBlockNumber, big.NewInt(i))
	}
	log.Debug().Msgf("Detected %d missing blocks between blocks %s and %s", missingBlockCount, expectedStartBlockNumber.String(), actualFirstBlock.Number.String())

	existingBlockFailures, err := c.storage.OrchestratorStorage.GetBlockFailures(storage.QueryFilter{BlockNumbers: missingBlockNumbers, ChainId: c.rpc.ChainID})
	if err != nil {
		return fmt.Errorf("error getting block failures while handling gap: %w", err)
	}

	// Only membership matters here, so keep a set keyed by the decimal block
	// number instead of storing pointers to the range loop variable.
	alreadyFailed := make(map[string]struct{}, len(existingBlockFailures))
	for _, failure := range existingBlockFailures {
		alreadyFailed[failure.BlockNumber.String()] = struct{}{}
	}

	// One timestamp for the whole gap: all of these failures are detected at once.
	detectedAt := time.Now()
	blockFailures := make([]common.BlockFailure, 0, len(missingBlockNumbers))
	for _, blockNumber := range missingBlockNumbers {
		if _, ok := alreadyFailed[blockNumber.String()]; ok {
			continue
		}
		blockFailures = append(blockFailures, common.BlockFailure{
			BlockNumber:   blockNumber,
			ChainId:       c.rpc.ChainID,
			FailureTime:   detectedAt,
			FailureCount:  1,
			FailureReason: "Gap detected for this block",
		})
	}
	log.Debug().Msgf("Storing %d block failures while handling gap", len(blockFailures))
	if err := c.storage.OrchestratorStorage.StoreBlockFailures(blockFailures); err != nil {
		return fmt.Errorf("error storing block failures while handling gap: %w", err)
	}
	return mismatchErr
}

0 comments on commit 884586f

Please sign in to comment.