Skip to content

Commit

Permalink
Add reorg handler functionality and configuration options (#91)
Browse files Browse the repository at this point in the history
### TL;DR

Added a new reorg handler to detect and handle blockchain reorganizations.

### What changed?

- Introduced a new `ReorgHandler` component with configuration options
- Added reorg detection logic to scan for inconsistencies in block headers
- Implemented a fork point detection mechanism
- Created a process to handle reorgs by re-fetching and updating affected blocks
- Added new metrics for tracking reorg handler performance
- Updated the orchestrator to include the reorg handler in its workflow

### How to test?

1. Enable the reorg handler using the `--reorgHandler-enabled` flag or `REORGHANDLER_ENABLED` environment variable
2. Configure the reorg handler settings:
   - Interval: `--reorgHandler-interval` or `REORGHANDLER_INTERVAL`
   - Blocks per scan: `--reorgHandler-blocks-per-scan` or `REORGHANDLER_BLOCKSPERSCAN`
   - Starting block: `--reorgHandler-from-block` or `REORGHANDLER_FROMBLOCK`
   - Force starting block: `--reorgHandler-force-from-block` or `REORGHANDLER_FORCEFROMBLOCK`
3. Run the indexer and monitor the logs for reorg detection messages
4. Check the new metrics `reorg_handler_last_checked_block` and `reorg_handler_reorg_counter` for reorg handler activity

### Why make this change?

Blockchain reorganizations can lead to inconsistencies in indexed data. The reorg handler ensures data integrity by detecting reorgs and updating the affected blocks, maintaining the accuracy of the indexed information.
  • Loading branch information
iuwqyir authored Oct 10, 2024
2 parents 80fcbb5 + acb60cc commit f541e01
Show file tree
Hide file tree
Showing 6 changed files with 298 additions and 0 deletions.
55 changes: 55 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,61 @@ committer:
blocksPerCommit: 1000
```

#### Reorg Handler
Whether to enable the reorg handler. Default is `true`.

cmd: `--reorgHandler-enabled`
env: `REORGHANDLER_ENABLED`
yaml:
```yaml
reorgHandler:
enabled: true
```

#### Reorg Handler Interval
Reorg handler trigger interval in milliseconds. Default is `1000`.

cmd: `--reorgHandler-interval`
env: `REORGHANDLER_INTERVAL`
yaml:
```yaml
reorgHandler:
interval: 3000
```

#### Reorg Handler Blocks Per Scan
How many blocks to scan for reorgs. Default is `100`.

cmd: `--reorgHandler-blocks-per-scan`
env: `REORGHANDLER_BLOCKSPERSCAN`
yaml:
```yaml
reorgHandler:
blocksPerScan: 1000
```

#### Reorg Handler From Block
From which block to start scanning for reorgs. Default is `0`.

cmd: `--reorgHandler-from-block`
env: `REORGHANDLER_FROMBLOCK`
yaml:
```yaml
reorgHandler:
fromBlock: 20000000
```

#### Reorg Handler Force From Block
Whether to force the reorg handler to start from the block specified in `reorgHandler-from-block`. Default is `false`.

cmd: `--reorgHandler-force-from-block`
env: `REORGHANDLER_FORCEFROMBLOCK`
yaml:
```yaml
reorgHandler:
forceFromBlock: true
```

#### Failure Recoverer
Whether to enable the failure recoverer. Default is `true`.

Expand Down
10 changes: 10 additions & 0 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,11 @@ func init() {
rootCmd.PersistentFlags().Bool("committer-enabled", true, "Toggle committer")
rootCmd.PersistentFlags().Int("committer-blocks-per-commit", 10, "How many blocks to commit each interval")
rootCmd.PersistentFlags().Int("committer-interval", 1000, "How often to commit blocks in milliseconds")
rootCmd.PersistentFlags().Bool("reorgHandler-enabled", true, "Toggle reorg handler")
rootCmd.PersistentFlags().Int("reorgHandler-interval", 1000, "How often to run reorg handler in milliseconds")
rootCmd.PersistentFlags().Int("reorgHandler-blocks-per-scan", 100, "How many blocks to scan for reorgs")
rootCmd.PersistentFlags().Int("reorgHandler-from-block", 0, "From which block to start scanning for reorgs")
rootCmd.PersistentFlags().Bool("reorgHandler-force-from-block", false, "Force the reorg handler to start from the block specified in `reorgHandler-from-block`")
rootCmd.PersistentFlags().Bool("failure-recoverer-enabled", true, "Toggle failure recoverer")
rootCmd.PersistentFlags().Int("failure-recoverer-blocks-per-run", 10, "How many blocks to run failure recoverer for")
rootCmd.PersistentFlags().Int("failure-recoverer-interval", 1000, "How often to run failure recoverer in milliseconds")
Expand Down Expand Up @@ -98,6 +103,11 @@ func init() {
viper.BindPFlag("committer.enabled", rootCmd.PersistentFlags().Lookup("committer-enabled"))
viper.BindPFlag("committer.blocksPerCommit", rootCmd.PersistentFlags().Lookup("committer-blocks-per-commit"))
viper.BindPFlag("committer.interval", rootCmd.PersistentFlags().Lookup("committer-interval"))
viper.BindPFlag("reorgHandler.enabled", rootCmd.PersistentFlags().Lookup("reorgHandler-enabled"))
viper.BindPFlag("reorgHandler.interval", rootCmd.PersistentFlags().Lookup("reorgHandler-interval"))
viper.BindPFlag("reorgHandler.blocksPerScan", rootCmd.PersistentFlags().Lookup("reorgHandler-blocks-per-scan"))
viper.BindPFlag("reorgHandler.fromBlock", rootCmd.PersistentFlags().Lookup("reorgHandler-from-block"))
viper.BindPFlag("reorgHandler.forceFromBlock", rootCmd.PersistentFlags().Lookup("reorgHandler-force-from-block"))
viper.BindPFlag("failureRecoverer.enabled", rootCmd.PersistentFlags().Lookup("failure-recoverer-enabled"))
viper.BindPFlag("failureRecoverer.blocksPerRun", rootCmd.PersistentFlags().Lookup("failure-recoverer-blocks-per-run"))
viper.BindPFlag("failureRecoverer.interval", rootCmd.PersistentFlags().Lookup("failure-recoverer-interval"))
Expand Down
9 changes: 9 additions & 0 deletions configs/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,14 @@ type CommitterConfig struct {
BlocksPerCommit int `mapstructure:"blocksPerCommit"`
}

type ReorgHandlerConfig struct {
Enabled bool `mapstructure:"enabled"`
Interval int `mapstructure:"interval"`
BlocksPerScan int `mapstructure:"blocksPerScan"`
FromBlock int `mapstructure:"fromBlock"`
ForceFromBlock bool `mapstructure:"forceFromBlock"`
}

type FailureRecovererConfig struct {
Enabled bool `mapstructure:"enabled"`
Interval int `mapstructure:"interval"`
Expand Down Expand Up @@ -101,6 +109,7 @@ type Config struct {
Poller PollerConfig `mapstructure:"poller"`
Committer CommitterConfig `mapstructure:"committer"`
FailureRecoverer FailureRecovererConfig `mapstructure:"failureRecoverer"`
ReorgHandler ReorgHandlerConfig `mapstructure:"reorgHandler"`
Storage StorageConfig `mapstructure:"storage"`
API APIConfig `mapstructure:"api"`
}
Expand Down
13 changes: 13 additions & 0 deletions internal/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,3 +69,16 @@ var (
Help: "The first block number in the failure recoverer batch",
})
)

// Reorg Handler Metrics
var (
ReorgHandlerLastCheckedBlock = promauto.NewGauge(prometheus.GaugeOpts{
Name: "reorg_handler_last_checked_block",
Help: "The last block number that the reorg handler checked",
})

ReorgCounter = promauto.NewCounter(prometheus.CounterOpts{
Name: "reorg_handler_reorg_counter",
Help: "The number of reorgs detected",
})
)
11 changes: 11 additions & 0 deletions internal/orchestrator/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ type Orchestrator struct {
pollerEnabled bool
failureRecovererEnabled bool
committerEnabled bool
reorgHandlerEnabled bool
}

func NewOrchestrator(rpc rpc.Client) (*Orchestrator, error) {
Expand All @@ -28,6 +29,7 @@ func NewOrchestrator(rpc rpc.Client) (*Orchestrator, error) {
pollerEnabled: config.Cfg.Poller.Enabled,
failureRecovererEnabled: config.Cfg.FailureRecoverer.Enabled,
committerEnabled: config.Cfg.Committer.Enabled,
reorgHandlerEnabled: config.Cfg.ReorgHandler.Enabled,
}, nil
}

Expand Down Expand Up @@ -61,6 +63,15 @@ func (o *Orchestrator) Start() {
}()
}

if o.reorgHandlerEnabled {
wg.Add(1)
go func() {
defer wg.Done()
reorgHandler := NewReorgHandler(o.rpc, o.storage)
reorgHandler.Start()
}()
}

// The chain tracker is always running
wg.Add(1)
go func() {
Expand Down
200 changes: 200 additions & 0 deletions internal/orchestrator/reorg_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
package orchestrator

import (
"fmt"
"math/big"
"time"

"github.com/rs/zerolog/log"
config "github.com/thirdweb-dev/indexer/configs"
"github.com/thirdweb-dev/indexer/internal/common"
"github.com/thirdweb-dev/indexer/internal/metrics"
"github.com/thirdweb-dev/indexer/internal/rpc"
"github.com/thirdweb-dev/indexer/internal/storage"
"github.com/thirdweb-dev/indexer/internal/worker"
)

type ReorgHandler struct {
rpc rpc.Client
storage storage.IStorage
triggerInterval int
blocksPerScan int
lastCheckedBlock *big.Int
worker *worker.Worker
}

const DEFAULT_REORG_HANDLER_INTERVAL = 1000
const DEFAULT_REORG_HANDLER_BLOCKS_PER_SCAN = 100

func NewReorgHandler(rpc rpc.Client, storage storage.IStorage) *ReorgHandler {
triggerInterval := config.Cfg.ReorgHandler.Interval
if triggerInterval == 0 {
triggerInterval = DEFAULT_REORG_HANDLER_INTERVAL
}
blocksPerScan := config.Cfg.ReorgHandler.BlocksPerScan
if blocksPerScan == 0 {
blocksPerScan = DEFAULT_REORG_HANDLER_BLOCKS_PER_SCAN
}
return &ReorgHandler{
rpc: rpc,
storage: storage,
worker: worker.NewWorker(rpc),
triggerInterval: triggerInterval,
blocksPerScan: blocksPerScan,
lastCheckedBlock: getInitialCheckedBlockNumber(storage, rpc.ChainID),
}
}

func getInitialCheckedBlockNumber(storage storage.IStorage, chainId *big.Int) *big.Int {
bn := big.NewInt(int64(config.Cfg.ReorgHandler.FromBlock))
if !config.Cfg.ReorgHandler.ForceFromBlock {
storedFromBlock, err := storage.OrchestratorStorage.GetLastReorgCheckedBlockNumber(chainId)
if err != nil {
log.Debug().Err(err).Msgf("Error getting last reorg checked block number, using configured: %s", bn)
return bn
}
if storedFromBlock.Sign() <= 0 {
log.Debug().Msgf("Last reorg checked block number not found, using configured: %s", bn)
return bn
}
log.Debug().Msgf("Last reorg checked block number found, using: %s", storedFromBlock)
return storedFromBlock
}
log.Debug().Msgf("Force from block reorg check flag set, using configured: %s", bn)
return bn
}

func (rh *ReorgHandler) Start() {
interval := time.Duration(rh.triggerInterval) * time.Millisecond
ticker := time.NewTicker(interval)

log.Debug().Msgf("Reorg handler running")
go func() {
for range ticker.C {
lookbackFrom := new(big.Int).Add(rh.lastCheckedBlock, big.NewInt(int64(rh.blocksPerScan)))
blockHeaders, err := rh.storage.MainStorage.LookbackBlockHeaders(rh.rpc.ChainID, rh.blocksPerScan, lookbackFrom)
if err != nil {
log.Error().Err(err).Msg("Error getting recent block headers")
continue
}
if len(blockHeaders) == 0 {
log.Warn().Msg("No block headers found")
continue
}
mostRecentBlockHeader := blockHeaders[0]
reorgEndIndex := findReorgEndIndex(blockHeaders)
if reorgEndIndex == -1 {
rh.lastCheckedBlock = mostRecentBlockHeader.Number
rh.storage.OrchestratorStorage.SetLastReorgCheckedBlockNumber(rh.rpc.ChainID, mostRecentBlockHeader.Number)
metrics.ReorgHandlerLastCheckedBlock.Set(float64(mostRecentBlockHeader.Number.Int64()))
continue
}
metrics.ReorgCounter.Inc()
forkPoint, err := rh.findForkPoint(blockHeaders[reorgEndIndex:])
if err != nil {
log.Error().Err(err).Msg("Error while finding fork point")
continue
}
err = rh.handleReorg(forkPoint, lookbackFrom)
if err != nil {
log.Error().Err(err).Msg("Error while handling reorg")
continue
}
rh.lastCheckedBlock = mostRecentBlockHeader.Number
rh.storage.OrchestratorStorage.SetLastReorgCheckedBlockNumber(rh.rpc.ChainID, mostRecentBlockHeader.Number)
metrics.ReorgHandlerLastCheckedBlock.Set(float64(mostRecentBlockHeader.Number.Int64()))
}
}()

// Keep the program running (otherwise it will exit)
select {}
}

func findReorgEndIndex(reversedBlockHeaders []common.BlockHeader) (index int) {
for i := 0; i < len(reversedBlockHeaders)-1; i++ {
currentBlock := reversedBlockHeaders[i]
previousBlock := reversedBlockHeaders[i+1]

if currentBlock.ParentHash != previousBlock.Hash {
log.Debug().
Str("currentBlockNumber", currentBlock.Number.String()).
Str("currentBlockHash", currentBlock.Hash).
Str("currentBlockParentHash", currentBlock.ParentHash).
Str("previousBlockNumber", previousBlock.Number.String()).
Str("previousBlockHash", previousBlock.Hash).
Msg("Reorg detected: parent hash mismatch")
return i
}
}
return -1
}

func (rh *ReorgHandler) findForkPoint(reversedBlockHeaders []common.BlockHeader) (forkPoint *big.Int, err error) {
newBlocksByNumber, err := rh.getNewBlocksByNumber(reversedBlockHeaders)
if err != nil {
return nil, err
}

for i := 0; i < len(reversedBlockHeaders)-1; i++ {
blockHeader := reversedBlockHeaders[i]
block, ok := (*newBlocksByNumber)[blockHeader.Number.String()]
if !ok {
return nil, fmt.Errorf("block not found: %s", blockHeader.Number.String())
}
if block.Hash == blockHeader.Hash {
previousBlock := reversedBlockHeaders[i+1]
return previousBlock.Number, nil
}
}
lookbackFrom := reversedBlockHeaders[len(reversedBlockHeaders)-1].Number
nextHeadersBatch, err := rh.storage.MainStorage.LookbackBlockHeaders(rh.rpc.ChainID, rh.blocksPerScan, lookbackFrom)
if err != nil {
return nil, fmt.Errorf("error getting next headers batch: %w", err)
}
return rh.findForkPoint(nextHeadersBatch)
}

func (rh *ReorgHandler) getNewBlocksByNumber(reversedBlockHeaders []common.BlockHeader) (*map[string]common.Block, error) {
blockNumbers := make([]*big.Int, 0, len(reversedBlockHeaders))
for _, header := range reversedBlockHeaders {
blockNumbers = append(blockNumbers, header.Number)
}
blockResults := rh.rpc.GetBlocks(blockNumbers)
fetchedBlocksByNumber := make(map[string]common.Block)
for _, blockResult := range blockResults {
if blockResult.Error != nil {
return nil, fmt.Errorf("error fetching block %s: %w", blockResult.BlockNumber.String(), blockResult.Error)
}
fetchedBlocksByNumber[blockResult.BlockNumber.String()] = blockResult.Data
}
return &fetchedBlocksByNumber, nil
}

func (rh *ReorgHandler) handleReorg(reorgStart *big.Int, reorgEnd *big.Int) error {
blockRange := make([]*big.Int, 0, new(big.Int).Sub(reorgEnd, reorgStart).Int64())
for i := new(big.Int).Set(reorgStart); i.Cmp(reorgEnd) <= 0; i.Add(i, big.NewInt(1)) {
blockRange = append(blockRange, new(big.Int).Set(i))
}

results := rh.worker.Run(blockRange)
data := make([]common.BlockData, 0, len(results))
for _, result := range results {
if result.Error != nil {
return fmt.Errorf("cannot fix reorg: failed block %s: %w", result.BlockNumber.String(), result.Error)
}
data = append(data, common.BlockData{
Block: result.Data.Block,
Logs: result.Data.Logs,
Transactions: result.Data.Transactions,
Traces: result.Data.Traces,
})
}
// TODO make delete and insert atomic
if err := rh.storage.MainStorage.DeleteBlockData(rh.rpc.ChainID, blockRange); err != nil {
return fmt.Errorf("error deleting data for blocks %v: %w", blockRange, err)
}
if err := rh.storage.MainStorage.InsertBlockData(&data); err != nil {
return fmt.Errorf("error saving data to main storage: %w", err)
}
return nil
}

0 comments on commit f541e01

Please sign in to comment.