Skip to content

Commit

Permalink
feat(evm): evm tx indexer service implemented (#2044)
Browse files Browse the repository at this point in the history
* feat(evm): evm tx indexer service implemented

* feat(evm): evm-indexer cli command to fill the indexing gaps

* chore: lint

* chore: changelog update

* fix: graceful stopping of the evm indexer

* fix: race conditions on evm indexer service

* fix: race condition within evm indexer
  • Loading branch information
onikonychev authored Sep 23, 2024
1 parent 5214349 commit 81ea61d
Show file tree
Hide file tree
Showing 16 changed files with 400 additions and 86 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- [#2030](https://github.com/NibiruChain/nibiru/pull/2030) - refactor(eth/rpc): Delete unused code and improve logging in the eth and debug namespaces
- [#2031](https://github.com/NibiruChain/nibiru/pull/2031) - fix(evm): debug calls with custom tracer and tracer options
- [#2039](https://github.com/NibiruChain/nibiru/pull/2039) - refactor(rpc-backend): remove unnecessary interface code
- [#2039](https://github.com/NibiruChain/nibiru/pull/2039) - refactor(rpc-backend): Remove mocks from eth/rpc/backend, partially completing [nibiru#2037](https://github.com/NibiruChain/nibiru/issue/2037).
- [#2044](https://github.com/NibiruChain/nibiru/pull/2044) - feat(evm): evm tx indexer service implemented
- [#2045](https://github.com/NibiruChain/nibiru/pull/2045) - test(evm): backend tests with test network and real txs

#### Dapp modules: perp, spot, oracle, etc
Expand Down
133 changes: 133 additions & 0 deletions app/server/evm_tx_indexer_cli.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
// Copyright (c) 2023-2024 Nibi, Inc.
package server

import (
"fmt"
"strconv"

"github.com/spf13/cobra"

"github.com/NibiruChain/nibiru/v2/eth/indexer"

tmnode "github.com/cometbft/cometbft/node"
sm "github.com/cometbft/cometbft/state"
tmstore "github.com/cometbft/cometbft/store"
"github.com/cosmos/cosmos-sdk/client"
"github.com/cosmos/cosmos-sdk/server"
)

func NewEVMTxIndexCmd() *cobra.Command {
cmd := &cobra.Command{
Use: "evm-tx-index [minBlockNumber|last-indexed] [maxBlockNumber|latest]",
Short: "Index historical evm blocks and transactions",
Long: `Command is useful for catching up if the node experienced a period
with EVMTxIndexer turned off or was stopped without proper closing/flushing EVMIndexerDB.
Processes blocks from minBlockNumber to maxBlockNumber, indexes evm txs.
- minBlockNumber: min block to start indexing. Supply "last-indexed" to start with the latest block available in EVMIndexerDB.
- maxBlockNumber: max block, could be a number or "latest".
Default run before the full node/archive node start should be:
nibid evm-tx-index last-indexed latest
`,
Args: cobra.ExactArgs(2),
RunE: func(cmd *cobra.Command, args []string) error {
serverCtx := server.GetServerContextFromCmd(cmd)
clientCtx, err := client.GetClientQueryContext(cmd)
if err != nil {
return err
}
cfg := serverCtx.Config
logger := serverCtx.Logger
evmIndexerDB, err := OpenIndexerDB(cfg.RootDir, server.GetAppDBBackend(serverCtx.Viper))
if err != nil {
logger.Error("failed to open evm indexer DB", "error", err.Error())
return err
}

evmTxIndexer := indexer.NewEVMTxIndexer(evmIndexerDB, logger.With("module", "evmindex"), clientCtx)

tmdb, err := tmnode.DefaultDBProvider(&tmnode.DBContext{ID: "blockstore", Config: cfg})
if err != nil {
return err
}
blockStore := tmstore.NewBlockStore(tmdb)
minAvailableHeight := blockStore.Base()
maxAvailableHeight := blockStore.Height()
fmt.Printf("Block range available on the node: %d - %d\n", minAvailableHeight, maxAvailableHeight)

var fromBlock int64
var toBlock int64

// FROM block could be one of two:
// - int64 number - replaced with minAvailableHeight if too low
// - last-indexed - latest available block in EVMIndexerDB, 0 if nothing is indexed
if args[0] == "last-indexed" {
fromBlock, err = evmTxIndexer.LastIndexedBlock()
if err != nil || fromBlock < 0 {
fromBlock = 0
}
} else {
fromBlock, err = strconv.ParseInt(args[1], 10, 64)
if err != nil {
return fmt.Errorf("cannot parse min block number: %s", args[1])
}
if fromBlock > maxAvailableHeight {
return fmt.Errorf("maximum available block is: %d", maxAvailableHeight)
}
}
if fromBlock < minAvailableHeight {
fromBlock = minAvailableHeight
}

// TO block could be one of two:
// - int64 number - replaced with maxAvailableHeight if too high
// - latest - latest available block in the node
if args[1] == "latest" {
toBlock = maxAvailableHeight
} else {
toBlock, err = strconv.ParseInt(args[1], 10, 64)
if err != nil {
return fmt.Errorf("cannot parse max block number: %s", args[1])
}
if toBlock > maxAvailableHeight {
toBlock = maxAvailableHeight
}
}
if fromBlock > toBlock {
return fmt.Errorf("minBlockNumber must be less or equal to maxBlockNumber")
}
stateDB, err := tmnode.DefaultDBProvider(&tmnode.DBContext{ID: "state", Config: cfg})
if err != nil {
return err
}
stateStore := sm.NewStore(stateDB, sm.StoreOptions{
DiscardABCIResponses: cfg.Storage.DiscardABCIResponses,
})

fmt.Printf("Indexing blocks from %d to %d\n", fromBlock, toBlock)
for height := fromBlock; height <= toBlock; height++ {
block := blockStore.LoadBlock(height)
if block == nil {
return fmt.Errorf("block not found %d", height)
}
blockResults, err := stateStore.LoadABCIResponses(height)
if err != nil {
return err
}
if err := evmTxIndexer.IndexBlock(block, blockResults.DeliverTxs); err != nil {
return err
}
fmt.Println(height)
}
err = evmTxIndexer.CloseDBAndExit()
if err != nil {
return err
}
fmt.Println("Indexing complete")
return nil
},
}
return cmd
}
135 changes: 135 additions & 0 deletions app/server/evm_tx_indexer_service.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
// Copyright (c) 2023-2024 Nibi, Inc.
package server

import (
"context"
"sync/atomic"
"time"

"github.com/cometbft/cometbft/libs/service"
rpcclient "github.com/cometbft/cometbft/rpc/client"
"github.com/cometbft/cometbft/types"

"github.com/NibiruChain/nibiru/v2/eth/indexer"
)

const (
EVMTxIndexerServiceName = "EVMTxIndexerService"

NewBlockWaitTimeout = 60 * time.Second
)

// EVMTxIndexerService indexes transactions for json-rpc service.
type EVMTxIndexerService struct {
service.BaseService

evmTxIndexer *indexer.EVMTxIndexer
rpcClient rpcclient.Client
cancelFunc context.CancelFunc
}

// NewEVMIndexerService returns a new service instance.
func NewEVMIndexerService(evmTxIndexer *indexer.EVMTxIndexer, rpcClient rpcclient.Client) *EVMTxIndexerService {
indexerService := &EVMTxIndexerService{evmTxIndexer: evmTxIndexer, rpcClient: rpcClient}
indexerService.BaseService = *service.NewBaseService(nil, EVMTxIndexerServiceName, indexerService)
return indexerService
}

// OnStart implements service.Service by subscribing for new blocks
// and indexing them by events.
func (service *EVMTxIndexerService) OnStart() error {
ctx, cancel := context.WithCancel(context.Background())
service.cancelFunc = cancel

status, err := service.rpcClient.Status(ctx)
if err != nil {
return err
}

// chainHeightStorage is used within goroutine and the indexer loop so, using atomic for read/write
var chainHeightStorage int64
atomic.StoreInt64(&chainHeightStorage, status.SyncInfo.LatestBlockHeight)

newBlockSignal := make(chan struct{}, 1)
blockHeadersChan, err := service.rpcClient.Subscribe(
ctx,
EVMTxIndexerServiceName,
types.QueryForEvent(types.EventNewBlockHeader).String(),
0,
)
if err != nil {
return err
}

// Goroutine listening for new blocks
go func(ctx context.Context) {
for {
select {
case <-ctx.Done():
service.Logger.Info("Stopping indexer goroutine")
err := service.evmTxIndexer.CloseDBAndExit()
if err != nil {
service.Logger.Error("Error closing indexer DB", "err", err)
}
return
case msg := <-blockHeadersChan:
eventDataHeader := msg.Data.(types.EventDataNewBlockHeader)
currentChainHeight := eventDataHeader.Header.Height
chainHeight := atomic.LoadInt64(&chainHeightStorage)
if currentChainHeight > chainHeight {
atomic.StoreInt64(&chainHeightStorage, currentChainHeight)
// notify
select {
case newBlockSignal <- struct{}{}:
default:
}
}
}
}
}(ctx)

lastIndexedHeight, err := service.evmTxIndexer.LastIndexedBlock()
if err != nil {
return err
}
if lastIndexedHeight == -1 {
lastIndexedHeight = atomic.LoadInt64(&chainHeightStorage)
}

// Indexer loop
for {
chainHeight := atomic.LoadInt64(&chainHeightStorage)
if chainHeight <= lastIndexedHeight {
// nothing to index. wait for signal of new block
select {
case <-newBlockSignal:
case <-time.After(NewBlockWaitTimeout):
}
continue
}
for i := lastIndexedHeight + 1; i <= chainHeight; i++ {
block, err := service.rpcClient.Block(ctx, &i)
if err != nil {
service.Logger.Error("failed to fetch block", "height", i, "err", err)
break
}
blockResult, err := service.rpcClient.BlockResults(ctx, &i)
if err != nil {
service.Logger.Error("failed to fetch block result", "height", i, "err", err)
break
}
if err := service.evmTxIndexer.IndexBlock(block.Block, blockResult.TxsResults); err != nil {
service.Logger.Error("failed to index block", "height", i, "err", err)
}
lastIndexedHeight = blockResult.Height
}
}
}

func (service *EVMTxIndexerService) OnStop() {
service.Logger.Info("Stopping EVMTxIndexerService")
if service.cancelFunc != nil {
service.Logger.Info("Calling EVMIndexerService CancelFunc")
service.cancelFunc()
}
}
11 changes: 6 additions & 5 deletions app/server/json_rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,15 @@ import (
)

// StartJSONRPC starts the JSON-RPC server
func StartJSONRPC(ctx *server.Context,
func StartJSONRPC(
ctx *server.Context,
clientCtx client.Context,
tmRPCAddr,
tmEndpoint string,
config *srvconfig.Config,
indexer eth.EVMTxIndexer,
) (*http.Server, chan struct{}, error) {
tmWsClient := ConnectTmWS(tmRPCAddr, tmEndpoint, ctx.Logger)
tmWsClientForRPCApi := ConnectTmWS(tmRPCAddr, tmEndpoint, ctx.Logger)

logger := ctx.Logger.With("module", "geth")
ethlog.Root().SetHandler(ethlog.FuncHandler(func(r *ethlog.Record) error {
Expand All @@ -48,7 +49,7 @@ func StartJSONRPC(ctx *server.Context,
allowUnprotectedTxs := config.JSONRPC.AllowUnprotectedTxs
rpcAPIArr := config.JSONRPC.API

apis := rpcapi.GetRPCAPIs(ctx, clientCtx, tmWsClient, allowUnprotectedTxs, indexer, rpcAPIArr)
apis := rpcapi.GetRPCAPIs(ctx, clientCtx, tmWsClientForRPCApi, allowUnprotectedTxs, indexer, rpcAPIArr)

for _, api := range apis {
if err := rpcServer.RegisterName(api.Namespace, api.Service); err != nil {
Expand Down Expand Up @@ -108,8 +109,8 @@ func StartJSONRPC(ctx *server.Context,
ctx.Logger.Info("Starting JSON WebSocket server", "address", config.JSONRPC.WsAddress)

// allocate separate WS connection to Tendermint
tmWsClient = ConnectTmWS(tmRPCAddr, tmEndpoint, ctx.Logger)
wsSrv := rpcapi.NewWebsocketsServer(clientCtx, ctx.Logger, tmWsClient, config)
tmWsClientForRPCWs := ConnectTmWS(tmRPCAddr, tmEndpoint, ctx.Logger)
wsSrv := rpcapi.NewWebsocketsServer(clientCtx, ctx.Logger, tmWsClientForRPCWs, config)
wsSrv.Start()
return httpSrv, httpSrvDone, nil
}
Loading

0 comments on commit 81ea61d

Please sign in to comment.