Skip to content

Commit

Permalink
handle reorgs
Browse files Browse the repository at this point in the history
  • Loading branch information
iuwqyir committed Oct 10, 2024
1 parent d96a682 commit cdaf53e
Show file tree
Hide file tree
Showing 6 changed files with 298 additions and 0 deletions.
55 changes: 55 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,61 @@ committer:
blocksPerCommit: 1000
```

#### Reorg Handler
Whether to enable the reorg handler. Default is `true`.

cmd: `--reorgHandler-enabled`
env: `REORGHANDLER_ENABLED`
yaml:
```yaml
reorgHandler:
enabled: true
```

#### Reorg Handler Interval
Reorg handler trigger interval in milliseconds. Default is `1000`.

cmd: `--reorgHandler-interval`
env: `REORGHANDLER_INTERVAL`
yaml:
```yaml
reorgHandler:
interval: 3000
```

#### Reorg Handler Blocks Per Scan
How many recent blocks to scan for reorgs on each run. Default is `100`.

cmd: `--reorgHandler-blocks-per-scan`
env: `REORGHANDLER_BLOCKSPERSCAN`
yaml:
```yaml
reorgHandler:
blocksPerScan: 1000
```

#### Reorg Handler From Block
From which block to start scanning for reorgs. Default is `0`.

cmd: `--reorgHandler-from-block`
env: `REORGHANDLER_FROMBLOCK`
yaml:
```yaml
reorgHandler:
fromBlock: 20000000
```

#### Reorg Handler Force From Block
Whether to force the reorg handler to start from the block specified in `reorgHandler-from-block`. Default is `false`.

cmd: `--reorgHandler-force-from-block`
env: `REORGHANDLER_FORCEFROMBLOCK`
yaml:
```yaml
reorgHandler:
forceFromBlock: true
```

#### Failure Recoverer
Whether to enable the failure recoverer. Default is `true`.

Expand Down
10 changes: 10 additions & 0 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,11 @@ func init() {
rootCmd.PersistentFlags().Bool("committer-enabled", true, "Toggle committer")
rootCmd.PersistentFlags().Int("committer-blocks-per-commit", 10, "How many blocks to commit each interval")
rootCmd.PersistentFlags().Int("committer-interval", 1000, "How often to commit blocks in milliseconds")
rootCmd.PersistentFlags().Bool("reorgHandler-enabled", true, "Toggle reorg handler")
rootCmd.PersistentFlags().Int("reorgHandler-interval", 1000, "How often to run reorg handler in milliseconds")
rootCmd.PersistentFlags().Int("reorgHandler-blocks-per-scan", 100, "How many blocks to scan for reorgs")
rootCmd.PersistentFlags().Int("reorgHandler-from-block", 0, "From which block to start scanning for reorgs")
rootCmd.PersistentFlags().Bool("reorgHandler-force-from-block", false, "Force the reorg handler to start from the block specified in `reorgHandler-from-block`")
rootCmd.PersistentFlags().Bool("failure-recoverer-enabled", true, "Toggle failure recoverer")
rootCmd.PersistentFlags().Int("failure-recoverer-blocks-per-run", 10, "How many blocks to run failure recoverer for")
rootCmd.PersistentFlags().Int("failure-recoverer-interval", 1000, "How often to run failure recoverer in milliseconds")
Expand Down Expand Up @@ -98,6 +103,11 @@ func init() {
viper.BindPFlag("committer.enabled", rootCmd.PersistentFlags().Lookup("committer-enabled"))
viper.BindPFlag("committer.blocksPerCommit", rootCmd.PersistentFlags().Lookup("committer-blocks-per-commit"))
viper.BindPFlag("committer.interval", rootCmd.PersistentFlags().Lookup("committer-interval"))
viper.BindPFlag("reorgHandler.enabled", rootCmd.PersistentFlags().Lookup("reorgHandler-enabled"))
viper.BindPFlag("reorgHandler.interval", rootCmd.PersistentFlags().Lookup("reorgHandler-interval"))
viper.BindPFlag("reorgHandler.blocksPerScan", rootCmd.PersistentFlags().Lookup("reorgHandler-blocks-per-scan"))
viper.BindPFlag("reorgHandler.fromBlock", rootCmd.PersistentFlags().Lookup("reorgHandler-from-block"))
viper.BindPFlag("reorgHandler.forceFromBlock", rootCmd.PersistentFlags().Lookup("reorgHandler-force-from-block"))
viper.BindPFlag("failureRecoverer.enabled", rootCmd.PersistentFlags().Lookup("failure-recoverer-enabled"))
viper.BindPFlag("failureRecoverer.blocksPerRun", rootCmd.PersistentFlags().Lookup("failure-recoverer-blocks-per-run"))
viper.BindPFlag("failureRecoverer.interval", rootCmd.PersistentFlags().Lookup("failure-recoverer-interval"))
Expand Down
9 changes: 9 additions & 0 deletions configs/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,14 @@ type CommitterConfig struct {
BlocksPerCommit int `mapstructure:"blocksPerCommit"`
}

// ReorgHandlerConfig holds the settings for the chain reorganization handler.
type ReorgHandlerConfig struct {
	Enabled bool `mapstructure:"enabled"` // toggles the reorg handler on/off
	Interval int `mapstructure:"interval"` // trigger interval in milliseconds
	BlocksPerScan int `mapstructure:"blocksPerScan"` // number of block headers inspected per scan
	FromBlock int `mapstructure:"fromBlock"` // block number to start scanning from
	ForceFromBlock bool `mapstructure:"forceFromBlock"` // if true, always start from FromBlock and ignore any stored checkpoint
}

type FailureRecovererConfig struct {
Enabled bool `mapstructure:"enabled"`
Interval int `mapstructure:"interval"`
Expand Down Expand Up @@ -101,6 +109,7 @@ type Config struct {
Poller PollerConfig `mapstructure:"poller"`
Committer CommitterConfig `mapstructure:"committer"`
FailureRecoverer FailureRecovererConfig `mapstructure:"failureRecoverer"`
ReorgHandler ReorgHandlerConfig `mapstructure:"reorgHandler"`
Storage StorageConfig `mapstructure:"storage"`
API APIConfig `mapstructure:"api"`
}
Expand Down
13 changes: 13 additions & 0 deletions internal/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,3 +69,16 @@ var (
Help: "The first block number in the failure recoverer batch",
})
)

// Reorg Handler Metrics
var (
	// ReorgHandlerLastCheckedBlock tracks the reorg handler's scan progress;
	// it is set after each successful scan iteration.
	ReorgHandlerLastCheckedBlock = promauto.NewGauge(prometheus.GaugeOpts{
		Name: "reorg_handler_last_checked_block",
		Help: "The last block number that the reorg handler checked",
	})

	// ReorgCounter counts chain reorganizations detected since process start.
	ReorgCounter = promauto.NewCounter(prometheus.CounterOpts{
		Name: "reorg_handler_reorg_counter",
		Help: "The number of reorgs detected",
	})
)
11 changes: 11 additions & 0 deletions internal/orchestrator/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ type Orchestrator struct {
pollerEnabled bool
failureRecovererEnabled bool
committerEnabled bool
reorgHandlerEnabled bool
}

func NewOrchestrator(rpc rpc.Client) (*Orchestrator, error) {
Expand All @@ -28,6 +29,7 @@ func NewOrchestrator(rpc rpc.Client) (*Orchestrator, error) {
pollerEnabled: config.Cfg.Poller.Enabled,
failureRecovererEnabled: config.Cfg.FailureRecoverer.Enabled,
committerEnabled: config.Cfg.Committer.Enabled,
reorgHandlerEnabled: config.Cfg.ReorgHandler.Enabled,
}, nil
}

Expand Down Expand Up @@ -61,6 +63,15 @@ func (o *Orchestrator) Start() {
}()
}

if o.reorgHandlerEnabled {
wg.Add(1)
go func() {
defer wg.Done()
reorgHandler := NewReorgHandler(o.rpc, o.storage)
reorgHandler.Start()
}()
}

// The chain tracker is always running
wg.Add(1)
go func() {
Expand Down
200 changes: 200 additions & 0 deletions internal/orchestrator/reorg_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
package orchestrator

import (
"fmt"
"math/big"
"time"

"github.com/rs/zerolog/log"
config "github.com/thirdweb-dev/indexer/configs"
"github.com/thirdweb-dev/indexer/internal/common"
"github.com/thirdweb-dev/indexer/internal/metrics"
"github.com/thirdweb-dev/indexer/internal/rpc"
"github.com/thirdweb-dev/indexer/internal/storage"
"github.com/thirdweb-dev/indexer/internal/worker"
)

// ReorgHandler periodically scans recently stored block headers for chain
// reorganizations (parent-hash mismatches) and re-fetches and replaces the
// affected block data.
type ReorgHandler struct {
	rpc rpc.Client // RPC client used to fetch canonical blocks and the chain ID
	storage storage.IStorage // storage for block headers, block data, and the scan checkpoint
	triggerInterval int // how often to scan, in milliseconds
	blocksPerScan int // number of block headers inspected per scan
	lastCheckedBlock *big.Int // highest block number already verified for reorgs
	worker *worker.Worker // worker used to re-fetch full block data after a reorg
}

// Defaults applied when the corresponding reorgHandler config values are unset (zero).
const DEFAULT_REORG_HANDLER_INTERVAL = 1000 // milliseconds between scans
const DEFAULT_REORG_HANDLER_BLOCKS_PER_SCAN = 100 // block headers per scan

// NewReorgHandler builds a ReorgHandler from the global configuration,
// substituting package defaults when the interval or scan size is unset,
// and resolving the initial checkpoint from storage/config.
func NewReorgHandler(rpc rpc.Client, storage storage.IStorage) *ReorgHandler {
	interval := config.Cfg.ReorgHandler.Interval
	if interval == 0 {
		interval = DEFAULT_REORG_HANDLER_INTERVAL
	}
	scanSize := config.Cfg.ReorgHandler.BlocksPerScan
	if scanSize == 0 {
		scanSize = DEFAULT_REORG_HANDLER_BLOCKS_PER_SCAN
	}
	handler := &ReorgHandler{
		rpc:              rpc,
		storage:          storage,
		worker:           worker.NewWorker(rpc),
		triggerInterval:  interval,
		blocksPerScan:    scanSize,
		lastCheckedBlock: getInitialCheckedBlockNumber(storage, rpc.ChainID),
	}
	return handler
}

// getInitialCheckedBlockNumber decides where reorg scanning should resume.
// Unless the forceFromBlock flag is set, a checkpoint persisted by a previous
// run takes precedence over the configured fromBlock.
func getInitialCheckedBlockNumber(storage storage.IStorage, chainId *big.Int) *big.Int {
	configuredBlock := big.NewInt(int64(config.Cfg.ReorgHandler.FromBlock))
	if config.Cfg.ReorgHandler.ForceFromBlock {
		log.Debug().Msgf("Force from block reorg check flag set, using configured: %s", configuredBlock)
		return configuredBlock
	}
	storedFromBlock, err := storage.OrchestratorStorage.GetLastReorgCheckedBlockNumber(chainId)
	if err != nil {
		log.Debug().Err(err).Msgf("Error getting last reorg checked block number, using configured: %s", configuredBlock)
		return configuredBlock
	}
	// A non-positive stored value means no checkpoint has been written yet.
	if storedFromBlock.Sign() <= 0 {
		log.Debug().Msgf("Last reorg checked block number not found, using configured: %s", configuredBlock)
		return configuredBlock
	}
	log.Debug().Msgf("Last reorg checked block number found, using: %s", storedFromBlock)
	return storedFromBlock
}

// Start launches the reorg detection loop on a background goroutine and then
// blocks forever (the orchestrator relies on this call never returning).
// Each tick it loads the most recent stored block headers, looks for a
// parent-hash mismatch, and, if one is found, locates the fork point and
// replaces the affected block data before advancing the checkpoint.
func (rh *ReorgHandler) Start() {
	interval := time.Duration(rh.triggerInterval) * time.Millisecond
	ticker := time.NewTicker(interval)

	log.Debug().Msgf("Reorg handler running")
	go func() {
		for range ticker.C {
			// Scan the blocksPerScan headers ending blocksPerScan above the checkpoint.
			lookbackFrom := new(big.Int).Add(rh.lastCheckedBlock, big.NewInt(int64(rh.blocksPerScan)))
			blockHeaders, err := rh.storage.MainStorage.LookbackBlockHeaders(rh.rpc.ChainID, rh.blocksPerScan, lookbackFrom)
			if err != nil {
				log.Error().Err(err).Msg("Error getting recent block headers")
				continue
			}
			if len(blockHeaders) == 0 {
				log.Warn().Msg("No block headers found")
				continue
			}
			// Headers are ordered newest-first; index 0 is the most recent.
			mostRecentBlockHeader := blockHeaders[0]
			reorgEndIndex := findReorgEndIndex(blockHeaders)
			if reorgEndIndex == -1 {
				// No mismatch found: just advance the checkpoint.
				rh.updateLastCheckedBlock(mostRecentBlockHeader.Number)
				continue
			}
			metrics.ReorgCounter.Inc()
			forkPoint, err := rh.findForkPoint(blockHeaders[reorgEndIndex:])
			if err != nil {
				log.Error().Err(err).Msg("Error while finding fork point")
				continue
			}
			if err := rh.handleReorg(forkPoint, lookbackFrom); err != nil {
				log.Error().Err(err).Msg("Error while handling reorg")
				continue
			}
			rh.updateLastCheckedBlock(mostRecentBlockHeader.Number)
		}
	}()

	// Keep the program running (otherwise it will exit)
	select {}
}

// updateLastCheckedBlock advances the in-memory checkpoint, persists it, and
// updates the progress metric. It is called after every successful scan.
// NOTE(review): any error from SetLastReorgCheckedBlockNumber is currently
// discarded, matching prior behavior — confirm its signature and surface
// persistence failures if it returns an error.
func (rh *ReorgHandler) updateLastCheckedBlock(blockNumber *big.Int) {
	rh.lastCheckedBlock = blockNumber
	rh.storage.OrchestratorStorage.SetLastReorgCheckedBlockNumber(rh.rpc.ChainID, blockNumber)
	metrics.ReorgHandlerLastCheckedBlock.Set(float64(blockNumber.Int64()))
}

// findReorgEndIndex inspects headers ordered newest-first and returns the
// index of the most recent block whose parent hash does not match the hash
// of the next (older) header, or -1 when every link in the batch is intact.
func findReorgEndIndex(reversedBlockHeaders []common.BlockHeader) (index int) {
	for i := range reversedBlockHeaders {
		if i+1 >= len(reversedBlockHeaders) {
			break // oldest header in the batch has no predecessor to compare against
		}
		current := reversedBlockHeaders[i]
		parent := reversedBlockHeaders[i+1]
		if current.ParentHash == parent.Hash {
			continue
		}
		log.Debug().
			Str("currentBlockNumber", current.Number.String()).
			Str("currentBlockHash", current.Hash).
			Str("currentBlockParentHash", current.ParentHash).
			Str("previousBlockNumber", parent.Number.String()).
			Str("previousBlockHash", parent.Hash).
			Msg("Reorg detected: parent hash mismatch")
		return i
	}
	return -1
}

// findForkPoint walks stored headers (newest to oldest), comparing each
// against the canonical chain fetched via RPC, and returns the block number
// of the newest older block that still matches — the point the chains forked
// from. If the entire batch mismatches it recurses into the next older batch.
//
// Fix: guard against empty header batches. Previously, if the recursive
// LookbackBlockHeaders call returned no headers, the next invocation would
// panic with an index-out-of-range on reversedBlockHeaders[len-1].
func (rh *ReorgHandler) findForkPoint(reversedBlockHeaders []common.BlockHeader) (forkPoint *big.Int, err error) {
	if len(reversedBlockHeaders) == 0 {
		return nil, fmt.Errorf("no block headers provided to find fork point")
	}
	newBlocksByNumber, err := rh.getNewBlocksByNumber(reversedBlockHeaders)
	if err != nil {
		return nil, err
	}

	for i := 0; i < len(reversedBlockHeaders)-1; i++ {
		blockHeader := reversedBlockHeaders[i]
		block, ok := (*newBlocksByNumber)[blockHeader.Number.String()]
		if !ok {
			return nil, fmt.Errorf("block not found: %s", blockHeader.Number.String())
		}
		if block.Hash == blockHeader.Hash {
			// Stored hash matches the canonical chain again; the next older
			// header marks the fork point.
			previousBlock := reversedBlockHeaders[i+1]
			return previousBlock.Number, nil
		}
	}
	// Entire batch mismatched: continue the search in older headers.
	lookbackFrom := reversedBlockHeaders[len(reversedBlockHeaders)-1].Number
	nextHeadersBatch, err := rh.storage.MainStorage.LookbackBlockHeaders(rh.rpc.ChainID, rh.blocksPerScan, lookbackFrom)
	if err != nil {
		return nil, fmt.Errorf("error getting next headers batch: %w", err)
	}
	if len(nextHeadersBatch) == 0 {
		return nil, fmt.Errorf("ran out of stored block headers while searching for fork point below block %s", lookbackFrom.String())
	}
	return rh.findForkPoint(nextHeadersBatch)
}

// getNewBlocksByNumber fetches the canonical versions of the given headers'
// blocks from the RPC and returns them indexed by block number (as a decimal
// string). An error from any single block fetch fails the whole call.
func (rh *ReorgHandler) getNewBlocksByNumber(reversedBlockHeaders []common.BlockHeader) (*map[string]common.Block, error) {
	blockNumbers := make([]*big.Int, 0, len(reversedBlockHeaders))
	for i := range reversedBlockHeaders {
		blockNumbers = append(blockNumbers, reversedBlockHeaders[i].Number)
	}
	fetchedBlocksByNumber := make(map[string]common.Block, len(blockNumbers))
	for _, blockResult := range rh.rpc.GetBlocks(blockNumbers) {
		if blockResult.Error != nil {
			return nil, fmt.Errorf("error fetching block %s: %w", blockResult.BlockNumber.String(), blockResult.Error)
		}
		fetchedBlocksByNumber[blockResult.BlockNumber.String()] = blockResult.Data
	}
	return &fetchedBlocksByNumber, nil
}

// handleReorg re-fetches full block data for the inclusive range
// [reorgStart, reorgEnd] via the worker, then deletes the stale stored data
// for those blocks and inserts the freshly fetched data.
//
// Fixes: the preallocated capacity was end-start, one short for an inclusive
// range; and an inverted range (start > end) produced a negative capacity,
// which makes make() panic — it now returns an error instead.
func (rh *ReorgHandler) handleReorg(reorgStart *big.Int, reorgEnd *big.Int) error {
	// An inclusive range [start, end] contains end - start + 1 blocks.
	rangeSize := new(big.Int).Sub(reorgEnd, reorgStart).Int64() + 1
	if rangeSize <= 0 {
		return fmt.Errorf("invalid reorg range: start %s is after end %s", reorgStart.String(), reorgEnd.String())
	}
	blockRange := make([]*big.Int, 0, rangeSize)
	for i := new(big.Int).Set(reorgStart); i.Cmp(reorgEnd) <= 0; i.Add(i, big.NewInt(1)) {
		// Copy i: the loop mutates it in place.
		blockRange = append(blockRange, new(big.Int).Set(i))
	}

	results := rh.worker.Run(blockRange)
	data := make([]common.BlockData, 0, len(results))
	for _, result := range results {
		if result.Error != nil {
			return fmt.Errorf("cannot fix reorg: failed block %s: %w", result.BlockNumber.String(), result.Error)
		}
		data = append(data, common.BlockData{
			Block:        result.Data.Block,
			Logs:         result.Data.Logs,
			Transactions: result.Data.Transactions,
			Traces:       result.Data.Traces,
		})
	}
	// TODO make delete and insert atomic
	if err := rh.storage.MainStorage.DeleteBlockData(rh.rpc.ChainID, blockRange); err != nil {
		return fmt.Errorf("error deleting data for blocks %v: %w", blockRange, err)
	}
	if err := rh.storage.MainStorage.InsertBlockData(&data); err != nil {
		return fmt.Errorf("error saving data to main storage: %w", err)
	}
	return nil
}

0 comments on commit cdaf53e

Please sign in to comment.