From 0f624ef651a876e897c8e16901ef938f6397c3a8 Mon Sep 17 00:00:00 2001 From: Sean McGary Date: Wed, 11 Dec 2024 23:22:40 -0600 Subject: [PATCH 1/4] refactor: remove unused code --- pkg/indexer/transactionLogs.go | 66 ---------------------------------- 1 file changed, 66 deletions(-) diff --git a/pkg/indexer/transactionLogs.go b/pkg/indexer/transactionLogs.go index dbcf3302..e4e74304 100644 --- a/pkg/indexer/transactionLogs.go +++ b/pkg/indexer/transactionLogs.go @@ -2,7 +2,6 @@ package indexer import ( "encoding/hex" - "encoding/json" "errors" "fmt" "github.com/Layr-Labs/sidecar/pkg/clients/ethereum" @@ -109,71 +108,6 @@ func (idx *Indexer) ParseTransactionLogs( WithTransactionHash(transaction.Hash.Value()). WithMetadata("contractAddress", contractAddress.Value()) } - - // First, attempt to decode the transaction input - txInput := transaction.Input.Value() - if len(txInput) >= 10 { - var method *abi.Method - decodedSig, err := hex.DecodeString(txInput[2:10]) - if err != nil { - idx.Logger.Sugar().Errorw("Failed to decode signature") - return nil, NewIndexError(IndexError_FailedToParseTransaction, err). - WithMessage("Failed to decode signature"). - WithBlockNumber(transaction.BlockNumber.Value()). - WithTransactionHash(transaction.Hash.Value()). - WithMetadata("contractAddress", contractAddress.Value()) - } - - if len(decodedSig) > 0 { - method, err = a.MethodById(decodedSig) - if err != nil { - msg := fmt.Sprintf("Failed to find method by ID '%s'", common.BytesToHash(decodedSig).String()) - idx.Logger.Sugar().Debugw(msg) - return nil, NewIndexError(IndexError_FailedToParseTransaction, err). - WithMessage(msg). - WithBlockNumber(transaction.BlockNumber.Value()). - WithTransactionHash(transaction.Hash.Value()). - WithMetadata("contractAddress", contractAddress.Value()) - } else { - parsedTransaction.MethodName = method.RawName - decodedData, err := hex.DecodeString(txInput[10:]) - if err != nil { - idx.Logger.Sugar().Errorw("Failed to decode transaction input data") - return nil, NewIndexError(IndexError_FailedToParseTransaction, err). - WithMessage("Failed to decode transaction input data"). - WithBlockNumber(transaction.BlockNumber.Value()). - WithTransactionHash(transaction.Hash.Value()). - WithMetadata("contractAddress", contractAddress.Value()) - } else { - callMap := map[string]interface{}{} - if err := method.Inputs.UnpackIntoMap(callMap, decodedData); err != nil { - idx.Logger.Sugar().Errorw("Failed to unpack data", - zap.Error(err), - zap.String("transactionHash", transaction.Hash.Value()), - zap.Uint64("blockNumber", transaction.BlockNumber.Value()), - ) - return nil, NewIndexError(IndexError_FailedToParseTransaction, err). - WithMessage("Failed to unpack data"). - WithBlockNumber(transaction.BlockNumber.Value()). - WithTransactionHash(transaction.Hash.Value()). - WithMetadata("contractAddress", contractAddress.Value()) - } - callMapBytes, err := json.Marshal(callMap) - if err != nil { - idx.Logger.Sugar().Errorw("Failed to marshal callMap data", zap.String("hash", transaction.Hash.Value())) - return nil, NewIndexError(IndexError_FailedToParseTransaction, err). - WithMessage("Failed to marshal callMap data"). - WithBlockNumber(transaction.BlockNumber.Value()). - WithTransactionHash(transaction.Hash.Value()). - WithMetadata("contractAddress", contractAddress.Value()) - } - parsedTransaction.DecodedData = string(callMapBytes) - } - } - } - } else { - idx.Logger.Sugar().Debugw(fmt.Sprintf("Transaction input is empty %s", contractAddress)) - } } else { idx.Logger.Sugar().Debugw("Base transaction is not interesting", zap.String("hash", transaction.Hash.Value()), From 56740f0d637e648a50b6df8b13145b8cbf9378d0 Mon Sep 17 00:00:00 2001 From: Sean McGary Date: Wed, 11 Dec 2024 23:23:13 -0600 Subject: [PATCH 2/4] refactor: cleanup and improve sync progress tracking --- pkg/sidecar/blockIndexer.go | 112 ++++++++++++++++++++---------------- 1 file changed, 63 insertions(+), 49 deletions(-) diff --git a/pkg/sidecar/blockIndexer.go b/pkg/sidecar/blockIndexer.go index 1caed3c8..5d58bc4f 100644 --- a/pkg/sidecar/blockIndexer.go +++ b/pkg/sidecar/blockIndexer.go @@ -90,6 +90,54 @@ func (s *Sidecar) ProcessNewBlocks(ctx context.Context) error { } } +type Progress struct { + StartBlock uint64 + LastBlockProcessed uint64 + CurrentTip *atomic.Uint64 + AvgPerBlockMs float64 + StartTime time.Time + TotalDurationMs int64 + logger *zap.Logger +} + +func NewProgress(startBlock uint64, currentTip *atomic.Uint64, l *zap.Logger) *Progress { + return &Progress{ + StartBlock: startBlock, + LastBlockProcessed: startBlock, + CurrentTip: currentTip, + AvgPerBlockMs: 0, + StartTime: time.Now(), + logger: l, + } +} + +func (p *Progress) UpdateAndPrintProgress(lastBlockProcessed uint64) { + p.LastBlockProcessed = lastBlockProcessed + + blocksProcessed := lastBlockProcessed - p.StartBlock + currentTip := p.CurrentTip.Load() + totalBlocksToProcess := currentTip - p.StartBlock + blocksRemaining := currentTip - lastBlockProcessed + + if blocksProcessed == 0 || totalBlocksToProcess == 0 { + return + } + + pctComplete := (float64(blocksProcessed) / float64(totalBlocksToProcess)) * 100 + + runningAvg := time.Since(p.StartTime).Milliseconds() / int64(blocksProcessed) + + estTimeRemainingHours := float64(runningAvg*int64(blocksRemaining)) / 1000 / 60 / 60 + + p.logger.Sugar().Infow("Progress", + zap.String("percentComplete", fmt.Sprintf("%.2f", pctComplete)), + zap.Uint64("blocksRemaining", blocksRemaining), + zap.Float64("estimatedTimeRemaining (hrs)", estTimeRemainingHours), + zap.Float64("avgBlockProcessTime (ms)", float64(runningAvg)), + zap.Uint64("currentBlock", uint64(lastBlockProcessed)), + ) +} + func (s *Sidecar) IndexFromCurrentToTip(ctx context.Context) error { lastIndexedBlock, err := s.GetLastIndexedBlock() if err != nil { @@ -156,21 +204,19 @@ func (s *Sidecar) IndexFromCurrentToTip(ctx context.Context) error { for retryCount < 3 { // Get the latest safe block as a starting point - latestSafeBlockNumber, err := s.EthereumClient.GetLatestSafeBlock(ctx) + latestSafe, err := s.EthereumClient.GetLatestSafeBlock(ctx) if err != nil { s.Logger.Sugar().Fatalw("Failed to get current tip", zap.Error(err)) } + s.Logger.Sugar().Infow("Current tip", zap.Uint64("currentTip", latestSafe)) - s.Logger.Sugar().Infow("Starting indexing process", - zap.Int64("latestBlock", lastIndexedBlock), - zap.Uint64("currentTip", latestSafeBlockNumber), - ) - - if latestSafeBlockNumber >= uint64(lastIndexedBlock) { + if latestSafe >= uint64(lastIndexedBlock) { + s.Logger.Sugar().Infow("Current tip is greater than latest block, starting indexing process", zap.Uint64("currentTip", latestSafe)) + latestSafeBlockNumber = latestSafe break } - if latestSafeBlockNumber < uint64(lastIndexedBlock) { + if latestSafe < uint64(lastIndexedBlock) { if retryCount == 2 { s.Logger.Sugar().Fatalw("Current tip is less than latest block, but retry count is 2, exiting") return errors.New("Current tip is less than latest block, but retry count is 2, exiting") @@ -224,16 +270,12 @@ func (s *Sidecar) IndexFromCurrentToTip(ctx context.Context) error { } } }() - // Keep some metrics during the indexing process - blocksProcessed := int64(0) - runningAvg := float64(0) - totalDurationMs := int64(0) - - originalStartBlock := lastIndexedBlock //nolint:all currentBlock := lastIndexedBlock + progress := NewProgress(uint64(currentBlock), ¤tTip, s.Logger) + s.Logger.Sugar().Infow("Starting indexing process", zap.Int64("currentBlock", currentBlock), zap.Uint64("currentTip", currentTip.Load())) for uint64(currentBlock) <= currentTip.Load() { @@ -242,50 +284,22 @@ func (s *Sidecar) IndexFromCurrentToTip(ctx context.Context) error { return nil } tip := currentTip.Load() - blocksRemaining := tip - uint64(currentBlock) - totalBlocksToProcess := tip - uint64(originalStartBlock) - - var pctComplete float64 - if totalBlocksToProcess != 0 { - pctComplete = (float64(blocksProcessed) / float64(totalBlocksToProcess)) * 100 - } - - estTimeRemainingMs := runningAvg * float64(blocksRemaining) - estTimeRemainingHours := float64(estTimeRemainingMs) / 1000 / 60 / 60 - startTime := time.Now() - endBlock := int64(currentBlock + 100) - if endBlock > int64(tip) { - endBlock = int64(tip) + batchEndBlock := int64(currentBlock + 100) + if batchEndBlock > int64(tip) { + batchEndBlock = int64(tip) } - if err := s.Pipeline.RunForBlockBatch(ctx, uint64(currentBlock), uint64(endBlock), true); err != nil { + if err := s.Pipeline.RunForBlockBatch(ctx, uint64(currentBlock), uint64(batchEndBlock), true); err != nil { s.Logger.Sugar().Errorw("Failed to run pipeline for block batch", zap.Error(err), zap.Uint64("startBlock", uint64(currentBlock)), - zap.Int64("endBlock", endBlock), + zap.Int64("batchEndBlock", batchEndBlock), ) return err } + progress.UpdateAndPrintProgress(uint64(batchEndBlock)) - currentBlock = int64(endBlock) - delta := time.Since(startTime).Milliseconds() - blocksProcessed += (endBlock - currentBlock) - - totalDurationMs += delta - if blocksProcessed == 0 { - runningAvg = 0 - } else { - runningAvg = float64(totalDurationMs / blocksProcessed) - } - - s.Logger.Sugar().Infow("Progress", - zap.String("percentComplete", fmt.Sprintf("%.2f", pctComplete)), - zap.Uint64("blocksRemaining", blocksRemaining), - zap.Float64("estimatedTimeRemaining (hrs)", estTimeRemainingHours), - zap.Float64("avgBlockProcessTime (ms)", float64(runningAvg)), - zap.Uint64("currentBlock", uint64(currentBlock)), - ) - currentBlock = endBlock + 1 + currentBlock = batchEndBlock + 1 } return nil From ba61e84024dde531ef6665d15df30649d46fccaa Mon Sep 17 00:00:00 2001 From: Sean McGary Date: Wed, 11 Dec 2024 23:38:49 -0600 Subject: [PATCH 3/4] refactor: rename confusing variable --- pkg/indexer/transactionLogs.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/pkg/indexer/transactionLogs.go b/pkg/indexer/transactionLogs.go index e4e74304..815ac07c 100644 --- a/pkg/indexer/transactionLogs.go +++ b/pkg/indexer/transactionLogs.go @@ -163,18 +163,17 @@ func (idx *Indexer) DecodeLogWithAbi( return idx.DecodeLog(a, lg) } else { idx.Logger.Sugar().Debugw("Log address does not match contract address", zap.String("logAddress", logAddress.String()), zap.String("contractAddress", txReceipt.GetTargetAddress().Value())) - // TODO - need a way to get the bytecode hash // Find/create the log address and attempt to determine if it is a proxy address - foundOrCreatedContract, err := idx.ContractManager.GetContractWithProxy(logAddress.String(), txReceipt.BlockNumber.Value()) + foundContract, err := idx.ContractManager.GetContractWithProxy(logAddress.String(), txReceipt.BlockNumber.Value()) if err != nil { return idx.DecodeLog(nil, lg) } - if foundOrCreatedContract == nil { + if foundContract == nil { idx.Logger.Sugar().Debugw("No contract found for address", zap.String("address", logAddress.String())) return idx.DecodeLog(nil, lg) } - contractAbi := foundOrCreatedContract.CombineAbis() + contractAbi := foundContract.CombineAbis() if err != nil { idx.Logger.Sugar().Errorw("Failed to combine ABIs", zap.Error(err), zap.String("contractAddress", logAddress.String())) return idx.DecodeLog(nil, lg) From fecbeaf11af96de7232bf7a3fc9c894d12966121 Mon Sep 17 00:00:00 2001 From: Sean McGary Date: Thu, 12 Dec 2024 09:07:01 -0600 Subject: [PATCH 4/4] feat: add configurable pod restartPolicy --- charts/sidecar/Chart.yaml | 4 ++-- charts/sidecar/templates/sidecarStatefulSet.yaml | 1 + charts/sidecar/values.yaml | 1 + 3 files changed, 4 insertions(+), 2 deletions(-) diff --git a/charts/sidecar/Chart.yaml b/charts/sidecar/Chart.yaml index 72be72aa..585dffeb 100644 --- a/charts/sidecar/Chart.yaml +++ b/charts/sidecar/Chart.yaml @@ -4,6 +4,6 @@ description: A Helm chart for the EigenLayer sidecar type: application -version: 1.0.0-rc.4 +version: 1.0.0-rc.5 -appVersion: "v1.0.0-rc.6" +appVersion: "v1.0.0-rc.9" diff --git a/charts/sidecar/templates/sidecarStatefulSet.yaml b/charts/sidecar/templates/sidecarStatefulSet.yaml index 1b3f01c9..745ae073 100644 --- a/charts/sidecar/templates/sidecarStatefulSet.yaml +++ b/charts/sidecar/templates/sidecarStatefulSet.yaml @@ -21,6 +21,7 @@ spec: - name: {{ .Chart.Name }} image: "{{ .Values.image.repository }}:{{ .Values.image.tag | default .Chart.AppVersion }}" imagePullPolicy: {{ .Values.image.pullPolicy }} + restartPolicy: {{ .Values.sidecar.restartPolicy | default "Never"}} args: {{ .Values.sidecar.args | toJson}} {{- if .Values.sidecar.ports }} ports: diff --git a/charts/sidecar/values.yaml b/charts/sidecar/values.yaml index 32f575f6..b9d1ada7 100644 --- a/charts/sidecar/values.yaml +++ b/charts/sidecar/values.yaml @@ -5,6 +5,7 @@ image: sidecar: replicas: 1 + restartPolicy: "Never" nameOverride: '' additionalLabels: {} args: ["run"]