From 5b416ef027d651d4cce0953f66f2b15640f22bb4 Mon Sep 17 00:00:00 2001 From: iuwqyir Date: Fri, 4 Oct 2024 16:22:27 +0300 Subject: [PATCH] refactor rpc to a wrapper for easier reuse --- cmd/orchestrator.go | 4 +- internal/common/block.go | 2 + internal/common/log.go | 2 + internal/common/rpc.go | 125 --------------- internal/common/trace.go | 2 + internal/common/utils.go | 18 +++ internal/orchestrator/chain_tracker.go | 7 +- internal/orchestrator/committer.go | 5 +- internal/orchestrator/failure_recoverer.go | 15 +- internal/orchestrator/orchestrator.go | 6 +- internal/orchestrator/poller.go | 23 +-- internal/rpc/batcher.go | 86 ++++++++++ internal/rpc/rpc.go | 170 ++++++++++++++++++++ internal/{worker => rpc}/serializer.go | 51 ++++-- internal/rpc/utils.go | 33 ++++ internal/worker/worker.go | 176 +-------------------- 16 files changed, 391 insertions(+), 334 deletions(-) delete mode 100644 internal/common/rpc.go create mode 100644 internal/common/utils.go create mode 100644 internal/rpc/batcher.go create mode 100644 internal/rpc/rpc.go rename internal/{worker => rpc}/serializer.go (83%) create mode 100644 internal/rpc/utils.go diff --git a/cmd/orchestrator.go b/cmd/orchestrator.go index 76a0785..f577949 100644 --- a/cmd/orchestrator.go +++ b/cmd/orchestrator.go @@ -6,8 +6,8 @@ import ( "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/rs/zerolog/log" "github.com/spf13/cobra" - "github.com/thirdweb-dev/indexer/internal/common" "github.com/thirdweb-dev/indexer/internal/orchestrator" + "github.com/thirdweb-dev/indexer/internal/rpc" ) var ( @@ -23,7 +23,7 @@ var ( func RunOrchestrator(cmd *cobra.Command, args []string) { log.Info().Msg("Starting indexer") - rpc, err := common.InitializeRPC() + rpc, err := rpc.Initialize() if err != nil { log.Fatal().Err(err).Msg("Failed to initialize RPC") } diff --git a/internal/common/block.go b/internal/common/block.go index 186f366..48a4570 100644 --- a/internal/common/block.go +++ b/internal/common/block.go @@ -35,3 +35,5 @@ type BlockData struct { Logs []Log Traces []Trace } + +type RawBlock = map[string]interface{} diff --git a/internal/common/log.go b/internal/common/log.go index 5617ffa..ca83b4c 100644 --- a/internal/common/log.go +++ b/internal/common/log.go @@ -16,3 +16,5 @@ type Log struct { Data string `json:"data"` Topics []string `json:"topics"` } + +type RawLogs = []map[string]interface{} diff --git a/internal/common/rpc.go b/internal/common/rpc.go deleted file mode 100644 index e4bd760..0000000 --- a/internal/common/rpc.go +++ /dev/null @@ -1,125 +0,0 @@ -package common - -import ( - "context" - "fmt" - "math/big" - "strings" - - "github.com/ethereum/go-ethereum/ethclient" - "github.com/ethereum/go-ethereum/rpc" - "github.com/rs/zerolog/log" - config "github.com/thirdweb-dev/indexer/configs" -) - -type BlocksPerRequest struct { - Blocks int - Logs int - Traces int -} - -type RPC struct { - RPCClient *rpc.Client - EthClient *ethclient.Client - SupportsTraceBlock bool - IsWebsocket bool - URL string - ChainID *big.Int - BlocksPerRequest BlocksPerRequest -} - -// TODO: we should detect this automatically -var DEFAULT_BLOCKS_PER_REQUEST = 1000 -var DEFAULT_LOGS_PER_REQUEST = 100 -var DEFAULT_TRACES_PER_REQUEST = 100 - -func InitializeRPC() (*RPC, error) { - rpcUrl := config.Cfg.RPC.URL - if rpcUrl == "" { - return nil, fmt.Errorf("RPC_URL environment variable is not set") - } - log.Debug().Msg("Initializing RPC") - rpcClient, dialErr := rpc.Dial(rpcUrl) - if dialErr != nil { - return nil, dialErr - } - - blocksPerRequest := 
config.Cfg.RPC.Blocks.BlocksPerRequest - if blocksPerRequest == 0 { - blocksPerRequest = DEFAULT_BLOCKS_PER_REQUEST - } - - logsBlocksPerRequest := config.Cfg.RPC.Logs.BlocksPerRequest - if logsBlocksPerRequest == 0 { - logsBlocksPerRequest = DEFAULT_LOGS_PER_REQUEST - } - - tracesBlocksPerRequest := config.Cfg.RPC.Traces.BlocksPerRequest - if tracesBlocksPerRequest == 0 { - tracesBlocksPerRequest = DEFAULT_TRACES_PER_REQUEST - } - - ethClient := ethclient.NewClient(rpcClient) - - rpc := &RPC{ - RPCClient: rpcClient, - EthClient: ethClient, - URL: rpcUrl, - IsWebsocket: strings.HasPrefix(rpcUrl, "ws://") || strings.HasPrefix(rpcUrl, "wss://"), - BlocksPerRequest: BlocksPerRequest{ - Blocks: blocksPerRequest, - Logs: logsBlocksPerRequest, - Traces: tracesBlocksPerRequest, - }, - } - checkErr := rpc.checkSupportedMethods() - if checkErr != nil { - return nil, checkErr - } - - chainIdErr := rpc.setChainID() - if chainIdErr != nil { - return nil, chainIdErr - } - return rpc, nil -} - -func (rpc *RPC) checkSupportedMethods() error { - var blockByNumberResult interface{} - err := rpc.RPCClient.Call(&blockByNumberResult, "eth_getBlockByNumber", "latest", true) - if err != nil { - return fmt.Errorf("eth_getBlockByNumber method not supported: %v", err) - } - log.Debug().Msg("eth_getBlockByNumber method supported") - - var getLogsResult interface{} - logsErr := rpc.RPCClient.Call(&getLogsResult, "eth_getLogs", map[string]string{"fromBlock": "0x0", "toBlock": "0x0"}) - if logsErr != nil { - return fmt.Errorf("eth_getLogs method not supported: %v", logsErr) - } - log.Debug().Msg("eth_getLogs method supported") - - var traceBlockResult interface{} - if config.Cfg.RPC.Traces.Enabled { - if traceBlockErr := rpc.RPCClient.Call(&traceBlockResult, "trace_block", "latest"); traceBlockErr != nil { - log.Warn().Err(traceBlockErr).Msg("Optional method trace_block not supported") - } - } - rpc.SupportsTraceBlock = traceBlockResult != nil - log.Debug().Msgf("trace_block method supported: %v", rpc.SupportsTraceBlock) - return nil -} - -func (rpc *RPC) setChainID() error { - chainID, err := rpc.EthClient.ChainID(context.Background()) - if err != nil { - return fmt.Errorf("failed to get chain ID: %v", err) - } - rpc.ChainID = chainID - return nil -} - -func (rpc *RPC) Close() { - rpc.RPCClient.Close() - rpc.EthClient.Close() -} diff --git a/internal/common/trace.go b/internal/common/trace.go index e54072c..2ca4531 100644 --- a/internal/common/trace.go +++ b/internal/common/trace.go @@ -27,3 +27,5 @@ type Trace struct { RewardType string `json:"reward_type"` RefundAddress string `json:"refund_address"` } + +type RawTraces = []map[string]interface{} diff --git a/internal/common/utils.go b/internal/common/utils.go new file mode 100644 index 0000000..984a83b --- /dev/null +++ b/internal/common/utils.go @@ -0,0 +1,18 @@ +package common + +import "math/big" + +func BigIntSliceToChunks(values []*big.Int, chunkSize int) [][]*big.Int { + if chunkSize >= len(values) || chunkSize <= 0 { + return [][]*big.Int{values} + } + var chunks [][]*big.Int + for i := 0; i < len(values); i += chunkSize { + end := i + chunkSize + if end > len(values) { + end = len(values) + } + chunks = append(chunks, values[i:end]) + } + return chunks +} diff --git a/internal/orchestrator/chain_tracker.go b/internal/orchestrator/chain_tracker.go index 8489af4..a537028 100644 --- a/internal/orchestrator/chain_tracker.go +++ b/internal/orchestrator/chain_tracker.go @@ -5,19 +5,18 @@ import ( "time" "github.com/rs/zerolog/log" - 
"github.com/thirdweb-dev/indexer/internal/common" "github.com/thirdweb-dev/indexer/internal/metrics" + "github.com/thirdweb-dev/indexer/internal/rpc" ) const DEFAULT_CHAIN_TRACKER_POLL_INTERVAL = 300000 // 5 minutes type ChainTracker struct { - rpc common.RPC + rpc rpc.Client triggerIntervalMs int } -func NewChainTracker(rpc common.RPC) *ChainTracker { - +func NewChainTracker(rpc rpc.Client) *ChainTracker { return &ChainTracker{ rpc: rpc, triggerIntervalMs: DEFAULT_CHAIN_TRACKER_POLL_INTERVAL, diff --git a/internal/orchestrator/committer.go b/internal/orchestrator/committer.go index 9549068..a837d8d 100644 --- a/internal/orchestrator/committer.go +++ b/internal/orchestrator/committer.go @@ -11,6 +11,7 @@ import ( config "github.com/thirdweb-dev/indexer/configs" "github.com/thirdweb-dev/indexer/internal/common" "github.com/thirdweb-dev/indexer/internal/metrics" + "github.com/thirdweb-dev/indexer/internal/rpc" "github.com/thirdweb-dev/indexer/internal/storage" ) @@ -22,10 +23,10 @@ type Committer struct { blocksPerCommit int storage storage.IStorage pollFromBlock *big.Int - rpc common.RPC + rpc rpc.Client } -func NewCommitter(rpc common.RPC, storage storage.IStorage) *Committer { +func NewCommitter(rpc rpc.Client, storage storage.IStorage) *Committer { triggerInterval := config.Cfg.Committer.Interval if triggerInterval == 0 { triggerInterval = DEFAULT_COMMITTER_TRIGGER_INTERVAL diff --git a/internal/orchestrator/failure_recoverer.go b/internal/orchestrator/failure_recoverer.go index 5d07e16..16d7228 100644 --- a/internal/orchestrator/failure_recoverer.go +++ b/internal/orchestrator/failure_recoverer.go @@ -9,6 +9,7 @@ import ( config "github.com/thirdweb-dev/indexer/configs" "github.com/thirdweb-dev/indexer/internal/common" "github.com/thirdweb-dev/indexer/internal/metrics" + "github.com/thirdweb-dev/indexer/internal/rpc" "github.com/thirdweb-dev/indexer/internal/storage" "github.com/thirdweb-dev/indexer/internal/worker" ) @@ -20,10 +21,10 @@ type FailureRecoverer struct { failuresPerPoll int triggerIntervalMs int storage storage.IStorage - rpc common.RPC + rpc rpc.Client } -func NewFailureRecoverer(rpc common.RPC, storage storage.IStorage) *FailureRecoverer { +func NewFailureRecoverer(rpc rpc.Client, storage storage.IStorage) *FailureRecoverer { failuresPerPoll := config.Cfg.FailureRecoverer.BlocksPerRun if failuresPerPoll == 0 { failuresPerPoll = DEFAULT_FAILURES_PER_POLL @@ -80,7 +81,7 @@ func (fr *FailureRecoverer) Start() { select {} } -func (fr *FailureRecoverer) handleWorkerResults(blockFailures []common.BlockFailure, results []worker.WorkerResult) { +func (fr *FailureRecoverer) handleWorkerResults(blockFailures []common.BlockFailure, results []rpc.GetFullBlockResult) { log.Debug().Msgf("Failure Recoverer recovered %d blocks", len(results)) blockFailureMap := make(map[*big.Int]common.BlockFailure) for _, failure := range blockFailures { @@ -105,10 +106,10 @@ func (fr *FailureRecoverer) handleWorkerResults(blockFailures []common.BlockFail }) } else { successfulResults = append(successfulResults, common.BlockData{ - Block: result.Block, - Logs: result.Logs, - Transactions: result.Transactions, - Traces: result.Traces, + Block: result.Data.Block, + Logs: result.Data.Logs, + Transactions: result.Data.Transactions, + Traces: result.Data.Traces, }) failuresToDelete = append(failuresToDelete, blockFailureForBlock) } diff --git a/internal/orchestrator/orchestrator.go b/internal/orchestrator/orchestrator.go index 5168ba2..e4cffa3 100644 --- a/internal/orchestrator/orchestrator.go +++ 
b/internal/orchestrator/orchestrator.go @@ -4,19 +4,19 @@ import ( "sync" config "github.com/thirdweb-dev/indexer/configs" - "github.com/thirdweb-dev/indexer/internal/common" + "github.com/thirdweb-dev/indexer/internal/rpc" "github.com/thirdweb-dev/indexer/internal/storage" ) type Orchestrator struct { - rpc common.RPC + rpc rpc.Client storage storage.IStorage pollerEnabled bool failureRecovererEnabled bool committerEnabled bool } -func NewOrchestrator(rpc common.RPC) (*Orchestrator, error) { +func NewOrchestrator(rpc rpc.Client) (*Orchestrator, error) { storage, err := storage.NewStorageConnector(&config.Cfg.Storage) if err != nil { return nil, err diff --git a/internal/orchestrator/poller.go b/internal/orchestrator/poller.go index 031dae2..bb4e656 100644 --- a/internal/orchestrator/poller.go +++ b/internal/orchestrator/poller.go @@ -11,6 +11,7 @@ import ( config "github.com/thirdweb-dev/indexer/configs" "github.com/thirdweb-dev/indexer/internal/common" "github.com/thirdweb-dev/indexer/internal/metrics" + "github.com/thirdweb-dev/indexer/internal/rpc" "github.com/thirdweb-dev/indexer/internal/storage" "github.com/thirdweb-dev/indexer/internal/worker" ) @@ -19,7 +20,7 @@ const DEFAULT_BLOCKS_PER_POLL = 10 const DEFAULT_TRIGGER_INTERVAL = 1000 type Poller struct { - rpc common.RPC + rpc rpc.Client blocksPerPoll int64 triggerIntervalMs int64 storage storage.IStorage @@ -32,7 +33,7 @@ type BlockNumberWithError struct { Error error } -func NewPoller(rpc common.RPC, storage storage.IStorage) *Poller { +func NewPoller(rpc rpc.Client, storage storage.IStorage) *Poller { blocksPerPoll := config.Cfg.Poller.BlocksPerPoll if blocksPerPoll == 0 { blocksPerPoll = DEFAULT_BLOCKS_PER_POLL @@ -166,9 +167,9 @@ func (p *Poller) getEndBlockForRange(startBlock *big.Int, latestBlock *big.Int) return endBlock } -func (p *Poller) handleWorkerResults(results []worker.WorkerResult) { - var successfulResults []worker.WorkerResult - var failedResults []worker.WorkerResult +func (p *Poller) handleWorkerResults(results []rpc.GetFullBlockResult) { + var successfulResults []rpc.GetFullBlockResult + var failedResults []rpc.GetFullBlockResult for _, result := range results { if result.Error != nil { @@ -182,17 +183,17 @@ func (p *Poller) handleWorkerResults(results []worker.WorkerResult) { blockData := make([]common.BlockData, 0, len(successfulResults)) for _, result := range successfulResults { blockData = append(blockData, common.BlockData{ - Block: result.Block, - Logs: result.Logs, - Transactions: result.Transactions, - Traces: result.Traces, + Block: result.Data.Block, + Logs: result.Data.Logs, + Transactions: result.Data.Transactions, + Traces: result.Data.Traces, }) } if err := p.storage.StagingStorage.InsertBlockData(blockData); err != nil { e := fmt.Errorf("error inserting block data: %v", err) log.Error().Err(e) for _, result := range successfulResults { - failedResults = append(failedResults, worker.WorkerResult{ + failedResults = append(failedResults, rpc.GetFullBlockResult{ BlockNumber: result.BlockNumber, Error: e, }) @@ -205,7 +206,7 @@ func (p *Poller) handleWorkerResults(results []worker.WorkerResult) { } } -func (p *Poller) handleBlockFailures(results []worker.WorkerResult) { +func (p *Poller) handleBlockFailures(results []rpc.GetFullBlockResult) { var blockFailures []common.BlockFailure for _, result := range results { if result.Error != nil { diff --git a/internal/rpc/batcher.go b/internal/rpc/batcher.go new file mode 100644 index 0000000..d121c3f --- /dev/null +++ b/internal/rpc/batcher.go @@ -0,0 
+1,86 @@ +package rpc + +import ( + "context" + "math/big" + "sync" + "time" + + gethRpc "github.com/ethereum/go-ethereum/rpc" + "github.com/rs/zerolog/log" + "github.com/thirdweb-dev/indexer/internal/common" +) + +type RPCFetchBatchResult[T any] struct { + BlockNumber *big.Int + Error error + Result T +} + +func RPCFetchInBatches[T any](rpc *Client, blockNumbers []*big.Int, batchSize int, batchDelay int, method string, argsFunc func(*big.Int) []interface{}) []RPCFetchBatchResult[T] { + if len(blockNumbers) <= batchSize { + return RPCFetchBatch[T](rpc, blockNumbers, method, argsFunc) + } + chunks := common.BigIntSliceToChunks(blockNumbers, batchSize) + + log.Debug().Msgf("Fetching %s for %d blocks in %d chunks of max %d requests", method, len(blockNumbers), len(chunks), batchSize) + + var wg sync.WaitGroup + resultsCh := make(chan []RPCFetchBatchResult[T], len(chunks)) + + for _, chunk := range chunks { + wg.Add(1) + go func(chunk []*big.Int) { + defer wg.Done() + resultsCh <- RPCFetchBatch[T](rpc, chunk, method, argsFunc) + if batchDelay > 0 { + time.Sleep(time.Duration(batchDelay) * time.Millisecond) + } + }(chunk) + } + go func() { + wg.Wait() + close(resultsCh) + }() + + results := make([]RPCFetchBatchResult[T], 0, len(blockNumbers)) + for batchResults := range resultsCh { + results = append(results, batchResults...) + } + + return results +} + +func RPCFetchBatch[T any](rpc *Client, blockNumbers []*big.Int, method string, argsFunc func(*big.Int) []interface{}) []RPCFetchBatchResult[T] { + batch := make([]gethRpc.BatchElem, len(blockNumbers)) + results := make([]RPCFetchBatchResult[T], len(blockNumbers)) + + for i, blockNum := range blockNumbers { + results[i] = RPCFetchBatchResult[T]{ + BlockNumber: blockNum, + } + batch[i] = gethRpc.BatchElem{ + Method: method, + Args: argsFunc(blockNum), + Result: new(T), + } + } + + err := rpc.RPCClient.BatchCallContext(context.Background(), batch) + if err != nil { + for i := range results { + results[i].Error = err + } + return results + } + + for i, elem := range batch { + if elem.Error != nil { + results[i].Error = elem.Error + } else { + results[i].Result = *elem.Result.(*T) + } + } + + return results +} diff --git a/internal/rpc/rpc.go b/internal/rpc/rpc.go new file mode 100644 index 0000000..8d8cae6 --- /dev/null +++ b/internal/rpc/rpc.go @@ -0,0 +1,170 @@ +package rpc + +import ( + "context" + "fmt" + "math/big" + "strings" + "sync" + + "github.com/ethereum/go-ethereum/common/hexutil" + "github.com/ethereum/go-ethereum/ethclient" + gethRpc "github.com/ethereum/go-ethereum/rpc" + "github.com/rs/zerolog/log" + config "github.com/thirdweb-dev/indexer/configs" + "github.com/thirdweb-dev/indexer/internal/common" +) + +type GetFullBlockResult struct { + BlockNumber *big.Int + Error error + Data common.BlockData +} + +type GetBlocksResult struct { + BlockNumber *big.Int + Error error + Data common.Block +} + +type BlocksPerRequestConfig struct { + Blocks int + Logs int + Traces int +} + +type Client struct { + RPCClient *gethRpc.Client + EthClient *ethclient.Client + SupportsTraceBlock bool + IsWebsocket bool + URL string + ChainID *big.Int + BlocksPerRequest BlocksPerRequestConfig +} + +func Initialize() (*Client, error) { + rpcUrl := config.Cfg.RPC.URL + if rpcUrl == "" { + return nil, fmt.Errorf("RPC_URL environment variable is not set") + } + log.Debug().Msg("Initializing RPC") + rpcClient, dialErr := gethRpc.Dial(rpcUrl) + if dialErr != nil { + return nil, dialErr + } + + ethClient := ethclient.NewClient(rpcClient) + + rpc := &Client{ + 
RPCClient: rpcClient, + EthClient: ethClient, + URL: rpcUrl, + IsWebsocket: strings.HasPrefix(rpcUrl, "ws://") || strings.HasPrefix(rpcUrl, "wss://"), + BlocksPerRequest: GetBlockPerRequestConfig(), + } + checkErr := rpc.checkSupportedMethods() + if checkErr != nil { + return nil, checkErr + } + + chainIdErr := rpc.setChainID() + if chainIdErr != nil { + return nil, chainIdErr + } + return rpc, nil +} + +func (rpc *Client) Close() { + rpc.RPCClient.Close() + rpc.EthClient.Close() +} + +func (rpc *Client) checkSupportedMethods() error { + var blockByNumberResult interface{} + err := rpc.RPCClient.Call(&blockByNumberResult, "eth_getBlockByNumber", "latest", true) + if err != nil { + return fmt.Errorf("eth_getBlockByNumber method not supported: %v", err) + } + log.Debug().Msg("eth_getBlockByNumber method supported") + + var getLogsResult interface{} + logsErr := rpc.RPCClient.Call(&getLogsResult, "eth_getLogs", map[string]string{"fromBlock": "0x0", "toBlock": "0x0"}) + if logsErr != nil { + return fmt.Errorf("eth_getLogs method not supported: %v", logsErr) + } + log.Debug().Msg("eth_getLogs method supported") + + var traceBlockResult interface{} + if config.Cfg.RPC.Traces.Enabled { + if traceBlockErr := rpc.RPCClient.Call(&traceBlockResult, "trace_block", "latest"); traceBlockErr != nil { + log.Warn().Err(traceBlockErr).Msg("Optional method trace_block not supported") + } + } + rpc.SupportsTraceBlock = traceBlockResult != nil + log.Debug().Msgf("trace_block method supported: %v", rpc.SupportsTraceBlock) + return nil +} + +func (rpc *Client) setChainID() error { + chainID, err := rpc.EthClient.ChainID(context.Background()) + if err != nil { + return fmt.Errorf("failed to get chain ID: %v", err) + } + rpc.ChainID = chainID + return nil +} + +func (rpc *Client) GetFullBlocks(blockNumbers []*big.Int) []GetFullBlockResult { + var wg sync.WaitGroup + var blocks []RPCFetchBatchResult[common.RawBlock] + var logs []RPCFetchBatchResult[common.RawLogs] + var traces []RPCFetchBatchResult[common.RawTraces] + + wg.Add(2) + + go func() { + defer wg.Done() + blocks = RPCFetchBatch[common.RawBlock](rpc, blockNumbers, "eth_getBlockByNumber", func(blockNum *big.Int) []interface{} { + return []interface{}{hexutil.EncodeBig(blockNum), true} + }) + }() + + go func() { + defer wg.Done() + logs = RPCFetchInBatches[common.RawLogs](rpc, blockNumbers, rpc.BlocksPerRequest.Logs, config.Cfg.RPC.Logs.BatchDelay, "eth_getLogs", func(blockNum *big.Int) []interface{} { + return []interface{}{map[string]string{"fromBlock": hexutil.EncodeBig(blockNum), "toBlock": hexutil.EncodeBig(blockNum)}} + }) + }() + + if rpc.SupportsTraceBlock { + wg.Add(1) + go func() { + defer wg.Done() + traces = RPCFetchInBatches[common.RawTraces](rpc, blockNumbers, rpc.BlocksPerRequest.Traces, config.Cfg.RPC.Traces.BatchDelay, "trace_block", func(blockNum *big.Int) []interface{} { + return []interface{}{hexutil.EncodeBig(blockNum)} + }) + }() + } + + wg.Wait() + + return SerializeFullBlocks(rpc.ChainID, blocks, logs, traces) +} + +func (rpc *Client) GetBlocks(blockNumbers []*big.Int) []GetBlocksResult { + var wg sync.WaitGroup + var blocks []RPCFetchBatchResult[common.RawBlock] + + wg.Add(1) + + go func() { + defer wg.Done() + blocks = RPCFetchBatch[common.RawBlock](rpc, blockNumbers, "eth_getBlockByNumber", func(blockNum *big.Int) []interface{} { + return []interface{}{hexutil.EncodeBig(blockNum), false} + }) + }() + wg.Wait() + + return SerializeBlocks(rpc.ChainID, blocks) +} diff --git a/internal/worker/serializer.go b/internal/rpc/serializer.go 
similarity index 83% rename from internal/worker/serializer.go rename to internal/rpc/serializer.go index e4f9d9d..69e2562 100644 --- a/internal/worker/serializer.go +++ b/internal/rpc/serializer.go @@ -1,4 +1,4 @@ -package worker +package rpc import ( "encoding/json" @@ -10,21 +10,21 @@ import ( "github.com/thirdweb-dev/indexer/internal/common" ) -func SerializeWorkerResults(chainId *big.Int, blocks []BatchFetchResult[RawBlock], logs []BatchFetchResult[RawLogs], traces []BatchFetchResult[RawTraces]) []WorkerResult { - results := make([]WorkerResult, 0, len(blocks)) +func SerializeFullBlocks(chainId *big.Int, blocks []RPCFetchBatchResult[common.RawBlock], logs []RPCFetchBatchResult[common.RawLogs], traces []RPCFetchBatchResult[common.RawTraces]) []GetFullBlockResult { + results := make([]GetFullBlockResult, 0, len(blocks)) - rawLogsMap := make(map[string]BatchFetchResult[RawLogs]) + rawLogsMap := make(map[string]RPCFetchBatchResult[common.RawLogs]) for _, rawLogs := range logs { rawLogsMap[rawLogs.BlockNumber.String()] = rawLogs } - rawTracesMap := make(map[string]BatchFetchResult[RawTraces]) + rawTracesMap := make(map[string]RPCFetchBatchResult[common.RawTraces]) for _, rawTraces := range traces { rawTracesMap[rawTraces.BlockNumber.String()] = rawTraces } for _, rawBlock := range blocks { - result := WorkerResult{ + result := GetFullBlockResult{ BlockNumber: rawBlock.BlockNumber, } if rawBlock.Result == nil { @@ -40,15 +40,15 @@ func SerializeWorkerResults(chainId *big.Int, blocks []BatchFetchResult[RawBlock continue } - result.Block = serializeBlock(chainId, rawBlock.Result) - blockTimestamp := result.Block.Timestamp - result.Transactions = serializeTransactions(chainId, rawBlock.Result["transactions"].([]interface{}), blockTimestamp) + result.Data.Block = serializeBlock(chainId, rawBlock.Result) + blockTimestamp := result.Data.Block.Timestamp + result.Data.Transactions = serializeTransactions(chainId, rawBlock.Result["transactions"].([]interface{}), blockTimestamp) if rawLogs, exists := rawLogsMap[rawBlock.BlockNumber.String()]; exists { if rawLogs.Error != nil { result.Error = rawLogs.Error } else { - result.Logs = serializeLogs(chainId, rawLogs.Result, result.Block) + result.Data.Logs = serializeLogs(chainId, rawLogs.Result, result.Data.Block) } } @@ -57,7 +57,7 @@ func SerializeWorkerResults(chainId *big.Int, blocks []BatchFetchResult[RawBlock if rawTraces.Error != nil { result.Error = rawTraces.Error } else { - result.Traces = serializeTraces(chainId, rawTraces.Result, result.Block) + result.Data.Traces = serializeTraces(chainId, rawTraces.Result, result.Data.Block) } } } @@ -68,7 +68,34 @@ func SerializeWorkerResults(chainId *big.Int, blocks []BatchFetchResult[RawBlock return results } -func serializeBlock(chainId *big.Int, block RawBlock) common.Block { +func SerializeBlocks(chainId *big.Int, blocks []RPCFetchBatchResult[common.RawBlock]) []GetBlocksResult { + results := make([]GetBlocksResult, 0, len(blocks)) + + for _, rawBlock := range blocks { + result := GetBlocksResult{ + BlockNumber: rawBlock.BlockNumber, + } + if rawBlock.Result == nil { + log.Warn().Msgf("Received a nil block result for block %s.", rawBlock.BlockNumber.String()) + result.Error = fmt.Errorf("received a nil block result from RPC") + results = append(results, result) + continue + } + + if rawBlock.Error != nil { + result.Error = rawBlock.Error + results = append(results, result) + continue + } + + result.Data = serializeBlock(chainId, rawBlock.Result) + results = append(results, result) + } + + return 
results +} + +func serializeBlock(chainId *big.Int, block common.RawBlock) common.Block { return common.Block{ ChainId: chainId, Number: hexToBigInt(block["number"]), diff --git a/internal/rpc/utils.go b/internal/rpc/utils.go new file mode 100644 index 0000000..26f2cd4 --- /dev/null +++ b/internal/rpc/utils.go @@ -0,0 +1,33 @@ +package rpc + +import ( + config "github.com/thirdweb-dev/indexer/configs" +) + +// TODO: we should detect this automatically +const ( + DEFAULT_BLOCKS_PER_REQUEST = 1000 + DEFAULT_LOGS_PER_REQUEST = 100 + DEFAULT_TRACES_PER_REQUEST = 100 +) + +func GetBlockPerRequestConfig() BlocksPerRequestConfig { + blocksPerRequest := config.Cfg.RPC.Blocks.BlocksPerRequest + if blocksPerRequest == 0 { + blocksPerRequest = DEFAULT_BLOCKS_PER_REQUEST + } + logsBlocksPerRequest := config.Cfg.RPC.Logs.BlocksPerRequest + if logsBlocksPerRequest == 0 { + logsBlocksPerRequest = DEFAULT_LOGS_PER_REQUEST + } + tracesBlocksPerRequest := config.Cfg.RPC.Traces.BlocksPerRequest + if tracesBlocksPerRequest == 0 { + tracesBlocksPerRequest = DEFAULT_TRACES_PER_REQUEST + } + + return BlocksPerRequestConfig{ + Blocks: blocksPerRequest, + Logs: logsBlocksPerRequest, + Traces: tracesBlocksPerRequest, + } +} diff --git a/internal/worker/worker.go b/internal/worker/worker.go index 89e89f4..81c2974 100644 --- a/internal/worker/worker.go +++ b/internal/worker/worker.go @@ -1,80 +1,40 @@ package worker import ( - "context" "math/big" "sync" "time" - "github.com/ethereum/go-ethereum/common/hexutil" - "github.com/ethereum/go-ethereum/rpc" "github.com/rs/zerolog/log" config "github.com/thirdweb-dev/indexer/configs" "github.com/thirdweb-dev/indexer/internal/common" "github.com/thirdweb-dev/indexer/internal/metrics" + "github.com/thirdweb-dev/indexer/internal/rpc" ) type Worker struct { - rpc common.RPC + rpc rpc.Client } -type WorkerResult struct { - BlockNumber *big.Int - Error error - Block common.Block - Transactions []common.Transaction - Logs []common.Log - Traces []common.Trace -} - -type BatchFetchResult[T any] struct { - BlockNumber *big.Int - Error error - Result T -} - -type RawBlock = map[string]interface{} -type RawLogs = []map[string]interface{} -type RawTraces = []map[string]interface{} - -type BlockFetchResult struct { - BlockNumber *big.Int - Error error - Block common.Block - Transactions []common.Transaction -} - -type LogsFetchResult struct { - BlockNumber *big.Int - Error error - Logs []common.Log -} - -type TracesFetchResult struct { - BlockNumber *big.Int - Error error - Traces []common.Trace -} - -func NewWorker(rpc common.RPC) *Worker { +func NewWorker(rpc rpc.Client) *Worker { return &Worker{ rpc: rpc, } } -func (w *Worker) Run(blockNumbers []*big.Int) []WorkerResult { +func (w *Worker) Run(blockNumbers []*big.Int) []rpc.GetFullBlockResult { blockCount := len(blockNumbers) - chunks := divideIntoChunks(blockNumbers, w.rpc.BlocksPerRequest.Blocks) + chunks := common.BigIntSliceToChunks(blockNumbers, w.rpc.BlocksPerRequest.Blocks) var wg sync.WaitGroup - resultsCh := make(chan []WorkerResult, len(chunks)) + resultsCh := make(chan []rpc.GetFullBlockResult, len(chunks)) log.Debug().Msgf("Worker Processing %d blocks in %d chunks of max %d blocks", blockCount, len(chunks), w.rpc.BlocksPerRequest.Blocks) for _, chunk := range chunks { wg.Add(1) go func(chunk []*big.Int) { defer wg.Done() - resultsCh <- w.processBatch(chunk) + resultsCh <- w.rpc.GetFullBlocks(chunk) if config.Cfg.RPC.Blocks.BatchDelay > 0 { time.Sleep(time.Duration(config.Cfg.RPC.Blocks.BatchDelay) * time.Millisecond) } @@ 
-85,7 +45,7 @@ func (w *Worker) Run(blockNumbers []*big.Int) []WorkerResult { close(resultsCh) }() - results := make([]WorkerResult, 0, blockCount) + results := make([]rpc.GetFullBlockResult, 0, blockCount) for batchResults := range resultsCh { results = append(results, batchResults...) } @@ -98,123 +58,3 @@ func (w *Worker) Run(blockNumbers []*big.Int) []WorkerResult { } return results } - -func divideIntoChunks(blockNumbers []*big.Int, batchSize int) [][]*big.Int { - if batchSize >= len(blockNumbers) { - return [][]*big.Int{blockNumbers} - } - var chunks [][]*big.Int - for i := 0; i < len(blockNumbers); i += batchSize { - end := i + batchSize - if end > len(blockNumbers) { - end = len(blockNumbers) - } - chunks = append(chunks, blockNumbers[i:end]) - } - return chunks -} - -func (w *Worker) processBatch(blockNumbers []*big.Int) []WorkerResult { - var wg sync.WaitGroup - var blocks []BatchFetchResult[RawBlock] - var logs []BatchFetchResult[RawLogs] - var traces []BatchFetchResult[RawTraces] - - wg.Add(2) - - go func() { - defer wg.Done() - blocks = fetchBatch[RawBlock](w.rpc, blockNumbers, "eth_getBlockByNumber", func(blockNum *big.Int) []interface{} { - return []interface{}{hexutil.EncodeBig(blockNum), true} - }) - }() - - go func() { - defer wg.Done() - logs = fetchInBatches[RawLogs](w.rpc, blockNumbers, w.rpc.BlocksPerRequest.Logs, config.Cfg.RPC.Logs.BatchDelay, "eth_getLogs", func(blockNum *big.Int) []interface{} { - return []interface{}{map[string]string{"fromBlock": hexutil.EncodeBig(blockNum), "toBlock": hexutil.EncodeBig(blockNum)}} - }) - }() - - if w.rpc.SupportsTraceBlock { - wg.Add(1) - go func() { - defer wg.Done() - traces = fetchInBatches[RawTraces](w.rpc, blockNumbers, w.rpc.BlocksPerRequest.Traces, config.Cfg.RPC.Traces.BatchDelay, "trace_block", func(blockNum *big.Int) []interface{} { - return []interface{}{hexutil.EncodeBig(blockNum)} - }) - }() - } - - wg.Wait() - - return SerializeWorkerResults(w.rpc.ChainID, blocks, logs, traces) -} - -func fetchInBatches[T any](RPC common.RPC, blockNumbers []*big.Int, batchSize int, batchDelay int, method string, argsFunc func(*big.Int) []interface{}) []BatchFetchResult[T] { - if len(blockNumbers) <= batchSize { - return fetchBatch[T](RPC, blockNumbers, method, argsFunc) - } - chunks := divideIntoChunks(blockNumbers, batchSize) - - log.Debug().Msgf("Fetching %s for %d blocks in %d chunks of max %d requests", method, len(blockNumbers), len(chunks), batchSize) - - var wg sync.WaitGroup - resultsCh := make(chan []BatchFetchResult[T], len(chunks)) - - for _, chunk := range chunks { - wg.Add(1) - go func(chunk []*big.Int) { - defer wg.Done() - resultsCh <- fetchBatch[T](RPC, chunk, method, argsFunc) - if batchDelay > 0 { - time.Sleep(time.Duration(batchDelay) * time.Millisecond) - } - }(chunk) - } - go func() { - wg.Wait() - close(resultsCh) - }() - - results := make([]BatchFetchResult[T], 0, len(blockNumbers)) - for batchResults := range resultsCh { - results = append(results, batchResults...) 
- } - - return results -} - -func fetchBatch[T any](RPC common.RPC, blockNumbers []*big.Int, method string, argsFunc func(*big.Int) []interface{}) []BatchFetchResult[T] { - batch := make([]rpc.BatchElem, len(blockNumbers)) - results := make([]BatchFetchResult[T], len(blockNumbers)) - - for i, blockNum := range blockNumbers { - results[i] = BatchFetchResult[T]{ - BlockNumber: blockNum, - } - batch[i] = rpc.BatchElem{ - Method: method, - Args: argsFunc(blockNum), - Result: new(T), - } - } - - err := RPC.RPCClient.BatchCallContext(context.Background(), batch) - if err != nil { - for i := range results { - results[i].Error = err - } - return results - } - - for i, elem := range batch { - if elem.Error != nil { - results[i].Error = elem.Error - } else { - results[i].Result = *elem.Result.(*T) - } - } - - return results -}
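
For reference, a minimal sketch of how the refactored rpc wrapper introduced in this patch is consumed, both directly and through the slimmed-down worker (assuming the repo's configuration, including the RPC URL, is already loaded; error handling trimmed):

package main

import (
	"math/big"

	"github.com/rs/zerolog/log"
	"github.com/thirdweb-dev/indexer/internal/rpc"
	"github.com/thirdweb-dev/indexer/internal/worker"
)

func main() {
	// Initialize reads config.Cfg.RPC, dials the node and probes supported methods,
	// as rpc.Initialize does above.
	client, err := rpc.Initialize()
	if err != nil {
		log.Fatal().Err(err).Msg("Failed to initialize RPC")
	}
	defer client.Close()

	blockNumbers := []*big.Int{big.NewInt(100), big.NewInt(101)}

	// Direct reuse of the wrapper: blocks, logs and (if supported) traces in one call.
	for _, result := range client.GetFullBlocks(blockNumbers) {
		if result.Error != nil {
			log.Error().Err(result.Error).Msgf("block %s failed", result.BlockNumber.String())
			continue
		}
		log.Info().Msgf("block %s: %d transactions, %d logs",
			result.BlockNumber.String(), len(result.Data.Transactions), len(result.Data.Logs))
	}

	// Or via the worker, which now chunks by rpc.BlocksPerRequest.Blocks and
	// delegates each chunk to GetFullBlocks.
	w := worker.NewWorker(*client)
	_ = w.Run(blockNumbers)
}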