Skip to content

Commit

Permalink
create interface for rpc
Browse files Browse the repository at this point in the history
  • Loading branch information
iuwqyir committed Oct 8, 2024
1 parent 6997e9c commit df18b17
Show file tree
Hide file tree
Showing 9 changed files with 92 additions and 70 deletions.
2 changes: 1 addition & 1 deletion cmd/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func RunOrchestrator(cmd *cobra.Command, args []string) {
log.Fatal().Err(err).Msg("Failed to initialize RPC")
}

orchestrator, err := orchestrator.NewOrchestrator(*rpc)
orchestrator, err := orchestrator.NewOrchestrator(rpc)
if err != nil {
log.Fatal().Err(err).Msg("Failed to create orchestrator")
}
Expand Down
18 changes: 5 additions & 13 deletions internal/orchestrator/chain_tracker.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package orchestrator

import (
"context"
"time"

"github.com/rs/zerolog/log"
Expand All @@ -12,11 +11,11 @@ import (
const DEFAULT_CHAIN_TRACKER_POLL_INTERVAL = 300000 // 5 minutes

type ChainTracker struct {
rpc rpc.Client
rpc rpc.IRPCClient
triggerIntervalMs int
}

func NewChainTracker(rpc rpc.Client) *ChainTracker {
func NewChainTracker(rpc rpc.IRPCClient) *ChainTracker {
return &ChainTracker{
rpc: rpc,
triggerIntervalMs: DEFAULT_CHAIN_TRACKER_POLL_INTERVAL,
Expand All @@ -30,23 +29,16 @@ func (ct *ChainTracker) Start() {
log.Debug().Msgf("Chain tracker running")
go func() {
for range ticker.C {
latestBlockNumber, err := ct.getLatestBlockNumber()
latestBlockNumber, err := ct.rpc.GetLatestBlockNumber()
if err != nil {
log.Error().Err(err).Msg("Error getting latest block number")
continue
}
metrics.ChainHead.Set(float64(latestBlockNumber) / 100)
latestBlockNumberFloat, _ := latestBlockNumber.Float64()
metrics.ChainHead.Set(latestBlockNumberFloat)
}
}()

// Keep the program running (otherwise it will exit)
select {}
}

func (ct *ChainTracker) getLatestBlockNumber() (uint64, error) {
blockNumber, err := ct.rpc.EthClient.BlockNumber(context.Background())
if err != nil {
return 0, err
}
return blockNumber, nil
}
12 changes: 6 additions & 6 deletions internal/orchestrator/committer.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@ type Committer struct {
blocksPerCommit int
storage storage.IStorage
pollFromBlock *big.Int
rpc rpc.Client
rpc rpc.IRPCClient
}

func NewCommitter(rpc rpc.Client, storage storage.IStorage) *Committer {
func NewCommitter(rpc rpc.IRPCClient, storage storage.IStorage) *Committer {
triggerInterval := config.Cfg.Committer.Interval
if triggerInterval == 0 {
triggerInterval = DEFAULT_COMMITTER_TRIGGER_INTERVAL
Expand Down Expand Up @@ -71,7 +71,7 @@ func (c *Committer) Start() {
}

func (c *Committer) getBlockNumbersToCommit() ([]*big.Int, error) {
latestCommittedBlockNumber, err := c.storage.MainStorage.GetMaxBlockNumber(c.rpc.ChainID)
latestCommittedBlockNumber, err := c.storage.MainStorage.GetMaxBlockNumber(c.rpc.GetChainID())
log.Info().Msgf("Committer found this max block number in main storage: %s", latestCommittedBlockNumber.String())
if err != nil {
return nil, err
Expand Down Expand Up @@ -103,7 +103,7 @@ func (c *Committer) getSequentialBlockDataToCommit() (*[]common.BlockData, error
return nil, nil
}

blocksData, err := c.storage.StagingStorage.GetStagingData(storage.QueryFilter{BlockNumbers: blocksToCommit, ChainId: c.rpc.ChainID})
blocksData, err := c.storage.StagingStorage.GetStagingData(storage.QueryFilter{BlockNumbers: blocksToCommit, ChainId: c.rpc.GetChainID()})
if err != nil {
return nil, fmt.Errorf("error fetching blocks to commit: %v", err)
}
Expand Down Expand Up @@ -180,7 +180,7 @@ func (c *Committer) handleGap(expectedStartBlockNumber *big.Int, actualFirstBloc
}
log.Debug().Msgf("Detected %d missing blocks between blocks %s and %s", missingBlockCount, expectedStartBlockNumber.String(), actualFirstBlock.Number.String())

existingBlockFailures, err := c.storage.OrchestratorStorage.GetBlockFailures(storage.QueryFilter{BlockNumbers: missingBlockNumbers, ChainId: c.rpc.ChainID})
existingBlockFailures, err := c.storage.OrchestratorStorage.GetBlockFailures(storage.QueryFilter{BlockNumbers: missingBlockNumbers, ChainId: c.rpc.GetChainID()})
if err != nil {
return fmt.Errorf("error getting block failures while handling gap: %v", err)
}
Expand All @@ -197,7 +197,7 @@ func (c *Committer) handleGap(expectedStartBlockNumber *big.Int, actualFirstBloc
if _, ok := existingBlockFailuresMap[blockNumberStr]; !ok {
blockFailures = append(blockFailures, common.BlockFailure{
BlockNumber: blockNumber,
ChainId: c.rpc.ChainID,
ChainId: c.rpc.GetChainID(),
FailureTime: time.Now(),
FailureCount: 1,
FailureReason: "Gap detected for this block",
Expand Down
8 changes: 4 additions & 4 deletions internal/orchestrator/failure_recoverer.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ type FailureRecoverer struct {
failuresPerPoll int
triggerIntervalMs int
storage storage.IStorage
rpc rpc.Client
rpc rpc.IRPCClient
}

func NewFailureRecoverer(rpc rpc.Client, storage storage.IStorage) *FailureRecoverer {
func NewFailureRecoverer(rpc rpc.IRPCClient, storage storage.IStorage) *FailureRecoverer {
failuresPerPoll := config.Cfg.FailureRecoverer.BlocksPerRun
if failuresPerPoll == 0 {
failuresPerPoll = DEFAULT_FAILURES_PER_POLL
Expand All @@ -49,7 +49,7 @@ func (fr *FailureRecoverer) Start() {
go func() {
for range ticker.C {
blockFailures, err := fr.storage.OrchestratorStorage.GetBlockFailures(storage.QueryFilter{
ChainId: fr.rpc.ChainID,
ChainId: fr.rpc.GetChainID(),
Limit: fr.failuresPerPoll,
})
if err != nil {
Expand Down Expand Up @@ -101,7 +101,7 @@ func (fr *FailureRecoverer) handleWorkerResults(blockFailures []common.BlockFail
BlockNumber: result.BlockNumber,
FailureReason: result.Error.Error(),
FailureTime: time.Now(),
ChainId: fr.rpc.ChainID,
ChainId: fr.rpc.GetChainID(),
FailureCount: failureCount,
})
} else {
Expand Down
4 changes: 2 additions & 2 deletions internal/orchestrator/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,15 @@ import (
)

type Orchestrator struct {
rpc rpc.Client
rpc rpc.IRPCClient
storage storage.IStorage
pollerEnabled bool
failureRecovererEnabled bool
committerEnabled bool
reorgHandlerEnabled bool
}

func NewOrchestrator(rpc rpc.Client) (*Orchestrator, error) {
func NewOrchestrator(rpc rpc.IRPCClient) (*Orchestrator, error) {
storage, err := storage.NewStorageConnector(&config.Cfg.Storage)
if err != nil {
return nil, err
Expand Down
19 changes: 5 additions & 14 deletions internal/orchestrator/poller.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package orchestrator

import (
"context"
"fmt"
"math/big"
"sync"
Expand All @@ -20,7 +19,7 @@ const DEFAULT_BLOCKS_PER_POLL = 10
const DEFAULT_TRIGGER_INTERVAL = 1000

type Poller struct {
rpc rpc.Client
rpc rpc.IRPCClient
blocksPerPoll int64
triggerIntervalMs int64
storage storage.IStorage
Expand All @@ -33,7 +32,7 @@ type BlockNumberWithError struct {
Error error
}

func NewPoller(rpc rpc.Client, storage storage.IStorage) *Poller {
func NewPoller(rpc rpc.IRPCClient, storage storage.IStorage) *Poller {
blocksPerPoll := config.Cfg.Poller.BlocksPerPoll
if blocksPerPoll == 0 {
blocksPerPoll = DEFAULT_BLOCKS_PER_POLL
Expand All @@ -44,7 +43,7 @@ func NewPoller(rpc rpc.Client, storage storage.IStorage) *Poller {
}
untilBlock := big.NewInt(int64(config.Cfg.Poller.UntilBlock))
pollFromBlock := big.NewInt(int64(config.Cfg.Poller.FromBlock))
lastPolledBlock, err := storage.StagingStorage.GetLastStagedBlockNumber(rpc.ChainID, untilBlock)
lastPolledBlock, err := storage.StagingStorage.GetLastStagedBlockNumber(rpc.GetChainID(), untilBlock)
if err != nil || lastPolledBlock == nil || lastPolledBlock.Sign() <= 0 {
lastPolledBlock = new(big.Int).Sub(pollFromBlock, big.NewInt(1)) // needs to include the first block
log.Warn().Err(err).Msgf("No last polled block found, setting to %s", lastPolledBlock.String())
Expand Down Expand Up @@ -124,7 +123,7 @@ func (p *Poller) reachedPollLimit(blockNumber *big.Int) bool {
}

func (p *Poller) getBlockRange() ([]*big.Int, error) {
latestBlock, err := p.getLatestBlockNumber()
latestBlock, err := p.rpc.GetLatestBlockNumber()
if err != nil {
return nil, err
}
Expand All @@ -150,14 +149,6 @@ func (p *Poller) getBlockRange() ([]*big.Int, error) {
return blockNumbers, nil
}

func (p *Poller) getLatestBlockNumber() (*big.Int, error) {
latestBlockUint64, err := p.rpc.EthClient.BlockNumber(context.Background())
if err != nil {
return nil, fmt.Errorf("failed to get latest block number: %v", err)
}
return new(big.Int).SetUint64(latestBlockUint64), nil
}

func (p *Poller) getEndBlockForRange(startBlock *big.Int, latestBlock *big.Int) *big.Int {
endBlock := new(big.Int).Add(startBlock, big.NewInt(p.blocksPerPoll-1))
if endBlock.Cmp(latestBlock) > 0 {
Expand Down Expand Up @@ -217,7 +208,7 @@ func (p *Poller) handleBlockFailures(results []rpc.GetFullBlockResult) {
BlockNumber: result.BlockNumber,
FailureReason: result.Error.Error(),
FailureTime: time.Now(),
ChainId: p.rpc.ChainID,
ChainId: p.rpc.GetChainID(),
FailureCount: 1,
})
}
Expand Down
16 changes: 8 additions & 8 deletions internal/orchestrator/reorg_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
)

type ReorgHandler struct {
rpc rpc.Client
rpc rpc.IRPCClient
storage storage.IStorage
triggerInterval int
blocksPerScan int
Expand All @@ -26,7 +26,7 @@ type ReorgHandler struct {
const DEFAULT_REORG_HANDLER_INTERVAL = 1000
const DEFAULT_REORG_HANDLER_BLOCKS_PER_SCAN = 100

func NewReorgHandler(rpc rpc.Client, storage storage.IStorage) *ReorgHandler {
func NewReorgHandler(rpc rpc.IRPCClient, storage storage.IStorage) *ReorgHandler {
triggerInterval := config.Cfg.ReorgHandler.Interval
if triggerInterval == 0 {
triggerInterval = DEFAULT_REORG_HANDLER_INTERVAL
Expand All @@ -41,7 +41,7 @@ func NewReorgHandler(rpc rpc.Client, storage storage.IStorage) *ReorgHandler {
worker: worker.NewWorker(rpc),
triggerInterval: triggerInterval,
blocksPerScan: blocksPerScan,
lastCheckedBlock: getInitialCheckedBlockNumber(storage, rpc.ChainID),
lastCheckedBlock: getInitialCheckedBlockNumber(storage, rpc.GetChainID()),
}
}

Expand Down Expand Up @@ -72,7 +72,7 @@ func (rh *ReorgHandler) Start() {
go func() {
for range ticker.C {
lookbackFrom := new(big.Int).Add(rh.lastCheckedBlock, big.NewInt(int64(rh.blocksPerScan)))
blockHeaders, err := rh.storage.MainStorage.LookbackBlockHeaders(rh.rpc.ChainID, rh.blocksPerScan, lookbackFrom)
blockHeaders, err := rh.storage.MainStorage.LookbackBlockHeaders(rh.rpc.GetChainID(), rh.blocksPerScan, lookbackFrom)
if err != nil {
log.Error().Err(err).Msg("Error getting recent block headers")
continue
Expand All @@ -85,7 +85,7 @@ func (rh *ReorgHandler) Start() {
reorgEndIndex := findReorgEndIndex(blockHeaders)
if reorgEndIndex == -1 {
rh.lastCheckedBlock = mostRecentBlockHeader.Number
rh.storage.OrchestratorStorage.SetLastReorgCheckedBlockNumber(rh.rpc.ChainID, mostRecentBlockHeader.Number)
rh.storage.OrchestratorStorage.SetLastReorgCheckedBlockNumber(rh.rpc.GetChainID(), mostRecentBlockHeader.Number)
metrics.ReorgHandlerLastCheckedBlock.Set(float64(mostRecentBlockHeader.Number.Int64()))
continue
}
Expand All @@ -101,7 +101,7 @@ func (rh *ReorgHandler) Start() {
continue
}
rh.lastCheckedBlock = mostRecentBlockHeader.Number
rh.storage.OrchestratorStorage.SetLastReorgCheckedBlockNumber(rh.rpc.ChainID, mostRecentBlockHeader.Number)
rh.storage.OrchestratorStorage.SetLastReorgCheckedBlockNumber(rh.rpc.GetChainID(), mostRecentBlockHeader.Number)
metrics.ReorgHandlerLastCheckedBlock.Set(float64(mostRecentBlockHeader.Number.Int64()))
}
}()
Expand Down Expand Up @@ -147,7 +147,7 @@ func (rh *ReorgHandler) findForkPoint(reversedBlockHeaders []common.BlockHeader)
}
}
lookbackFrom := reversedBlockHeaders[len(reversedBlockHeaders)-1].Number
nextHeadersBatch, err := rh.storage.MainStorage.LookbackBlockHeaders(rh.rpc.ChainID, rh.blocksPerScan, lookbackFrom)
nextHeadersBatch, err := rh.storage.MainStorage.LookbackBlockHeaders(rh.rpc.GetChainID(), rh.blocksPerScan, lookbackFrom)
if err != nil {
return nil, fmt.Errorf("error getting next headers batch: %w", err)
}
Expand Down Expand Up @@ -190,7 +190,7 @@ func (rh *ReorgHandler) handleReorg(reorgStart *big.Int, reorgEnd *big.Int) erro
})
}
// TODO make delete and insert atomic
if err := rh.storage.MainStorage.DeleteBlockData(rh.rpc.ChainID, blockRange); err != nil {
if err := rh.storage.MainStorage.DeleteBlockData(rh.rpc.GetChainID(), blockRange); err != nil {
return fmt.Errorf("error deleting data for blocks %v: %w", blockRange, err)
}
if err := rh.storage.MainStorage.InsertBlockData(&data); err != nil {
Expand Down
Loading

0 comments on commit df18b17

Please sign in to comment.