From cdaf53e7c1e4ef51e124514989e907bf8977bc20 Mon Sep 17 00:00:00 2001
From: iuwqyir <toomas.oosalu@gmail.com>
Date: Fri, 4 Oct 2024 19:32:16 +0300
Subject: [PATCH] handle reorgs

---
 README.md                              |  55 +++++++
 cmd/root.go                            |  10 ++
 configs/config.go                      |   9 ++
 internal/metrics/metrics.go            |  13 ++
 internal/orchestrator/orchestrator.go  |  11 ++
 internal/orchestrator/reorg_handler.go | 200 +++++++++++++++++++++++++
 6 files changed, 298 insertions(+)
 create mode 100644 internal/orchestrator/reorg_handler.go

diff --git a/README.md b/README.md
index 407783d..6af284a 100644
--- a/README.md
+++ b/README.md
@@ -294,6 +294,61 @@ committer:
   blocksPerCommit: 1000
 ```
 
+#### Reorg Handler
+Whether to enable the reorg handler. Default is `true`.
+
+cmd: `--reorgHandler-enabled`
+env: `REORGHANDLER_ENABLED`
+yaml:
+```yaml
+reorgHandler:
+  enabled: true
+```
+
+#### Reorg Handler Interval
+Reorg handler trigger interval in milliseconds. Default is `1000`.
+
+cmd: `--reorgHandler-interval`
+env: `REORGHANDLER_INTERVAL`
+yaml:
+```yaml
+reorgHandler:
+  interval: 3000
+```
+
+#### Reorg Handler Blocks Per Scan
+How many of the most recent blocks to scan for reorgs on each run. Default is `100`.
+
+cmd: `--reorgHandler-blocks-per-scan`
+env: `REORGHANDLER_BLOCKSPERSCAN`
+yaml:
+```yaml
+reorgHandler:
+  blocksPerScan: 1000
+```
+
+#### Reorg Handler From Block
+The block number to start scanning for reorgs from. If a last checked block is already stored, that value is used instead, unless `reorgHandler-force-from-block` is set. Default is `0`.
+
+cmd: `--reorgHandler-from-block`
+env: `REORGHANDLER_FROMBLOCK`
+yaml:
+```yaml
+reorgHandler:
+  fromBlock: 20000000
+```
+
+#### Reorg Handler Force From Block
+Whether to force the reorg handler to start from the block specified in `reorgHandler-from-block`, ignoring any previously stored checkpoint. Default is `false`.
+
+cmd: `--reorgHandler-force-from-block`
+env: `REORGHANDLER_FORCEFROMBLOCK`
+yaml:
+```yaml
+reorgHandler:
+  forceFromBlock: true
+```
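+
+For reference, a complete `reorgHandler` block combining the options above (values are illustrative):
+```yaml
+reorgHandler:
+  enabled: true
+  interval: 3000
+  blocksPerScan: 1000
+  fromBlock: 20000000
+  forceFromBlock: false
+```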
+
 #### Failure Recoverer
 Whether to enable the failure recoverer. Default is `true`.
 
diff --git a/cmd/root.go b/cmd/root.go
index 8b294f3..6503886 100644
--- a/cmd/root.go
+++ b/cmd/root.go
@@ -55,6 +55,11 @@ func init() {
 	rootCmd.PersistentFlags().Bool("committer-enabled", true, "Toggle committer")
 	rootCmd.PersistentFlags().Int("committer-blocks-per-commit", 10, "How many blocks to commit each interval")
 	rootCmd.PersistentFlags().Int("committer-interval", 1000, "How often to commit blocks in milliseconds")
+	rootCmd.PersistentFlags().Bool("reorgHandler-enabled", true, "Toggle reorg handler")
+	rootCmd.PersistentFlags().Int("reorgHandler-interval", 1000, "How often to run reorg handler in milliseconds")
+	rootCmd.PersistentFlags().Int("reorgHandler-blocks-per-scan", 100, "How many blocks to scan for reorgs")
+	rootCmd.PersistentFlags().Int("reorgHandler-from-block", 0, "From which block to start scanning for reorgs")
+	rootCmd.PersistentFlags().Bool("reorgHandler-force-from-block", false, "Force the reorg handler to start from the block specified in `reorgHandler-from-block`")
 	rootCmd.PersistentFlags().Bool("failure-recoverer-enabled", true, "Toggle failure recoverer")
 	rootCmd.PersistentFlags().Int("failure-recoverer-blocks-per-run", 10, "How many blocks to run failure recoverer for")
 	rootCmd.PersistentFlags().Int("failure-recoverer-interval", 1000, "How often to run failure recoverer in milliseconds")
@@ -98,6 +103,11 @@ func init() {
 	viper.BindPFlag("committer.enabled", rootCmd.PersistentFlags().Lookup("committer-enabled"))
 	viper.BindPFlag("committer.blocksPerCommit", rootCmd.PersistentFlags().Lookup("committer-blocks-per-commit"))
 	viper.BindPFlag("committer.interval", rootCmd.PersistentFlags().Lookup("committer-interval"))
+	viper.BindPFlag("reorgHandler.enabled", rootCmd.PersistentFlags().Lookup("reorgHandler-enabled"))
+	viper.BindPFlag("reorgHandler.interval", rootCmd.PersistentFlags().Lookup("reorgHandler-interval"))
+	viper.BindPFlag("reorgHandler.blocksPerScan", rootCmd.PersistentFlags().Lookup("reorgHandler-blocks-per-scan"))
+	viper.BindPFlag("reorgHandler.fromBlock", rootCmd.PersistentFlags().Lookup("reorgHandler-from-block"))
+	viper.BindPFlag("reorgHandler.forceFromBlock", rootCmd.PersistentFlags().Lookup("reorgHandler-force-from-block"))
 	viper.BindPFlag("failureRecoverer.enabled", rootCmd.PersistentFlags().Lookup("failure-recoverer-enabled"))
 	viper.BindPFlag("failureRecoverer.blocksPerRun", rootCmd.PersistentFlags().Lookup("failure-recoverer-blocks-per-run"))
 	viper.BindPFlag("failureRecoverer.interval", rootCmd.PersistentFlags().Lookup("failure-recoverer-interval"))
diff --git a/configs/config.go b/configs/config.go
index 509d853..d92ae95 100644
--- a/configs/config.go
+++ b/configs/config.go
@@ -28,6 +28,14 @@ type CommitterConfig struct {
 	BlocksPerCommit int  `mapstructure:"blocksPerCommit"`
 }
 
+type ReorgHandlerConfig struct {
+	Enabled        bool `mapstructure:"enabled"`
+	Interval       int  `mapstructure:"interval"`
+	BlocksPerScan  int  `mapstructure:"blocksPerScan"`
+	FromBlock      int  `mapstructure:"fromBlock"`
+	ForceFromBlock bool `mapstructure:"forceFromBlock"`
+}
+
 type FailureRecovererConfig struct {
 	Enabled      bool `mapstructure:"enabled"`
 	Interval     int  `mapstructure:"interval"`
@@ -101,6 +109,7 @@ type Config struct {
 	Poller           PollerConfig           `mapstructure:"poller"`
 	Committer        CommitterConfig        `mapstructure:"committer"`
 	FailureRecoverer FailureRecovererConfig `mapstructure:"failureRecoverer"`
+	ReorgHandler     ReorgHandlerConfig     `mapstructure:"reorgHandler"`
 	Storage          StorageConfig          `mapstructure:"storage"`
 	API              APIConfig              `mapstructure:"api"`
 }
diff --git a/internal/metrics/metrics.go b/internal/metrics/metrics.go
index 1faf4b2..704e993 100644
--- a/internal/metrics/metrics.go
+++ b/internal/metrics/metrics.go
@@ -69,3 +69,16 @@ var (
 		Help: "The first block number in the failure recoverer batch",
 	})
 )
+
+// Reorg Handler Metrics
+var (
+	ReorgHandlerLastCheckedBlock = promauto.NewGauge(prometheus.GaugeOpts{
+		Name: "reorg_handler_last_checked_block",
+		Help: "The last block number that the reorg handler checked",
+	})
+
+	ReorgCounter = promauto.NewCounter(prometheus.CounterOpts{
+		Name: "reorg_handler_reorg_counter",
+		Help: "The number of reorgs detected",
+	})
+)
diff --git a/internal/orchestrator/orchestrator.go b/internal/orchestrator/orchestrator.go
index e4cffa3..41eadab 100644
--- a/internal/orchestrator/orchestrator.go
+++ b/internal/orchestrator/orchestrator.go
@@ -14,6 +14,7 @@ type Orchestrator struct {
 	pollerEnabled           bool
 	failureRecovererEnabled bool
 	committerEnabled        bool
+	reorgHandlerEnabled     bool
 }
 
 func NewOrchestrator(rpc rpc.Client) (*Orchestrator, error) {
@@ -28,6 +29,7 @@ func NewOrchestrator(rpc rpc.Client) (*Orchestrator, error) {
 		pollerEnabled:           config.Cfg.Poller.Enabled,
 		failureRecovererEnabled: config.Cfg.FailureRecoverer.Enabled,
 		committerEnabled:        config.Cfg.Committer.Enabled,
+		reorgHandlerEnabled:     config.Cfg.ReorgHandler.Enabled,
 	}, nil
 }
 
@@ -61,6 +63,15 @@ func (o *Orchestrator) Start() {
 		}()
 	}
 
+	if o.reorgHandlerEnabled {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			reorgHandler := NewReorgHandler(o.rpc, o.storage)
+			reorgHandler.Start()
+		}()
+	}
+
 	// The chain tracker is always running
 	wg.Add(1)
 	go func() {
diff --git a/internal/orchestrator/reorg_handler.go b/internal/orchestrator/reorg_handler.go
new file mode 100644
index 0000000..0a7309e
--- /dev/null
+++ b/internal/orchestrator/reorg_handler.go
@@ -0,0 +1,200 @@
+package orchestrator
+
+import (
+	"fmt"
+	"math/big"
+	"time"
+
+	"github.com/rs/zerolog/log"
+	config "github.com/thirdweb-dev/indexer/configs"
+	"github.com/thirdweb-dev/indexer/internal/common"
+	"github.com/thirdweb-dev/indexer/internal/metrics"
+	"github.com/thirdweb-dev/indexer/internal/rpc"
+	"github.com/thirdweb-dev/indexer/internal/storage"
+	"github.com/thirdweb-dev/indexer/internal/worker"
+)
+
+type ReorgHandler struct {
+	rpc              rpc.Client
+	storage          storage.IStorage
+	triggerInterval  int
+	blocksPerScan    int
+	lastCheckedBlock *big.Int
+	worker           *worker.Worker
+}
+
+const DEFAULT_REORG_HANDLER_INTERVAL = 1000
+const DEFAULT_REORG_HANDLER_BLOCKS_PER_SCAN = 100
+
+func NewReorgHandler(rpc rpc.Client, storage storage.IStorage) *ReorgHandler {
+	triggerInterval := config.Cfg.ReorgHandler.Interval
+	if triggerInterval == 0 {
+		triggerInterval = DEFAULT_REORG_HANDLER_INTERVAL
+	}
+	blocksPerScan := config.Cfg.ReorgHandler.BlocksPerScan
+	if blocksPerScan == 0 {
+		blocksPerScan = DEFAULT_REORG_HANDLER_BLOCKS_PER_SCAN
+	}
+	return &ReorgHandler{
+		rpc:              rpc,
+		storage:          storage,
+		worker:           worker.NewWorker(rpc),
+		triggerInterval:  triggerInterval,
+		blocksPerScan:    blocksPerScan,
+		lastCheckedBlock: getInitialCheckedBlockNumber(storage, rpc.ChainID),
+	}
+}
+
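+// getInitialCheckedBlockNumber returns the block number to resume reorg scanning from: the last checked
+// block persisted in storage if one exists, otherwise the configured fromBlock. When forceFromBlock is
+// set, the configured fromBlock is always used.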
+func getInitialCheckedBlockNumber(storage storage.IStorage, chainId *big.Int) *big.Int {
+	bn := big.NewInt(int64(config.Cfg.ReorgHandler.FromBlock))
+	if !config.Cfg.ReorgHandler.ForceFromBlock {
+		storedFromBlock, err := storage.OrchestratorStorage.GetLastReorgCheckedBlockNumber(chainId)
+		if err != nil {
+			log.Debug().Err(err).Msgf("Error getting last reorg checked block number, using configured: %s", bn)
+			return bn
+		}
+		if storedFromBlock.Sign() <= 0 {
+			log.Debug().Msgf("Last reorg checked block number not found, using configured: %s", bn)
+			return bn
+		}
+		log.Debug().Msgf("Last reorg checked block number found, using: %s", storedFromBlock)
+		return storedFromBlock
+	}
+	log.Debug().Msgf("Force from block reorg check flag set, using configured: %s", bn)
+	return bn
+}
+
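+// Start begins periodic reorg checks at the configured interval. It blocks forever.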
+func (rh *ReorgHandler) Start() {
+	interval := time.Duration(rh.triggerInterval) * time.Millisecond
+	ticker := time.NewTicker(interval)
+
+	log.Debug().Msg("Reorg handler running")
+	go func() {
+		for range ticker.C {
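+			// Look back blocksPerScan stored headers (newest first), starting blocksPerScan above the last checked block.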
+			lookbackFrom := new(big.Int).Add(rh.lastCheckedBlock, big.NewInt(int64(rh.blocksPerScan)))
+			blockHeaders, err := rh.storage.MainStorage.LookbackBlockHeaders(rh.rpc.ChainID, rh.blocksPerScan, lookbackFrom)
+			if err != nil {
+				log.Error().Err(err).Msg("Error getting recent block headers")
+				continue
+			}
+			if len(blockHeaders) == 0 {
+				log.Warn().Msg("No block headers found")
+				continue
+			}
+			mostRecentBlockHeader := blockHeaders[0]
+			reorgEndIndex := findReorgEndIndex(blockHeaders)
+			if reorgEndIndex == -1 {
+				rh.lastCheckedBlock = mostRecentBlockHeader.Number
+				if err := rh.storage.OrchestratorStorage.SetLastReorgCheckedBlockNumber(rh.rpc.ChainID, mostRecentBlockHeader.Number); err != nil {
+					log.Error().Err(err).Msg("Error saving last reorg checked block number")
+				}
+				metrics.ReorgHandlerLastCheckedBlock.Set(float64(mostRecentBlockHeader.Number.Int64()))
+				continue
+			}
+			metrics.ReorgCounter.Inc()
+			forkPoint, err := rh.findForkPoint(blockHeaders[reorgEndIndex:])
+			if err != nil {
+				log.Error().Err(err).Msg("Error while finding fork point")
+				continue
+			}
+			err = rh.handleReorg(forkPoint, lookbackFrom)
+			if err != nil {
+				log.Error().Err(err).Msg("Error while handling reorg")
+				continue
+			}
+			rh.lastCheckedBlock = mostRecentBlockHeader.Number
+			if err := rh.storage.OrchestratorStorage.SetLastReorgCheckedBlockNumber(rh.rpc.ChainID, mostRecentBlockHeader.Number); err != nil {
+				log.Error().Err(err).Msg("Error saving last reorg checked block number")
+			}
+			metrics.ReorgHandlerLastCheckedBlock.Set(float64(mostRecentBlockHeader.Number.Int64()))
+		}
+	}()
+
+	// Keep the program running (otherwise it will exit)
+	select {}
+}
+
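+// findReorgEndIndex walks the headers from newest to oldest and returns the index of the first block whose
+// parent hash does not match the hash of the next (older) header, or -1 if the chain is consistent.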
+func findReorgEndIndex(reversedBlockHeaders []common.BlockHeader) (index int) {
+	for i := 0; i < len(reversedBlockHeaders)-1; i++ {
+		currentBlock := reversedBlockHeaders[i]
+		previousBlock := reversedBlockHeaders[i+1]
+
+		if currentBlock.ParentHash != previousBlock.Hash {
+			log.Debug().
+				Str("currentBlockNumber", currentBlock.Number.String()).
+				Str("currentBlockHash", currentBlock.Hash).
+				Str("currentBlockParentHash", currentBlock.ParentHash).
+				Str("previousBlockNumber", previousBlock.Number.String()).
+				Str("previousBlockHash", previousBlock.Hash).
+				Msg("Reorg detected: parent hash mismatch")
+			return i
+		}
+	}
+	return -1
+}
+
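+// findForkPoint re-fetches the scanned range from the RPC and walks the stored headers from newest to oldest
+// until one matches the canonical chain, returning the block just below that match as the fork point.
+// If no header in the batch matches, it looks back another batch and recurses.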
+func (rh *ReorgHandler) findForkPoint(reversedBlockHeaders []common.BlockHeader) (forkPoint *big.Int, err error) {
+	newBlocksByNumber, err := rh.getNewBlocksByNumber(reversedBlockHeaders)
+	if err != nil {
+		return nil, err
+	}
+
+	for i := 0; i < len(reversedBlockHeaders)-1; i++ {
+		blockHeader := reversedBlockHeaders[i]
+		block, ok := newBlocksByNumber[blockHeader.Number.String()]
+		if !ok {
+			return nil, fmt.Errorf("block not found: %s", blockHeader.Number.String())
+		}
+		if block.Hash == blockHeader.Hash {
+			previousBlock := reversedBlockHeaders[i+1]
+			return previousBlock.Number, nil
+		}
+	}
+	lookbackFrom := reversedBlockHeaders[len(reversedBlockHeaders)-1].Number
+	nextHeadersBatch, err := rh.storage.MainStorage.LookbackBlockHeaders(rh.rpc.ChainID, rh.blocksPerScan, lookbackFrom)
+	if err != nil {
+		return nil, fmt.Errorf("error getting next headers batch: %w", err)
+	}
+	return rh.findForkPoint(nextHeadersBatch)
+}
+
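+// getNewBlocksByNumber fetches the current canonical blocks for the given headers from the RPC, keyed by block number.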
+func (rh *ReorgHandler) getNewBlocksByNumber(reversedBlockHeaders []common.BlockHeader) (map[string]common.Block, error) {
+	blockNumbers := make([]*big.Int, 0, len(reversedBlockHeaders))
+	for _, header := range reversedBlockHeaders {
+		blockNumbers = append(blockNumbers, header.Number)
+	}
+	blockResults := rh.rpc.GetBlocks(blockNumbers)
+	fetchedBlocksByNumber := make(map[string]common.Block)
+	for _, blockResult := range blockResults {
+		if blockResult.Error != nil {
+			return nil, fmt.Errorf("error fetching block %s: %w", blockResult.BlockNumber.String(), blockResult.Error)
+		}
+		fetchedBlocksByNumber[blockResult.BlockNumber.String()] = blockResult.Data
+	}
+	return fetchedBlocksByNumber, nil
+}
+
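+// handleReorg re-fetches full block data for the affected range (inclusive) via the worker, then deletes the
+// stored data for those blocks and inserts the refetched data.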
+func (rh *ReorgHandler) handleReorg(reorgStart *big.Int, reorgEnd *big.Int) error {
+	blockRange := make([]*big.Int, 0, new(big.Int).Sub(reorgEnd, reorgStart).Int64()+1)
+	for i := new(big.Int).Set(reorgStart); i.Cmp(reorgEnd) <= 0; i.Add(i, big.NewInt(1)) {
+		blockRange = append(blockRange, new(big.Int).Set(i))
+	}
+
+	results := rh.worker.Run(blockRange)
+	data := make([]common.BlockData, 0, len(results))
+	for _, result := range results {
+		if result.Error != nil {
+			return fmt.Errorf("cannot fix reorg: failed block %s: %w", result.BlockNumber.String(), result.Error)
+		}
+		data = append(data, common.BlockData{
+			Block:        result.Data.Block,
+			Logs:         result.Data.Logs,
+			Transactions: result.Data.Transactions,
+			Traces:       result.Data.Traces,
+		})
+	}
+	// TODO make delete and insert atomic
+	if err := rh.storage.MainStorage.DeleteBlockData(rh.rpc.ChainID, blockRange); err != nil {
+		return fmt.Errorf("error deleting data for blocks %v: %w", blockRange, err)
+	}
+	if err := rh.storage.MainStorage.InsertBlockData(&data); err != nil {
+		return fmt.Errorf("error saving data to main storage: %w", err)
+	}
+	return nil
+}