Skip to content

Commit

Permalink
refactor: remove dead code, improve progress tracking (#149)
Browse files Browse the repository at this point in the history
  • Loading branch information
seanmcgary authored Dec 12, 2024
2 parents 9928590 + fecbeaf commit 7473a8e
Show file tree
Hide file tree
Showing 5 changed files with 70 additions and 121 deletions.
4 changes: 2 additions & 2 deletions charts/sidecar/Chart.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,6 @@ description: A Helm chart for the EigenLayer sidecar

type: application

version: 1.0.0-rc.4
version: 1.0.0-rc.5

appVersion: "v1.0.0-rc.6"
appVersion: "v1.0.0-rc.9"
1 change: 1 addition & 0 deletions charts/sidecar/templates/sidecarStatefulSet.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ spec:
- name: {{ .Chart.Name }}
image: "{{ .Values.image.repository }}:{{ .Values.image.tag | default .Chart.AppVersion }}"
imagePullPolicy: {{ .Values.image.pullPolicy }}
restartPolicy: {{ .Values.sidecar.restartPolicy | default "Never"}}
args: {{ .Values.sidecar.args | toJson}}
{{- if .Values.sidecar.ports }}
ports:
Expand Down
1 change: 1 addition & 0 deletions charts/sidecar/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ image:

sidecar:
replicas: 1
restartPolicy: "Never"
nameOverride: ''
additionalLabels: {}
args: ["run"]
Expand Down
73 changes: 3 additions & 70 deletions pkg/indexer/transactionLogs.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package indexer

import (
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"github.com/Layr-Labs/sidecar/pkg/clients/ethereum"
Expand Down Expand Up @@ -109,71 +108,6 @@ func (idx *Indexer) ParseTransactionLogs(
WithTransactionHash(transaction.Hash.Value()).
WithMetadata("contractAddress", contractAddress.Value())
}

// First, attempt to decode the transaction input
txInput := transaction.Input.Value()
if len(txInput) >= 10 {
var method *abi.Method
decodedSig, err := hex.DecodeString(txInput[2:10])
if err != nil {
idx.Logger.Sugar().Errorw("Failed to decode signature")
return nil, NewIndexError(IndexError_FailedToParseTransaction, err).
WithMessage("Failed to decode signature").
WithBlockNumber(transaction.BlockNumber.Value()).
WithTransactionHash(transaction.Hash.Value()).
WithMetadata("contractAddress", contractAddress.Value())
}

if len(decodedSig) > 0 {
method, err = a.MethodById(decodedSig)
if err != nil {
msg := fmt.Sprintf("Failed to find method by ID '%s'", common.BytesToHash(decodedSig).String())
idx.Logger.Sugar().Debugw(msg)
return nil, NewIndexError(IndexError_FailedToParseTransaction, err).
WithMessage(msg).
WithBlockNumber(transaction.BlockNumber.Value()).
WithTransactionHash(transaction.Hash.Value()).
WithMetadata("contractAddress", contractAddress.Value())
} else {
parsedTransaction.MethodName = method.RawName
decodedData, err := hex.DecodeString(txInput[10:])
if err != nil {
idx.Logger.Sugar().Errorw("Failed to decode transaction input data")
return nil, NewIndexError(IndexError_FailedToParseTransaction, err).
WithMessage("Failed to decode transaction input data").
WithBlockNumber(transaction.BlockNumber.Value()).
WithTransactionHash(transaction.Hash.Value()).
WithMetadata("contractAddress", contractAddress.Value())
} else {
callMap := map[string]interface{}{}
if err := method.Inputs.UnpackIntoMap(callMap, decodedData); err != nil {
idx.Logger.Sugar().Errorw("Failed to unpack data",
zap.Error(err),
zap.String("transactionHash", transaction.Hash.Value()),
zap.Uint64("blockNumber", transaction.BlockNumber.Value()),
)
return nil, NewIndexError(IndexError_FailedToParseTransaction, err).
WithMessage("Failed to unpack data").
WithBlockNumber(transaction.BlockNumber.Value()).
WithTransactionHash(transaction.Hash.Value()).
WithMetadata("contractAddress", contractAddress.Value())
}
callMapBytes, err := json.Marshal(callMap)
if err != nil {
idx.Logger.Sugar().Errorw("Failed to marshal callMap data", zap.String("hash", transaction.Hash.Value()))
return nil, NewIndexError(IndexError_FailedToParseTransaction, err).
WithMessage("Failed to marshal callMap data").
WithBlockNumber(transaction.BlockNumber.Value()).
WithTransactionHash(transaction.Hash.Value()).
WithMetadata("contractAddress", contractAddress.Value())
}
parsedTransaction.DecodedData = string(callMapBytes)
}
}
}
} else {
idx.Logger.Sugar().Debugw(fmt.Sprintf("Transaction input is empty %s", contractAddress))
}
} else {
idx.Logger.Sugar().Debugw("Base transaction is not interesting",
zap.String("hash", transaction.Hash.Value()),
Expand Down Expand Up @@ -229,18 +163,17 @@ func (idx *Indexer) DecodeLogWithAbi(
return idx.DecodeLog(a, lg)
} else {
idx.Logger.Sugar().Debugw("Log address does not match contract address", zap.String("logAddress", logAddress.String()), zap.String("contractAddress", txReceipt.GetTargetAddress().Value()))
// TODO - need a way to get the bytecode hash
// Find/create the log address and attempt to determine if it is a proxy address
foundOrCreatedContract, err := idx.ContractManager.GetContractWithProxy(logAddress.String(), txReceipt.BlockNumber.Value())
foundContract, err := idx.ContractManager.GetContractWithProxy(logAddress.String(), txReceipt.BlockNumber.Value())
if err != nil {
return idx.DecodeLog(nil, lg)
}
if foundOrCreatedContract == nil {
if foundContract == nil {
idx.Logger.Sugar().Debugw("No contract found for address", zap.String("address", logAddress.String()))
return idx.DecodeLog(nil, lg)
}

contractAbi := foundOrCreatedContract.CombineAbis()
contractAbi := foundContract.CombineAbis()
if err != nil {
idx.Logger.Sugar().Errorw("Failed to combine ABIs", zap.Error(err), zap.String("contractAddress", logAddress.String()))
return idx.DecodeLog(nil, lg)
Expand Down
112 changes: 63 additions & 49 deletions pkg/sidecar/blockIndexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,54 @@ func (s *Sidecar) ProcessNewBlocks(ctx context.Context) error {
}
}

type Progress struct {
StartBlock uint64
LastBlockProcessed uint64
CurrentTip *atomic.Uint64
AvgPerBlockMs float64
StartTime time.Time
TotalDurationMs int64
logger *zap.Logger
}

func NewProgress(startBlock uint64, currentTip *atomic.Uint64, l *zap.Logger) *Progress {
return &Progress{
StartBlock: startBlock,
LastBlockProcessed: startBlock,
CurrentTip: currentTip,
AvgPerBlockMs: 0,
StartTime: time.Now(),
logger: l,
}
}

func (p *Progress) UpdateAndPrintProgress(lastBlockProcessed uint64) {
p.LastBlockProcessed = lastBlockProcessed

blocksProcessed := lastBlockProcessed - p.StartBlock
currentTip := p.CurrentTip.Load()
totalBlocksToProcess := currentTip - p.StartBlock
blocksRemaining := currentTip - lastBlockProcessed

if blocksProcessed == 0 || totalBlocksToProcess == 0 {
return
}

pctComplete := (float64(blocksProcessed) / float64(totalBlocksToProcess)) * 100

runningAvg := time.Since(p.StartTime).Milliseconds() / int64(blocksProcessed)

estTimeRemainingHours := float64(runningAvg*int64(blocksRemaining)) / 1000 / 60 / 60

p.logger.Sugar().Infow("Progress",
zap.String("percentComplete", fmt.Sprintf("%.2f", pctComplete)),
zap.Uint64("blocksRemaining", blocksRemaining),
zap.Float64("estimatedTimeRemaining (hrs)", estTimeRemainingHours),
zap.Float64("avgBlockProcessTime (ms)", float64(runningAvg)),
zap.Uint64("currentBlock", uint64(lastBlockProcessed)),
)
}

func (s *Sidecar) IndexFromCurrentToTip(ctx context.Context) error {
lastIndexedBlock, err := s.GetLastIndexedBlock()
if err != nil {
Expand Down Expand Up @@ -156,21 +204,19 @@ func (s *Sidecar) IndexFromCurrentToTip(ctx context.Context) error {

for retryCount < 3 {
// Get the latest safe block as a starting point
latestSafeBlockNumber, err := s.EthereumClient.GetLatestSafeBlock(ctx)
latestSafe, err := s.EthereumClient.GetLatestSafeBlock(ctx)
if err != nil {
s.Logger.Sugar().Fatalw("Failed to get current tip", zap.Error(err))
}
s.Logger.Sugar().Infow("Current tip", zap.Uint64("currentTip", latestSafe))

s.Logger.Sugar().Infow("Starting indexing process",
zap.Int64("latestBlock", lastIndexedBlock),
zap.Uint64("currentTip", latestSafeBlockNumber),
)

if latestSafeBlockNumber >= uint64(lastIndexedBlock) {
if latestSafe >= uint64(lastIndexedBlock) {
s.Logger.Sugar().Infow("Current tip is greater than latest block, starting indexing process", zap.Uint64("currentTip", latestSafe))
latestSafeBlockNumber = latestSafe
break
}

if latestSafeBlockNumber < uint64(lastIndexedBlock) {
if latestSafe < uint64(lastIndexedBlock) {
if retryCount == 2 {
s.Logger.Sugar().Fatalw("Current tip is less than latest block, but retry count is 2, exiting")
return errors.New("Current tip is less than latest block, but retry count is 2, exiting")
Expand Down Expand Up @@ -224,16 +270,12 @@ func (s *Sidecar) IndexFromCurrentToTip(ctx context.Context) error {
}
}
}()
// Keep some metrics during the indexing process
blocksProcessed := int64(0)
runningAvg := float64(0)
totalDurationMs := int64(0)

originalStartBlock := lastIndexedBlock

//nolint:all
currentBlock := lastIndexedBlock

progress := NewProgress(uint64(currentBlock), &currentTip, s.Logger)

s.Logger.Sugar().Infow("Starting indexing process", zap.Int64("currentBlock", currentBlock), zap.Uint64("currentTip", currentTip.Load()))

for uint64(currentBlock) <= currentTip.Load() {
Expand All @@ -242,50 +284,22 @@ func (s *Sidecar) IndexFromCurrentToTip(ctx context.Context) error {
return nil
}
tip := currentTip.Load()
blocksRemaining := tip - uint64(currentBlock)
totalBlocksToProcess := tip - uint64(originalStartBlock)

var pctComplete float64
if totalBlocksToProcess != 0 {
pctComplete = (float64(blocksProcessed) / float64(totalBlocksToProcess)) * 100
}

estTimeRemainingMs := runningAvg * float64(blocksRemaining)
estTimeRemainingHours := float64(estTimeRemainingMs) / 1000 / 60 / 60

startTime := time.Now()
endBlock := int64(currentBlock + 100)
if endBlock > int64(tip) {
endBlock = int64(tip)
batchEndBlock := int64(currentBlock + 100)
if batchEndBlock > int64(tip) {
batchEndBlock = int64(tip)
}
if err := s.Pipeline.RunForBlockBatch(ctx, uint64(currentBlock), uint64(endBlock), true); err != nil {
if err := s.Pipeline.RunForBlockBatch(ctx, uint64(currentBlock), uint64(batchEndBlock), true); err != nil {
s.Logger.Sugar().Errorw("Failed to run pipeline for block batch",
zap.Error(err),
zap.Uint64("startBlock", uint64(currentBlock)),
zap.Int64("endBlock", endBlock),
zap.Int64("batchEndBlock", batchEndBlock),
)
return err
}
progress.UpdateAndPrintProgress(uint64(batchEndBlock))

currentBlock = int64(endBlock)
delta := time.Since(startTime).Milliseconds()
blocksProcessed += (endBlock - currentBlock)

totalDurationMs += delta
if blocksProcessed == 0 {
runningAvg = 0
} else {
runningAvg = float64(totalDurationMs / blocksProcessed)
}

s.Logger.Sugar().Infow("Progress",
zap.String("percentComplete", fmt.Sprintf("%.2f", pctComplete)),
zap.Uint64("blocksRemaining", blocksRemaining),
zap.Float64("estimatedTimeRemaining (hrs)", estTimeRemainingHours),
zap.Float64("avgBlockProcessTime (ms)", float64(runningAvg)),
zap.Uint64("currentBlock", uint64(currentBlock)),
)
currentBlock = endBlock + 1
currentBlock = batchEndBlock + 1
}

return nil
Expand Down

0 comments on commit 7473a8e

Please sign in to comment.