Skip to content

Commit

Permalink
refactor rpc to a wrapper for easier reuse
Browse files Browse the repository at this point in the history
  • Loading branch information
iuwqyir committed Oct 10, 2024
1 parent 13398d9 commit 49fe94d
Show file tree
Hide file tree
Showing 17 changed files with 405 additions and 334 deletions.
4 changes: 2 additions & 2 deletions cmd/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ import (
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/rs/zerolog/log"
"github.com/spf13/cobra"
"github.com/thirdweb-dev/indexer/internal/common"
"github.com/thirdweb-dev/indexer/internal/orchestrator"
"github.com/thirdweb-dev/indexer/internal/rpc"
)

var (
Expand All @@ -23,7 +23,7 @@ var (

func RunOrchestrator(cmd *cobra.Command, args []string) {
log.Info().Msg("Starting indexer")
rpc, err := common.InitializeRPC()
rpc, err := rpc.Initialize()
if err != nil {
log.Fatal().Err(err).Msg("Failed to initialize RPC")
}
Expand Down
2 changes: 2 additions & 0 deletions internal/common/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,5 @@ type BlockData struct {
Logs []Log
Traces []Trace
}

type RawBlock = map[string]interface{}
2 changes: 2 additions & 0 deletions internal/common/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,5 @@ type Log struct {
Data string `json:"data"`
Topics []string `json:"topics"`
}

type RawLogs = []map[string]interface{}
125 changes: 0 additions & 125 deletions internal/common/rpc.go

This file was deleted.

2 changes: 2 additions & 0 deletions internal/common/trace.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,5 @@ type Trace struct {
RewardType string `json:"reward_type"`
RefundAddress string `json:"refund_address"`
}

type RawTraces = []map[string]interface{}
18 changes: 18 additions & 0 deletions internal/common/utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package common

import "math/big"

func BigIntSliceToChunks(values []*big.Int, chunkSize int) [][]*big.Int {
if chunkSize >= len(values) || chunkSize <= 0 {
return [][]*big.Int{values}
}
var chunks [][]*big.Int
for i := 0; i < len(values); i += chunkSize {
end := i + chunkSize
if end > len(values) {
end = len(values)
}
chunks = append(chunks, values[i:end])
}
return chunks
}
7 changes: 3 additions & 4 deletions internal/orchestrator/chain_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,18 @@ import (
"time"

"github.com/rs/zerolog/log"
"github.com/thirdweb-dev/indexer/internal/common"
"github.com/thirdweb-dev/indexer/internal/metrics"
"github.com/thirdweb-dev/indexer/internal/rpc"
)

const DEFAULT_CHAIN_TRACKER_POLL_INTERVAL = 300000 // 5 minutes

type ChainTracker struct {
rpc common.RPC
rpc rpc.Client
triggerIntervalMs int
}

func NewChainTracker(rpc common.RPC) *ChainTracker {

func NewChainTracker(rpc rpc.Client) *ChainTracker {
return &ChainTracker{
rpc: rpc,
triggerIntervalMs: DEFAULT_CHAIN_TRACKER_POLL_INTERVAL,
Expand Down
5 changes: 3 additions & 2 deletions internal/orchestrator/committer.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
config "github.com/thirdweb-dev/indexer/configs"
"github.com/thirdweb-dev/indexer/internal/common"
"github.com/thirdweb-dev/indexer/internal/metrics"
"github.com/thirdweb-dev/indexer/internal/rpc"
"github.com/thirdweb-dev/indexer/internal/storage"
)

Expand All @@ -22,10 +23,10 @@ type Committer struct {
blocksPerCommit int
storage storage.IStorage
pollFromBlock *big.Int
rpc common.RPC
rpc rpc.Client
}

func NewCommitter(rpc common.RPC, storage storage.IStorage) *Committer {
func NewCommitter(rpc rpc.Client, storage storage.IStorage) *Committer {
triggerInterval := config.Cfg.Committer.Interval
if triggerInterval == 0 {
triggerInterval = DEFAULT_COMMITTER_TRIGGER_INTERVAL
Expand Down
15 changes: 8 additions & 7 deletions internal/orchestrator/failure_recoverer.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
config "github.com/thirdweb-dev/indexer/configs"
"github.com/thirdweb-dev/indexer/internal/common"
"github.com/thirdweb-dev/indexer/internal/metrics"
"github.com/thirdweb-dev/indexer/internal/rpc"
"github.com/thirdweb-dev/indexer/internal/storage"
"github.com/thirdweb-dev/indexer/internal/worker"
)
Expand All @@ -20,10 +21,10 @@ type FailureRecoverer struct {
failuresPerPoll int
triggerIntervalMs int
storage storage.IStorage
rpc common.RPC
rpc rpc.Client
}

func NewFailureRecoverer(rpc common.RPC, storage storage.IStorage) *FailureRecoverer {
func NewFailureRecoverer(rpc rpc.Client, storage storage.IStorage) *FailureRecoverer {
failuresPerPoll := config.Cfg.FailureRecoverer.BlocksPerRun
if failuresPerPoll == 0 {
failuresPerPoll = DEFAULT_FAILURES_PER_POLL
Expand Down Expand Up @@ -80,7 +81,7 @@ func (fr *FailureRecoverer) Start() {
select {}
}

func (fr *FailureRecoverer) handleWorkerResults(blockFailures []common.BlockFailure, results []worker.WorkerResult) {
func (fr *FailureRecoverer) handleWorkerResults(blockFailures []common.BlockFailure, results []rpc.GetFullBlockResult) {
log.Debug().Msgf("Failure Recoverer recovered %d blocks", len(results))
blockFailureMap := make(map[*big.Int]common.BlockFailure)
for _, failure := range blockFailures {
Expand All @@ -105,10 +106,10 @@ func (fr *FailureRecoverer) handleWorkerResults(blockFailures []common.BlockFail
})
} else {
successfulResults = append(successfulResults, common.BlockData{
Block: result.Block,
Logs: result.Logs,
Transactions: result.Transactions,
Traces: result.Traces,
Block: result.Data.Block,
Logs: result.Data.Logs,
Transactions: result.Data.Transactions,
Traces: result.Data.Traces,
})
failuresToDelete = append(failuresToDelete, blockFailureForBlock)
}
Expand Down
6 changes: 3 additions & 3 deletions internal/orchestrator/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,19 @@ import (
"sync"

config "github.com/thirdweb-dev/indexer/configs"
"github.com/thirdweb-dev/indexer/internal/common"
"github.com/thirdweb-dev/indexer/internal/rpc"
"github.com/thirdweb-dev/indexer/internal/storage"
)

type Orchestrator struct {
rpc common.RPC
rpc rpc.Client
storage storage.IStorage
pollerEnabled bool
failureRecovererEnabled bool
committerEnabled bool
}

func NewOrchestrator(rpc common.RPC) (*Orchestrator, error) {
func NewOrchestrator(rpc rpc.Client) (*Orchestrator, error) {
storage, err := storage.NewStorageConnector(&config.Cfg.Storage)
if err != nil {
return nil, err
Expand Down
23 changes: 12 additions & 11 deletions internal/orchestrator/poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
config "github.com/thirdweb-dev/indexer/configs"
"github.com/thirdweb-dev/indexer/internal/common"
"github.com/thirdweb-dev/indexer/internal/metrics"
"github.com/thirdweb-dev/indexer/internal/rpc"
"github.com/thirdweb-dev/indexer/internal/storage"
"github.com/thirdweb-dev/indexer/internal/worker"
)
Expand All @@ -19,7 +20,7 @@ const DEFAULT_BLOCKS_PER_POLL = 10
const DEFAULT_TRIGGER_INTERVAL = 1000

type Poller struct {
rpc common.RPC
rpc rpc.Client
blocksPerPoll int64
triggerIntervalMs int64
storage storage.IStorage
Expand All @@ -32,7 +33,7 @@ type BlockNumberWithError struct {
Error error
}

func NewPoller(rpc common.RPC, storage storage.IStorage) *Poller {
func NewPoller(rpc rpc.Client, storage storage.IStorage) *Poller {
blocksPerPoll := config.Cfg.Poller.BlocksPerPoll
if blocksPerPoll == 0 {
blocksPerPoll = DEFAULT_BLOCKS_PER_POLL
Expand Down Expand Up @@ -169,9 +170,9 @@ func (p *Poller) getEndBlockForRange(startBlock *big.Int, latestBlock *big.Int)
return endBlock
}

func (p *Poller) handleWorkerResults(results []worker.WorkerResult) {
var successfulResults []worker.WorkerResult
var failedResults []worker.WorkerResult
func (p *Poller) handleWorkerResults(results []rpc.GetFullBlockResult) {
var successfulResults []rpc.GetFullBlockResult
var failedResults []rpc.GetFullBlockResult

for _, result := range results {
if result.Error != nil {
Expand All @@ -185,17 +186,17 @@ func (p *Poller) handleWorkerResults(results []worker.WorkerResult) {
blockData := make([]common.BlockData, 0, len(successfulResults))
for _, result := range successfulResults {
blockData = append(blockData, common.BlockData{
Block: result.Block,
Logs: result.Logs,
Transactions: result.Transactions,
Traces: result.Traces,
Block: result.Data.Block,
Logs: result.Data.Logs,
Transactions: result.Data.Transactions,
Traces: result.Data.Traces,
})
}
if err := p.storage.StagingStorage.InsertBlockData(blockData); err != nil {
e := fmt.Errorf("error inserting block data: %v", err)
log.Error().Err(e)
for _, result := range successfulResults {
failedResults = append(failedResults, worker.WorkerResult{
failedResults = append(failedResults, rpc.GetFullBlockResult{
BlockNumber: result.BlockNumber,
Error: e,
})
Expand All @@ -208,7 +209,7 @@ func (p *Poller) handleWorkerResults(results []worker.WorkerResult) {
}
}

func (p *Poller) handleBlockFailures(results []worker.WorkerResult) {
func (p *Poller) handleBlockFailures(results []rpc.GetFullBlockResult) {
var blockFailures []common.BlockFailure
for _, result := range results {
if result.Error != nil {
Expand Down
Loading

0 comments on commit 49fe94d

Please sign in to comment.