Skip to content

Commit

Permalink
feat: batch processing blocks from the poller channel (#77)
Browse files Browse the repository at this point in the history
  • Loading branch information
lesterli authored Sep 30, 2024
1 parent 16e4013 commit 89d3fb0
Show file tree
Hide file tree
Showing 3 changed files with 107 additions and 61 deletions.
151 changes: 90 additions & 61 deletions finality-provider/service/fp_instance.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package service

import (
"encoding/hex"
"encoding/json"
"errors"
"fmt"
Expand Down Expand Up @@ -143,7 +142,7 @@ func (fp *FinalityProviderInstance) bootstrap() (uint64, error) {
return 0, err
}

if fp.checkLagging(latestBlockHeight) {
if fp.cfg.FastSyncInterval != 0 && fp.checkLagging(latestBlockHeight) {
_, err := fp.tryFastSync(latestBlockHeight)
if err != nil && !fpcc.IsExpected(err) {
return 0, err
Expand Down Expand Up @@ -186,55 +185,6 @@ func (fp *FinalityProviderInstance) finalitySigSubmissionLoop() {

for {
select {
case b := <-fp.poller.GetBlockInfoChan():
fp.logger.Debug(
"the finality-provider received a new block, start processing",
zap.String("pk", fp.GetBtcPkHex()),
zap.Uint64("height", b.Height),
zap.String("block_hash", hex.EncodeToString(b.Hash)),
)

// check whether the block has been processed before
if fp.hasProcessed(b.Height) {
continue
}
// check whether the finality provider has voting power
hasVp, err := fp.hasVotingPower(b.Height)
if err != nil {
fp.reportCriticalErr(err)
continue
}
if !hasVp {
// the finality provider does not have voting power
// and it will never will at this block
fp.MustSetLastProcessedHeight(b.Height)
fp.metrics.IncrementFpTotalBlocksWithoutVotingPower(fp.GetBtcPkHex())
continue
}
// use the copy of the block to avoid the impact to other receivers
nextBlock := *b
res, err := fp.retrySubmitFinalitySignatureUntilBlockFinalized(&nextBlock)
if err != nil {
fp.metrics.IncrementFpTotalFailedVotes(fp.GetBtcPkHex())
if !errors.Is(err, ErrFinalityProviderShutDown) {
fp.reportCriticalErr(err)
}
continue
}
if res == nil {
// this can happen when a finality signature is not needed
// either if the block is already submitted or the signature
// is already submitted
continue
}
fp.logger.Info(
"successfully submitted a finality signature to the consumer chain",
zap.String("consumer_id", string(fp.GetChainID())),
zap.String("pk", fp.GetBtcPkHex()),
zap.Uint64("height", b.Height),
zap.String("tx_hash", res.TxHash),
)

case targetBlock := <-fp.laggingTargetChan:
res, err := fp.tryFastSync(targetBlock)
fp.isLagging.Store(false)
Expand Down Expand Up @@ -272,10 +222,84 @@ func (fp *FinalityProviderInstance) finalitySigSubmissionLoop() {
case <-fp.quit:
fp.logger.Info("the finality signature submission loop is closing")
return
default:
pollerBlocks := fp.getAllBlocksFromChan()
if len(pollerBlocks) == 0 {
continue
}
targetHeight := pollerBlocks[len(pollerBlocks)-1].Height
fp.logger.Debug("the finality-provider received new block(s), start processing",
zap.String("pk", fp.GetBtcPkHex()),
zap.Uint64("start_height", pollerBlocks[0].Height),
zap.Uint64("end_height", targetHeight),
)
res, err := fp.retrySubmitFinalitySignatureUntilBlocksFinalized(pollerBlocks)
if err != nil {
fp.metrics.IncrementFpTotalFailedVotes(fp.GetBtcPkHex())
if !errors.Is(err, ErrFinalityProviderShutDown) {
fp.reportCriticalErr(err)
}
continue
}
if res == nil {
// this can happen when a finality signature is not needed
// either if the block is already submitted or the signature
// is already submitted
continue
}
fp.logger.Info(
"successfully submitted the finality signature to the consumer chain",
zap.String("consumer_id", string(fp.GetChainID())),
zap.String("pk", fp.GetBtcPkHex()),
zap.Uint64("start_height", pollerBlocks[0].Height),
zap.Uint64("end_height", targetHeight),
zap.String("tx_hash", res.TxHash),
)
}
}
}

func (fp *FinalityProviderInstance) getAllBlocksFromChan() []*types.BlockInfo {
var pollerBlocks []*types.BlockInfo
for {
select {
case b := <-fp.poller.GetBlockInfoChan():
shouldProcess, err := fp.shouldProcessBlock(b)
if err != nil {
if !errors.Is(err, ErrFinalityProviderShutDown) {
fp.reportCriticalErr(err)
}
break
}
if shouldProcess {
pollerBlocks = append(pollerBlocks, b)
}
default:
return pollerBlocks
}
}
}

func (fp *FinalityProviderInstance) shouldProcessBlock(b *types.BlockInfo) (bool, error) {
// check whether the block has been processed before
if fp.hasProcessed(b.Height) {
return false, nil
}
// check whether the finality provider has voting power
hasVp, err := fp.hasVotingPower(b.Height)
if err != nil {
return false, err
}
if !hasVp {
// the finality provider does not have voting power
// and it will never will at this block
fp.MustSetLastProcessedHeight(b.Height)
fp.metrics.IncrementFpTotalBlocksWithoutVotingPower(fp.GetBtcPkHex())
return false, nil
}
return true, nil
}

func (fp *FinalityProviderInstance) randomnessCommitmentLoop(startHeight uint64) {
defer fp.wg.Done()

Expand Down Expand Up @@ -471,23 +495,29 @@ func (fp *FinalityProviderInstance) checkLagging(currentBlockHeight uint64) bool
return currentBlockHeight >= fp.GetLastProcessedHeight()+fp.cfg.FastSyncGap
}

// retrySubmitFinalitySignatureUntilBlockFinalized periodically tries to submit finality signature until success or the block is finalized
// retrySubmitFinalitySignatureUntilBlocksFinalized periodically tries to submit finality signature until success or the block is finalized
// error will be returned if maximum retries have been reached or the query to the consumer chain fails
func (fp *FinalityProviderInstance) retrySubmitFinalitySignatureUntilBlockFinalized(targetBlock *types.BlockInfo) (*types.TxResponse, error) {
func (fp *FinalityProviderInstance) retrySubmitFinalitySignatureUntilBlocksFinalized(targetBlocks []*types.BlockInfo) (*types.TxResponse, error) {
var failedCycles uint32

targetHeight := targetBlocks[len(targetBlocks)-1].Height
// we break the for loop if the block is finalized or the signature is successfully submitted
// error will be returned if maximum retries have been reached or the query to the consumer chain fails
for {
// error will be returned if max retries have been reached
res, err := fp.SubmitFinalitySignature(targetBlock)
var res *types.TxResponse
var err error
if len(targetBlocks) == 1 {
res, err = fp.SubmitFinalitySignature(targetBlocks[0])
} else {
res, err = fp.SubmitBatchFinalitySignatures(targetBlocks)
}
if err != nil {

fp.logger.Debug(
"failed to submit finality signature to the consumer chain",
zap.String("pk", fp.GetBtcPkHex()),
zap.Uint32("current_failures", failedCycles),
zap.Uint64("target_block_height", targetBlock.Height),
zap.Uint64("target_start_height", targetBlocks[0].Height),
zap.Uint64("target_end_height", targetHeight),
zap.Error(err),
)

Expand All @@ -510,21 +540,20 @@ func (fp *FinalityProviderInstance) retrySubmitFinalitySignatureUntilBlockFinali
select {
case <-time.After(fp.cfg.SubmissionRetryInterval):
// periodically query the index block to be later checked whether it is Finalized
finalized, err := fp.consumerCon.QueryIsBlockFinalized(targetBlock.Height)
finalized, err := fp.consumerCon.QueryIsBlockFinalized(targetHeight)
if err != nil {
return nil, fmt.Errorf("failed to query block finalization at height %v: %w", targetBlock.Height, err)
return nil, fmt.Errorf("failed to query block finalization at height %v: %w", targetHeight, err)
}
if finalized {
fp.logger.Debug(
"the block is already finalized, skip submission",
zap.String("pk", fp.GetBtcPkHex()),
zap.Uint64("target_height", targetBlock.Height),
zap.Uint64("target_height", targetHeight),
)
// TODO: returning nil here is to safely break the loop
// the error still exists
return nil, nil
}

case <-fp.quit:
fp.logger.Debug("the finality-provider instance is closing", zap.String("pk", fp.GetBtcPkHex()))
return nil, ErrFinalityProviderShutDown
Expand Down
15 changes: 15 additions & 0 deletions itest/babylon/babylon_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,21 @@ func TestFinalityProviderLifeCycle(t *testing.T) {
lastVotedHeight := tm.WaitForFpVoteCast(t, fpIns)
tm.CheckBlockFinalization(t, lastVotedHeight, 1)
t.Logf("the block at height %v is finalized", lastVotedHeight)

// stop the FP for several blocks and disable fast sync, and then restart FP
// finality signature submission should get into the default case
var n uint = 3
tm.FpConfig.FastSyncInterval = 0
// finality signature submission would take about 5 seconds
// set the poll interval to 2 seconds to make sure the poller channel has multiple blocks
tm.FpConfig.PollerConfig.PollInterval = 2 * time.Second
tm.StopAndRestartFpAfterNBlocks(t, n, fpIns)

// wait for finality signature submission to run two times
time.Sleep(12 * time.Second)
lastProcessedHeight := fpIns.GetLastProcessedHeight()
require.True(t, lastProcessedHeight >= lastVotedHeight+uint64(n))
t.Logf("the last processed height is %v", lastProcessedHeight)
}

// TestDoubleSigning tests the attack scenario where the finality-provider
Expand Down
2 changes: 2 additions & 0 deletions itest/opstackl2/op_test_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,8 @@ func createBaseFpConfig(fpHomeDir string, index int, logger *zap.Logger) *fpcfg.
cfg.RandomnessCommitInterval = 2 * time.Second
cfg.NumPubRand = 64
cfg.MinRandHeightGap = 1000
cfg.FastSyncGap = 60
cfg.FastSyncLimit = 100
return cfg
}

Expand Down

0 comments on commit 89d3fb0

Please sign in to comment.