# Refactor rpc for easier reuse (#88)
### TL;DR

Refactored the RPC layer to wrap the needed RPC calls and return already-serialized data, so we don't have to repeat request creation and response handling everywhere they are needed. Since we use a fixed set of queries, maintaining the wrappers doesn't seem like a problem.
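For orientation, here is a minimal sketch of the intended call pattern. `rpc.Initialize`, the `Client` type and the `GetFullBlockResult` fields all appear in this diff; the `GetFullBlocks` method name is an assumption, since the full method set of the new `rpc` package is not shown here.

```go
package main

import (
	"math/big"

	"github.com/rs/zerolog/log"
	"github.com/thirdweb-dev/indexer/internal/rpc"
)

func main() {
	// Build the client once from the RPC configuration.
	client, err := rpc.Initialize()
	if err != nil {
		log.Fatal().Err(err).Msg("Failed to initialize RPC")
	}

	// Hypothetical wrapped call: request creation and response handling live
	// inside the client, and every result comes back already serialized.
	results := client.GetFullBlocks([]*big.Int{big.NewInt(100), big.NewInt(101)})
	for _, result := range results {
		if result.Error != nil {
			log.Error().Err(result.Error).Msgf("failed to fetch block %s", result.BlockNumber.String())
			continue
		}
		_ = result.Data.Block // plus result.Data.Transactions, .Logs and .Traces
	}
}
```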

### What changed?

- Moved RPC-related functionality from `common` package to a new `rpc` package
- Created a new `Client` struct in the `rpc` package to handle RPC operations
- Refactored worker logic to use the new RPC client
- Updated orchestrator components to use the new RPC client
- Added utility functions for handling big.Int slices and RPC configurations

### How to test?

1. Run the indexer with the updated code
2. Verify that block fetching, log retrieval, and trace processing work as expected
3. Check that batch processing is functioning correctly by monitoring RPC calls
4. Ensure that the orchestrator components (poller, committer, failure recoverer) are operating properly with the new RPC client

### Why make this change?

This refactoring improves the overall structure of the indexer:

1. Better separation of concerns by moving RPC-related code to its own package
2. Improved maintainability and readability of the codebase
3. Easier extension and modification of RPC-related functionality in the future
iuwqyir authored Oct 10, 2024
2 parents 13398d9 + 49fe94d commit e4b9b29
Showing 17 changed files with 405 additions and 334 deletions.
4 changes: 2 additions & 2 deletions cmd/orchestrator.go
@@ -6,8 +6,8 @@ import (
 	"github.com/prometheus/client_golang/prometheus/promhttp"
 	"github.com/rs/zerolog/log"
 	"github.com/spf13/cobra"
-	"github.com/thirdweb-dev/indexer/internal/common"
 	"github.com/thirdweb-dev/indexer/internal/orchestrator"
+	"github.com/thirdweb-dev/indexer/internal/rpc"
 )
 
 var (
@@ -23,7 +23,7 @@
 
 func RunOrchestrator(cmd *cobra.Command, args []string) {
 	log.Info().Msg("Starting indexer")
-	rpc, err := common.InitializeRPC()
+	rpc, err := rpc.Initialize()
 	if err != nil {
 		log.Fatal().Err(err).Msg("Failed to initialize RPC")
 	}
2 changes: 2 additions & 0 deletions internal/common/block.go
@@ -35,3 +35,5 @@ type BlockData struct {
 	Logs         []Log
 	Traces       []Trace
 }
+
+type RawBlock = map[string]interface{}
2 changes: 2 additions & 0 deletions internal/common/log.go
@@ -16,3 +16,5 @@ type Log struct {
 	Data   string   `json:"data"`
 	Topics []string `json:"topics"`
 }
+
+type RawLogs = []map[string]interface{}
125 changes: 0 additions & 125 deletions internal/common/rpc.go

This file was deleted.

2 changes: 2 additions & 0 deletions internal/common/trace.go
@@ -27,3 +27,5 @@ type Trace struct {
 	RewardType    string `json:"reward_type"`
 	RefundAddress string `json:"refund_address"`
 }
+
+type RawTraces = []map[string]interface{}
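The `Raw*` aliases simply name the untyped JSON shapes returned by the node before they are decoded into the typed structs above. A small illustrative sketch, using a trimmed, made-up `eth_getBlockByNumber` payload:

```go
package main

import (
	"encoding/json"
	"fmt"

	"github.com/thirdweb-dev/indexer/internal/common"
)

func main() {
	// Trimmed example payload; real responses carry many more fields.
	payload := []byte(`{"number":"0x1b4","gasUsed":"0x5208","transactions":[]}`)

	var block common.RawBlock // alias for map[string]interface{}
	if err := json.Unmarshal(payload, &block); err != nil {
		panic(err)
	}
	fmt.Println(block["number"]) // prints: 0x1b4
}
```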
18 changes: 18 additions & 0 deletions internal/common/utils.go
@@ -0,0 +1,18 @@
+package common
+
+import "math/big"
+
+func BigIntSliceToChunks(values []*big.Int, chunkSize int) [][]*big.Int {
+	if chunkSize >= len(values) || chunkSize <= 0 {
+		return [][]*big.Int{values}
+	}
+	var chunks [][]*big.Int
+	for i := 0; i < len(values); i += chunkSize {
+		end := i + chunkSize
+		if end > len(values) {
+			end = len(values)
+		}
+		chunks = append(chunks, values[i:end])
+	}
+	return chunks
+}
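As a quick usage note, the new helper splits a slice of block numbers into batch-sized chunks; the values and chunk size below are only illustrative.

```go
package main

import (
	"fmt"
	"math/big"

	"github.com/thirdweb-dev/indexer/internal/common"
)

func main() {
	blockNumbers := []*big.Int{
		big.NewInt(100), big.NewInt(101), big.NewInt(102),
		big.NewInt(103), big.NewInt(104),
	}
	// Chunks of at most 2 elements: [100 101] [102 103] [104]
	for _, chunk := range common.BigIntSliceToChunks(blockNumbers, 2) {
		fmt.Println(chunk)
	}
}
```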
7 changes: 3 additions & 4 deletions internal/orchestrator/chain_tracker.go
@@ -5,19 +5,18 @@ import (
 	"time"
 
 	"github.com/rs/zerolog/log"
-	"github.com/thirdweb-dev/indexer/internal/common"
 	"github.com/thirdweb-dev/indexer/internal/metrics"
+	"github.com/thirdweb-dev/indexer/internal/rpc"
 )
 
 const DEFAULT_CHAIN_TRACKER_POLL_INTERVAL = 300000 // 5 minutes
 
 type ChainTracker struct {
-	rpc               common.RPC
+	rpc               rpc.Client
 	triggerIntervalMs int
 }
 
-func NewChainTracker(rpc common.RPC) *ChainTracker {
-
+func NewChainTracker(rpc rpc.Client) *ChainTracker {
 	return &ChainTracker{
 		rpc:               rpc,
 		triggerIntervalMs: DEFAULT_CHAIN_TRACKER_POLL_INTERVAL,
5 changes: 3 additions & 2 deletions internal/orchestrator/committer.go
@@ -11,6 +11,7 @@ import (
 	config "github.com/thirdweb-dev/indexer/configs"
 	"github.com/thirdweb-dev/indexer/internal/common"
 	"github.com/thirdweb-dev/indexer/internal/metrics"
+	"github.com/thirdweb-dev/indexer/internal/rpc"
 	"github.com/thirdweb-dev/indexer/internal/storage"
 )
 
@@ -22,10 +23,10 @@ type Committer struct {
 	blocksPerCommit int
 	storage         storage.IStorage
 	pollFromBlock   *big.Int
-	rpc             common.RPC
+	rpc             rpc.Client
 }
 
-func NewCommitter(rpc common.RPC, storage storage.IStorage) *Committer {
+func NewCommitter(rpc rpc.Client, storage storage.IStorage) *Committer {
 	triggerInterval := config.Cfg.Committer.Interval
 	if triggerInterval == 0 {
 		triggerInterval = DEFAULT_COMMITTER_TRIGGER_INTERVAL
15 changes: 8 additions & 7 deletions internal/orchestrator/failure_recoverer.go
@@ -9,6 +9,7 @@ import (
 	config "github.com/thirdweb-dev/indexer/configs"
 	"github.com/thirdweb-dev/indexer/internal/common"
 	"github.com/thirdweb-dev/indexer/internal/metrics"
+	"github.com/thirdweb-dev/indexer/internal/rpc"
 	"github.com/thirdweb-dev/indexer/internal/storage"
 	"github.com/thirdweb-dev/indexer/internal/worker"
 )
@@ -20,10 +21,10 @@ type FailureRecoverer struct {
 	failuresPerPoll   int
 	triggerIntervalMs int
 	storage           storage.IStorage
-	rpc               common.RPC
+	rpc               rpc.Client
 }
 
-func NewFailureRecoverer(rpc common.RPC, storage storage.IStorage) *FailureRecoverer {
+func NewFailureRecoverer(rpc rpc.Client, storage storage.IStorage) *FailureRecoverer {
 	failuresPerPoll := config.Cfg.FailureRecoverer.BlocksPerRun
 	if failuresPerPoll == 0 {
 		failuresPerPoll = DEFAULT_FAILURES_PER_POLL
@@ -80,7 +81,7 @@ func (fr *FailureRecoverer) Start() {
 	select {}
 }
 
-func (fr *FailureRecoverer) handleWorkerResults(blockFailures []common.BlockFailure, results []worker.WorkerResult) {
+func (fr *FailureRecoverer) handleWorkerResults(blockFailures []common.BlockFailure, results []rpc.GetFullBlockResult) {
 	log.Debug().Msgf("Failure Recoverer recovered %d blocks", len(results))
 	blockFailureMap := make(map[*big.Int]common.BlockFailure)
 	for _, failure := range blockFailures {
@@ -105,10 +106,10 @@ func (fr *FailureRecoverer) handleWorkerResults(blockFailures []common.BlockFailure, results []worker.WorkerResult) {
 			})
 		} else {
 			successfulResults = append(successfulResults, common.BlockData{
-				Block:        result.Block,
-				Logs:         result.Logs,
-				Transactions: result.Transactions,
-				Traces:       result.Traces,
+				Block:        result.Data.Block,
+				Logs:         result.Data.Logs,
+				Transactions: result.Data.Transactions,
+				Traces:       result.Data.Traces,
 			})
 			failuresToDelete = append(failuresToDelete, blockFailureForBlock)
 		}
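The orchestrator files only show how `rpc.GetFullBlockResult` is consumed, so for readers following the type changes, here is a sketch of what it presumably looks like. It is inferred from the fields accessed in this commit, not copied from the `rpc` package, and the element types other than `common.Log` and `common.Trace` are assumptions.

```go
package rpc

import (
	"math/big"

	"github.com/thirdweb-dev/indexer/internal/common"
)

// GetFullBlockResult (sketch): one entry per requested block.
type GetFullBlockResult struct {
	BlockNumber *big.Int // which block the result belongs to
	Error       error    // non-nil if fetching or decoding failed
	Data        struct { // the already-serialized block payload
		Block        common.Block
		Transactions []common.Transaction
		Logs         []common.Log
		Traces       []common.Trace
	}
}
```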
6 changes: 3 additions & 3 deletions internal/orchestrator/orchestrator.go
@@ -4,19 +4,19 @@ import (
 	"sync"
 
 	config "github.com/thirdweb-dev/indexer/configs"
-	"github.com/thirdweb-dev/indexer/internal/common"
+	"github.com/thirdweb-dev/indexer/internal/rpc"
 	"github.com/thirdweb-dev/indexer/internal/storage"
 )
 
 type Orchestrator struct {
-	rpc                     common.RPC
+	rpc                     rpc.Client
 	storage                 storage.IStorage
 	pollerEnabled           bool
 	failureRecovererEnabled bool
 	committerEnabled        bool
 }
 
-func NewOrchestrator(rpc common.RPC) (*Orchestrator, error) {
+func NewOrchestrator(rpc rpc.Client) (*Orchestrator, error) {
 	storage, err := storage.NewStorageConnector(&config.Cfg.Storage)
 	if err != nil {
 		return nil, err
23 changes: 12 additions & 11 deletions internal/orchestrator/poller.go
@@ -11,6 +11,7 @@ import (
 	config "github.com/thirdweb-dev/indexer/configs"
 	"github.com/thirdweb-dev/indexer/internal/common"
 	"github.com/thirdweb-dev/indexer/internal/metrics"
+	"github.com/thirdweb-dev/indexer/internal/rpc"
 	"github.com/thirdweb-dev/indexer/internal/storage"
 	"github.com/thirdweb-dev/indexer/internal/worker"
 )
@@ -19,7 +20,7 @@ const DEFAULT_BLOCKS_PER_POLL = 10
 const DEFAULT_TRIGGER_INTERVAL = 1000
 
 type Poller struct {
-	rpc               common.RPC
+	rpc               rpc.Client
 	blocksPerPoll     int64
 	triggerIntervalMs int64
 	storage           storage.IStorage
@@ -32,7 +33,7 @@ type BlockNumberWithError struct {
Error error
}

func NewPoller(rpc common.RPC, storage storage.IStorage) *Poller {
func NewPoller(rpc rpc.Client, storage storage.IStorage) *Poller {
blocksPerPoll := config.Cfg.Poller.BlocksPerPoll
if blocksPerPoll == 0 {
blocksPerPoll = DEFAULT_BLOCKS_PER_POLL
Expand Down Expand Up @@ -169,9 +170,9 @@ func (p *Poller) getEndBlockForRange(startBlock *big.Int, latestBlock *big.Int)
return endBlock
}

func (p *Poller) handleWorkerResults(results []worker.WorkerResult) {
var successfulResults []worker.WorkerResult
var failedResults []worker.WorkerResult
func (p *Poller) handleWorkerResults(results []rpc.GetFullBlockResult) {
var successfulResults []rpc.GetFullBlockResult
var failedResults []rpc.GetFullBlockResult

for _, result := range results {
if result.Error != nil {
Expand All @@ -185,17 +186,17 @@ func (p *Poller) handleWorkerResults(results []worker.WorkerResult) {
blockData := make([]common.BlockData, 0, len(successfulResults))
for _, result := range successfulResults {
blockData = append(blockData, common.BlockData{
Block: result.Block,
Logs: result.Logs,
Transactions: result.Transactions,
Traces: result.Traces,
Block: result.Data.Block,
Logs: result.Data.Logs,
Transactions: result.Data.Transactions,
Traces: result.Data.Traces,
})
}
if err := p.storage.StagingStorage.InsertBlockData(blockData); err != nil {
e := fmt.Errorf("error inserting block data: %v", err)
log.Error().Err(e)
for _, result := range successfulResults {
failedResults = append(failedResults, worker.WorkerResult{
failedResults = append(failedResults, rpc.GetFullBlockResult{
BlockNumber: result.BlockNumber,
Error: e,
})
Expand All @@ -208,7 +209,7 @@ func (p *Poller) handleWorkerResults(results []worker.WorkerResult) {
}
}

func (p *Poller) handleBlockFailures(results []worker.WorkerResult) {
func (p *Poller) handleBlockFailures(results []rpc.GetFullBlockResult) {
var blockFailures []common.BlockFailure
for _, result := range results {
if result.Error != nil {

