From df18b175a1093321daafa0ae78fbc9e14d9bd17c Mon Sep 17 00:00:00 2001
From: iuwqyir
Date: Tue, 8 Oct 2024 14:56:22 +0300
Subject: [PATCH] create interface for rpc

---
 cmd/orchestrator.go                        |  2 +-
 internal/orchestrator/chain_tracker.go     | 18 ++----
 internal/orchestrator/committer.go         | 12 ++--
 internal/orchestrator/failure_recoverer.go |  8 +--
 internal/orchestrator/orchestrator.go      |  4 +-
 internal/orchestrator/poller.go            | 19 ++----
 internal/orchestrator/reorg_handler.go     | 16 ++---
 internal/rpc/rpc.go                        | 75 ++++++++++++++++------
 internal/worker/worker.go                  |  8 +--
 9 files changed, 92 insertions(+), 70 deletions(-)

diff --git a/cmd/orchestrator.go b/cmd/orchestrator.go
index f577949..84665df 100644
--- a/cmd/orchestrator.go
+++ b/cmd/orchestrator.go
@@ -28,7 +28,7 @@ func RunOrchestrator(cmd *cobra.Command, args []string) {
 		log.Fatal().Err(err).Msg("Failed to initialize RPC")
 	}
 
-	orchestrator, err := orchestrator.NewOrchestrator(*rpc)
+	orchestrator, err := orchestrator.NewOrchestrator(rpc)
 	if err != nil {
 		log.Fatal().Err(err).Msg("Failed to create orchestrator")
 	}
diff --git a/internal/orchestrator/chain_tracker.go b/internal/orchestrator/chain_tracker.go
index a537028..ec1171b 100644
--- a/internal/orchestrator/chain_tracker.go
+++ b/internal/orchestrator/chain_tracker.go
@@ -1,7 +1,6 @@
 package orchestrator
 
 import (
-	"context"
 	"time"
 
 	"github.com/rs/zerolog/log"
@@ -12,11 +11,11 @@ import (
 const DEFAULT_CHAIN_TRACKER_POLL_INTERVAL = 300000 // 5 minutes
 
 type ChainTracker struct {
-	rpc               rpc.Client
+	rpc               rpc.IRPCClient
 	triggerIntervalMs int
 }
 
-func NewChainTracker(rpc rpc.Client) *ChainTracker {
+func NewChainTracker(rpc rpc.IRPCClient) *ChainTracker {
 	return &ChainTracker{
 		rpc:               rpc,
 		triggerIntervalMs: DEFAULT_CHAIN_TRACKER_POLL_INTERVAL,
@@ -30,23 +29,16 @@ func (ct *ChainTracker) Start() {
 	log.Debug().Msgf("Chain tracker running")
 	go func() {
 		for range ticker.C {
-			latestBlockNumber, err := ct.getLatestBlockNumber()
+			latestBlockNumber, err := ct.rpc.GetLatestBlockNumber()
 			if err != nil {
 				log.Error().Err(err).Msg("Error getting latest block number")
 				continue
 			}
-			metrics.ChainHead.Set(float64(latestBlockNumber) / 100)
+			latestBlockNumberFloat, _ := latestBlockNumber.Float64()
+			metrics.ChainHead.Set(latestBlockNumberFloat)
 		}
 	}()
 
 	// Keep the program running (otherwise it will exit)
 	select {}
 }
-
-func (ct *ChainTracker) getLatestBlockNumber() (uint64, error) {
-	blockNumber, err := ct.rpc.EthClient.BlockNumber(context.Background())
-	if err != nil {
-		return 0, err
-	}
-	return blockNumber, nil
-}
diff --git a/internal/orchestrator/committer.go b/internal/orchestrator/committer.go
index f6171e0..098da0e 100644
--- a/internal/orchestrator/committer.go
+++ b/internal/orchestrator/committer.go
@@ -22,10 +22,10 @@ type Committer struct {
 	blocksPerCommit int
 	storage         storage.IStorage
 	pollFromBlock   *big.Int
-	rpc             rpc.Client
+	rpc             rpc.IRPCClient
 }
 
-func NewCommitter(rpc rpc.Client, storage storage.IStorage) *Committer {
+func NewCommitter(rpc rpc.IRPCClient, storage storage.IStorage) *Committer {
 	triggerInterval := config.Cfg.Committer.Interval
 	if triggerInterval == 0 {
 		triggerInterval = DEFAULT_COMMITTER_TRIGGER_INTERVAL
@@ -71,7 +71,7 @@ func (c *Committer) Start() {
 }
 
 func (c *Committer) getBlockNumbersToCommit() ([]*big.Int, error) {
-	latestCommittedBlockNumber, err := c.storage.MainStorage.GetMaxBlockNumber(c.rpc.ChainID)
+	latestCommittedBlockNumber, err := c.storage.MainStorage.GetMaxBlockNumber(c.rpc.GetChainID())
 	log.Info().Msgf("Committer found this max block number in main storage: %s", latestCommittedBlockNumber.String())
 	if err != nil {
 		return nil, err
@@ -103,7 +103,7 @@ func (c *Committer) getSequentialBlockDataToCommit() (*[]common.BlockData, error
 		return nil, nil
 	}
 
-	blocksData, err := c.storage.StagingStorage.GetStagingData(storage.QueryFilter{BlockNumbers: blocksToCommit, ChainId: c.rpc.ChainID})
+	blocksData, err := c.storage.StagingStorage.GetStagingData(storage.QueryFilter{BlockNumbers: blocksToCommit, ChainId: c.rpc.GetChainID()})
 	if err != nil {
 		return nil, fmt.Errorf("error fetching blocks to commit: %v", err)
 	}
@@ -180,7 +180,7 @@ func (c *Committer) handleGap(expectedStartBlockNumber *big.Int, actualFirstBloc
 	}
 	log.Debug().Msgf("Detected %d missing blocks between blocks %s and %s", missingBlockCount, expectedStartBlockNumber.String(), actualFirstBlock.Number.String())
 
-	existingBlockFailures, err := c.storage.OrchestratorStorage.GetBlockFailures(storage.QueryFilter{BlockNumbers: missingBlockNumbers, ChainId: c.rpc.ChainID})
+	existingBlockFailures, err := c.storage.OrchestratorStorage.GetBlockFailures(storage.QueryFilter{BlockNumbers: missingBlockNumbers, ChainId: c.rpc.GetChainID()})
 	if err != nil {
 		return fmt.Errorf("error getting block failures while handling gap: %v", err)
 	}
@@ -197,7 +197,7 @@ func (c *Committer) handleGap(expectedStartBlockNumber *big.Int, actualFirstBloc
 		if _, ok := existingBlockFailuresMap[blockNumberStr]; !ok {
 			blockFailures = append(blockFailures, common.BlockFailure{
 				BlockNumber:   blockNumber,
-				ChainId:       c.rpc.ChainID,
+				ChainId:       c.rpc.GetChainID(),
 				FailureTime:   time.Now(),
 				FailureCount:  1,
 				FailureReason: "Gap detected for this block",
diff --git a/internal/orchestrator/failure_recoverer.go b/internal/orchestrator/failure_recoverer.go
index a8d25cd..900b832 100644
--- a/internal/orchestrator/failure_recoverer.go
+++ b/internal/orchestrator/failure_recoverer.go
@@ -21,10 +21,10 @@ type FailureRecoverer struct {
 	failuresPerPoll   int
 	triggerIntervalMs int
 	storage           storage.IStorage
-	rpc               rpc.Client
+	rpc               rpc.IRPCClient
 }
 
-func NewFailureRecoverer(rpc rpc.Client, storage storage.IStorage) *FailureRecoverer {
+func NewFailureRecoverer(rpc rpc.IRPCClient, storage storage.IStorage) *FailureRecoverer {
 	failuresPerPoll := config.Cfg.FailureRecoverer.BlocksPerRun
 	if failuresPerPoll == 0 {
 		failuresPerPoll = DEFAULT_FAILURES_PER_POLL
@@ -49,7 +49,7 @@ func (fr *FailureRecoverer) Start() {
 	go func() {
 		for range ticker.C {
 			blockFailures, err := fr.storage.OrchestratorStorage.GetBlockFailures(storage.QueryFilter{
-				ChainId: fr.rpc.ChainID,
+				ChainId: fr.rpc.GetChainID(),
 				Limit:   fr.failuresPerPoll,
 			})
 			if err != nil {
@@ -101,7 +101,7 @@ func (fr *FailureRecoverer) handleWorkerResults(blockFailures []common.BlockFail
 				BlockNumber:   result.BlockNumber,
 				FailureReason: result.Error.Error(),
 				FailureTime:   time.Now(),
-				ChainId:       fr.rpc.ChainID,
+				ChainId:       fr.rpc.GetChainID(),
 				FailureCount:  failureCount,
 			})
 		} else {
diff --git a/internal/orchestrator/orchestrator.go b/internal/orchestrator/orchestrator.go
index 41eadab..d3b2058 100644
--- a/internal/orchestrator/orchestrator.go
+++ b/internal/orchestrator/orchestrator.go
@@ -9,7 +9,7 @@ import (
 )
 
 type Orchestrator struct {
-	rpc                     rpc.Client
+	rpc                     rpc.IRPCClient
 	storage                 storage.IStorage
 	pollerEnabled           bool
 	failureRecovererEnabled bool
@@ -17,7 +17,7 @@ type Orchestrator struct {
 	reorgHandlerEnabled     bool
 }
 
-func NewOrchestrator(rpc rpc.Client) (*Orchestrator, error) {
+func NewOrchestrator(rpc rpc.IRPCClient) (*Orchestrator, error) {
 	storage, err := storage.NewStorageConnector(&config.Cfg.Storage)
 	if err != nil {
 		return nil, err
diff --git a/internal/orchestrator/poller.go b/internal/orchestrator/poller.go
index fa8326a..827ca55 100644
--- a/internal/orchestrator/poller.go
+++ b/internal/orchestrator/poller.go
@@ -1,7 +1,6 @@
 package orchestrator
 
 import (
-	"context"
 	"fmt"
 	"math/big"
 	"sync"
@@ -20,7 +19,7 @@ const DEFAULT_BLOCKS_PER_POLL = 10
 const DEFAULT_TRIGGER_INTERVAL = 1000
 
 type Poller struct {
-	rpc               rpc.Client
+	rpc               rpc.IRPCClient
 	blocksPerPoll     int64
 	triggerIntervalMs int64
 	storage           storage.IStorage
@@ -33,7 +32,7 @@ type BlockNumberWithError struct {
 	Error       error
 }
 
-func NewPoller(rpc rpc.Client, storage storage.IStorage) *Poller {
+func NewPoller(rpc rpc.IRPCClient, storage storage.IStorage) *Poller {
 	blocksPerPoll := config.Cfg.Poller.BlocksPerPoll
 	if blocksPerPoll == 0 {
 		blocksPerPoll = DEFAULT_BLOCKS_PER_POLL
@@ -44,7 +43,7 @@ func NewPoller(rpc rpc.Client, storage storage.IStorage) *Poller {
 	}
 	untilBlock := big.NewInt(int64(config.Cfg.Poller.UntilBlock))
 	pollFromBlock := big.NewInt(int64(config.Cfg.Poller.FromBlock))
-	lastPolledBlock, err := storage.StagingStorage.GetLastStagedBlockNumber(rpc.ChainID, untilBlock)
+	lastPolledBlock, err := storage.StagingStorage.GetLastStagedBlockNumber(rpc.GetChainID(), untilBlock)
 	if err != nil || lastPolledBlock == nil || lastPolledBlock.Sign() <= 0 {
 		lastPolledBlock = new(big.Int).Sub(pollFromBlock, big.NewInt(1)) // needs to include the first block
 		log.Warn().Err(err).Msgf("No last polled block found, setting to %s", lastPolledBlock.String())
@@ -124,7 +123,7 @@ func (p *Poller) reachedPollLimit(blockNumber *big.Int) bool {
 }
 
 func (p *Poller) getBlockRange() ([]*big.Int, error) {
-	latestBlock, err := p.getLatestBlockNumber()
+	latestBlock, err := p.rpc.GetLatestBlockNumber()
 	if err != nil {
 		return nil, err
 	}
@@ -150,14 +149,6 @@ func (p *Poller) getBlockRange() ([]*big.Int, error) {
 	return blockNumbers, nil
 }
 
-func (p *Poller) getLatestBlockNumber() (*big.Int, error) {
-	latestBlockUint64, err := p.rpc.EthClient.BlockNumber(context.Background())
-	if err != nil {
-		return nil, fmt.Errorf("failed to get latest block number: %v", err)
-	}
-	return new(big.Int).SetUint64(latestBlockUint64), nil
-}
-
 func (p *Poller) getEndBlockForRange(startBlock *big.Int, latestBlock *big.Int) *big.Int {
 	endBlock := new(big.Int).Add(startBlock, big.NewInt(p.blocksPerPoll-1))
 	if endBlock.Cmp(latestBlock) > 0 {
@@ -217,7 +208,7 @@ func (p *Poller) handleBlockFailures(results []rpc.GetFullBlockResult) {
 				BlockNumber:   result.BlockNumber,
 				FailureReason: result.Error.Error(),
 				FailureTime:   time.Now(),
-				ChainId:       p.rpc.ChainID,
+				ChainId:       p.rpc.GetChainID(),
 				FailureCount:  1,
 			})
 		}
diff --git a/internal/orchestrator/reorg_handler.go b/internal/orchestrator/reorg_handler.go
index 0a7309e..412dfe7 100644
--- a/internal/orchestrator/reorg_handler.go
+++ b/internal/orchestrator/reorg_handler.go
@@ -15,7 +15,7 @@ import (
 )
 
 type ReorgHandler struct {
-	rpc              rpc.Client
+	rpc              rpc.IRPCClient
 	storage          storage.IStorage
 	triggerInterval  int
 	blocksPerScan    int
@@ -26,7 +26,7 @@ type ReorgHandler struct {
 const DEFAULT_REORG_HANDLER_INTERVAL = 1000
 const DEFAULT_REORG_HANDLER_BLOCKS_PER_SCAN = 100
 
-func NewReorgHandler(rpc rpc.Client, storage storage.IStorage) *ReorgHandler {
+func NewReorgHandler(rpc rpc.IRPCClient, storage storage.IStorage) *ReorgHandler {
 	triggerInterval := config.Cfg.ReorgHandler.Interval
 	if triggerInterval == 0 {
 		triggerInterval = DEFAULT_REORG_HANDLER_INTERVAL
@@ -41,7 +41,7 @@ func NewReorgHandler(rpc rpc.Client, storage storage.IStorage) *ReorgHandler {
 		worker:           worker.NewWorker(rpc),
 		triggerInterval:  triggerInterval,
 		blocksPerScan:    blocksPerScan,
-		lastCheckedBlock: getInitialCheckedBlockNumber(storage, rpc.ChainID),
+		lastCheckedBlock: getInitialCheckedBlockNumber(storage, rpc.GetChainID()),
 	}
 }
 
@@ -72,7 +72,7 @@ func (rh *ReorgHandler) Start() {
 	go func() {
 		for range ticker.C {
 			lookbackFrom := new(big.Int).Add(rh.lastCheckedBlock, big.NewInt(int64(rh.blocksPerScan)))
-			blockHeaders, err := rh.storage.MainStorage.LookbackBlockHeaders(rh.rpc.ChainID, rh.blocksPerScan, lookbackFrom)
+			blockHeaders, err := rh.storage.MainStorage.LookbackBlockHeaders(rh.rpc.GetChainID(), rh.blocksPerScan, lookbackFrom)
 			if err != nil {
 				log.Error().Err(err).Msg("Error getting recent block headers")
 				continue
@@ -85,7 +85,7 @@ func (rh *ReorgHandler) Start() {
 			reorgEndIndex := findReorgEndIndex(blockHeaders)
 			if reorgEndIndex == -1 {
 				rh.lastCheckedBlock = mostRecentBlockHeader.Number
-				rh.storage.OrchestratorStorage.SetLastReorgCheckedBlockNumber(rh.rpc.ChainID, mostRecentBlockHeader.Number)
+				rh.storage.OrchestratorStorage.SetLastReorgCheckedBlockNumber(rh.rpc.GetChainID(), mostRecentBlockHeader.Number)
 				metrics.ReorgHandlerLastCheckedBlock.Set(float64(mostRecentBlockHeader.Number.Int64()))
 				continue
 			}
@@ -101,7 +101,7 @@ func (rh *ReorgHandler) Start() {
 				continue
 			}
 			rh.lastCheckedBlock = mostRecentBlockHeader.Number
-			rh.storage.OrchestratorStorage.SetLastReorgCheckedBlockNumber(rh.rpc.ChainID, mostRecentBlockHeader.Number)
+			rh.storage.OrchestratorStorage.SetLastReorgCheckedBlockNumber(rh.rpc.GetChainID(), mostRecentBlockHeader.Number)
 			metrics.ReorgHandlerLastCheckedBlock.Set(float64(mostRecentBlockHeader.Number.Int64()))
 		}
 	}()
@@ -147,7 +147,7 @@ func (rh *ReorgHandler) findForkPoint(reversedBlockHeaders []common.BlockHeader)
 		}
 	}
 	lookbackFrom := reversedBlockHeaders[len(reversedBlockHeaders)-1].Number
-	nextHeadersBatch, err := rh.storage.MainStorage.LookbackBlockHeaders(rh.rpc.ChainID, rh.blocksPerScan, lookbackFrom)
+	nextHeadersBatch, err := rh.storage.MainStorage.LookbackBlockHeaders(rh.rpc.GetChainID(), rh.blocksPerScan, lookbackFrom)
 	if err != nil {
 		return nil, fmt.Errorf("error getting next headers batch: %w", err)
 	}
@@ -190,7 +190,7 @@ func (rh *ReorgHandler) handleReorg(reorgStart *big.Int, reorgEnd *big.Int) erro
 		})
 	}
 	// TODO make delete and insert atomic
-	if err := rh.storage.MainStorage.DeleteBlockData(rh.rpc.ChainID, blockRange); err != nil {
+	if err := rh.storage.MainStorage.DeleteBlockData(rh.rpc.GetChainID(), blockRange); err != nil {
 		return fmt.Errorf("error deleting data for blocks %v: %w", blockRange, err)
 	}
 	if err := rh.storage.MainStorage.InsertBlockData(&data); err != nil {
diff --git a/internal/rpc/rpc.go b/internal/rpc/rpc.go
index 10d283e..b2186a3 100644
--- a/internal/rpc/rpc.go
+++ b/internal/rpc/rpc.go
@@ -32,17 +32,28 @@ type BlocksPerRequestConfig struct {
 	Traces int
 }
 
+type IRPCClient interface {
+	GetFullBlocks(blockNumbers []*big.Int) []GetFullBlockResult
+	GetBlocks(blockNumbers []*big.Int) []GetBlocksResult
+	GetLatestBlockNumber() (*big.Int, error)
+	GetChainID() *big.Int
+	GetURL() string
+	GetBlocksPerRequest() BlocksPerRequestConfig
+	IsWebsocket() bool
+	SupportsTraceBlock() bool
+}
+
 type Client struct {
 	RPCClient          *gethRpc.Client
 	EthClient          *ethclient.Client
-	SupportsTraceBlock bool
-	IsWebsocket        bool
-	URL                string
-	ChainID            *big.Int
-	BlocksPerRequest   BlocksPerRequestConfig
+	supportsTraceBlock bool
+	isWebsocket        bool
+	url                string
+	chainID            *big.Int
+	blocksPerRequest   BlocksPerRequestConfig
 }
 
-func Initialize() (*Client, error) {
+func Initialize() (IRPCClient, error) {
 	rpcUrl := config.Cfg.RPC.URL
 	if rpcUrl == "" {
 		return nil, fmt.Errorf("RPC_URL environment variable is not set")
@@ -58,9 +69,9 @@ func Initialize() (*Client, error) {
 	rpc := &Client{
 		RPCClient:        rpcClient,
 		EthClient:        ethClient,
-		URL:              rpcUrl,
-		IsWebsocket:      strings.HasPrefix(rpcUrl, "ws://") || strings.HasPrefix(rpcUrl, "wss://"),
-		BlocksPerRequest: GetBlockPerRequestConfig(),
+		url:              rpcUrl,
+		isWebsocket:      strings.HasPrefix(rpcUrl, "ws://") || strings.HasPrefix(rpcUrl, "wss://"),
+		blocksPerRequest: GetBlockPerRequestConfig(),
 	}
 	checkErr := rpc.checkSupportedMethods()
 	if checkErr != nil {
@@ -71,7 +82,27 @@ func Initialize() (*Client, error) {
 	if chainIdErr != nil {
 		return nil, chainIdErr
 	}
-	return rpc, nil
+	return IRPCClient(rpc), nil
+}
+
+func (rpc *Client) GetChainID() *big.Int {
+	return rpc.chainID
+}
+
+func (rpc *Client) GetURL() string {
+	return rpc.url
+}
+
+func (rpc *Client) GetBlocksPerRequest() BlocksPerRequestConfig {
+	return rpc.blocksPerRequest
+}
+
+func (rpc *Client) IsWebsocket() bool {
+	return rpc.isWebsocket
+}
+
+func (rpc *Client) SupportsTraceBlock() bool {
+	return rpc.supportsTraceBlock
 }
 
 func (rpc *Client) Close() {
@@ -100,8 +131,8 @@ func (rpc *Client) checkSupportedMethods() error {
 			log.Warn().Err(traceBlockErr).Msg("Optional method trace_block not supported")
 		}
 	}
-	rpc.SupportsTraceBlock = traceBlockResult != nil
-	log.Debug().Msgf("trace_block method supported: %v", rpc.SupportsTraceBlock)
+	rpc.supportsTraceBlock = traceBlockResult != nil
+	log.Debug().Msgf("trace_block method supported: %v", rpc.supportsTraceBlock)
 	return nil
 }
 
@@ -110,7 +141,7 @@ func (rpc *Client) setChainID() error {
 	if err != nil {
 		return fmt.Errorf("failed to get chain ID: %v", err)
 	}
-	rpc.ChainID = chainID
+	rpc.chainID = chainID
 	return nil
 }
 
@@ -129,20 +160,20 @@ func (rpc *Client) GetFullBlocks(blockNumbers []*big.Int) []GetFullBlockResult {
 
 	go func() {
 		defer wg.Done()
-		logs = RPCFetchInBatches[common.RawLogs](rpc, blockNumbers, rpc.BlocksPerRequest.Logs, config.Cfg.RPC.Logs.BatchDelay, "eth_getLogs", GetLogsParams)
+		logs = RPCFetchInBatches[common.RawLogs](rpc, blockNumbers, rpc.blocksPerRequest.Logs, config.Cfg.RPC.Logs.BatchDelay, "eth_getLogs", GetLogsParams)
 	}()
 
-	if rpc.SupportsTraceBlock {
+	if rpc.supportsTraceBlock {
 		wg.Add(1)
 		go func() {
 			defer wg.Done()
-			traces = RPCFetchInBatches[common.RawTraces](rpc, blockNumbers, rpc.BlocksPerRequest.Traces, config.Cfg.RPC.Traces.BatchDelay, "trace_block", TraceBlockParams)
+			traces = RPCFetchInBatches[common.RawTraces](rpc, blockNumbers, rpc.blocksPerRequest.Traces, config.Cfg.RPC.Traces.BatchDelay, "trace_block", TraceBlockParams)
 		}()
 	}
 
 	wg.Wait()
 
-	return SerializeFullBlocks(rpc.ChainID, blocks, logs, traces)
+	return SerializeFullBlocks(rpc.chainID, blocks, logs, traces)
 }
 
 func (rpc *Client) GetBlocks(blockNumbers []*big.Int) []GetBlocksResult {
@@ -157,5 +188,13 @@ func (rpc *Client) GetBlocks(blockNumbers []*big.Int) []GetBlocksResult {
 	}()
 
 	wg.Wait()
-	return SerializeBlocks(rpc.ChainID, blocks)
+	return SerializeBlocks(rpc.chainID, blocks)
+}
+
+func (rpc *Client) GetLatestBlockNumber() (*big.Int, error) {
+	blockNumber, err := rpc.EthClient.BlockNumber(context.Background())
+	if err != nil {
+		return nil, fmt.Errorf("failed to get latest block number: %v", err)
+	}
+	return new(big.Int).SetUint64(blockNumber), nil
 }
diff --git a/internal/worker/worker.go b/internal/worker/worker.go
index 8298cf1..7394480 100644
--- a/internal/worker/worker.go
+++ b/internal/worker/worker.go
@@ -13,10 +13,10 @@ import (
 )
 
 type Worker struct {
-	rpc rpc.Client
+	rpc rpc.IRPCClient
 }
 
-func NewWorker(rpc rpc.Client) *Worker {
+func NewWorker(rpc rpc.IRPCClient) *Worker {
 	return &Worker{
 		rpc: rpc,
 	}
@@ -24,12 +24,12 @@ func NewWorker(rpc rpc.Client) *Worker {
 
 func (w *Worker) Run(blockNumbers []*big.Int) []rpc.GetFullBlockResult {
 	blockCount := len(blockNumbers)
-	chunks := common.BigIntSliceToChunks(blockNumbers, w.rpc.BlocksPerRequest.Blocks)
+	chunks := common.BigIntSliceToChunks(blockNumbers, w.rpc.GetBlocksPerRequest().Blocks)
 
 	var wg sync.WaitGroup
 	resultsCh := make(chan []rpc.GetFullBlockResult, len(chunks))
 
-	log.Debug().Msgf("Worker Processing %d blocks in %d chunks of max %d blocks", blockCount, len(chunks), w.rpc.BlocksPerRequest.Blocks)
+	log.Debug().Msgf("Worker Processing %d blocks in %d chunks of max %d blocks", blockCount, len(chunks), w.rpc.GetBlocksPerRequest().Blocks)
 	for _, chunk := range chunks {
 		wg.Add(1)
 		go func(chunk []*big.Int) {
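
Not part of the patch above — a sketch of what the new interface buys us. Because the orchestrator, poller, committer, reorg handler and worker now depend on rpc.IRPCClient rather than the concrete *rpc.Client, a test double can stand in for a real node. The mockClient name and file placement below are hypothetical, assuming a test file inside the rpc package:

// internal/rpc/mock_client_test.go (hypothetical)
package rpc

import "math/big"

// mockClient is a test double that satisfies IRPCClient without
// opening any network connection; it returns canned values.
type mockClient struct {
	chainID     *big.Int
	latestBlock *big.Int
}

func (m *mockClient) GetFullBlocks(blockNumbers []*big.Int) []GetFullBlockResult { return nil }
func (m *mockClient) GetBlocks(blockNumbers []*big.Int) []GetBlocksResult        { return nil }
func (m *mockClient) GetLatestBlockNumber() (*big.Int, error)                    { return m.latestBlock, nil }
func (m *mockClient) GetChainID() *big.Int                                       { return m.chainID }
func (m *mockClient) GetURL() string                                             { return "http://localhost:8545" }
func (m *mockClient) GetBlocksPerRequest() BlocksPerRequestConfig                { return BlocksPerRequestConfig{Blocks: 100} }
func (m *mockClient) IsWebsocket() bool                                          { return false }
func (m *mockClient) SupportsTraceBlock() bool                                   { return false }

// Compile-time check that mockClient implements the interface.
var _ IRPCClient = (*mockClient)(nil)

With something like this, a constructor such as NewPoller can be handed &mockClient{chainID: big.NewInt(1), latestBlock: big.NewInt(100)} together with an in-memory storage.IStorage stub (also hypothetical) and exercised entirely without an RPC endpoint.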