Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: batch processing blocks from the poller channel #77

Merged
merged 1 commit into from
Sep 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
151 changes: 90 additions & 61 deletions finality-provider/service/fp_instance.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package service

import (
"encoding/hex"
"encoding/json"
"errors"
"fmt"
Expand Down Expand Up @@ -143,7 +142,7 @@ func (fp *FinalityProviderInstance) bootstrap() (uint64, error) {
return 0, err
}

if fp.checkLagging(latestBlockHeight) {
if fp.cfg.FastSyncInterval != 0 && fp.checkLagging(latestBlockHeight) {
_, err := fp.tryFastSync(latestBlockHeight)
if err != nil && !fpcc.IsExpected(err) {
return 0, err
Expand Down Expand Up @@ -186,55 +185,6 @@ func (fp *FinalityProviderInstance) finalitySigSubmissionLoop() {

for {
select {
case b := <-fp.poller.GetBlockInfoChan():
fp.logger.Debug(
"the finality-provider received a new block, start processing",
zap.String("pk", fp.GetBtcPkHex()),
zap.Uint64("height", b.Height),
zap.String("block_hash", hex.EncodeToString(b.Hash)),
)

// check whether the block has been processed before
if fp.hasProcessed(b.Height) {
continue
}
// check whether the finality provider has voting power
hasVp, err := fp.hasVotingPower(b.Height)
if err != nil {
fp.reportCriticalErr(err)
continue
}
if !hasVp {
// the finality provider does not have voting power
// and it will never will at this block
fp.MustSetLastProcessedHeight(b.Height)
fp.metrics.IncrementFpTotalBlocksWithoutVotingPower(fp.GetBtcPkHex())
continue
}
// use the copy of the block to avoid the impact to other receivers
nextBlock := *b
res, err := fp.retrySubmitFinalitySignatureUntilBlockFinalized(&nextBlock)
if err != nil {
fp.metrics.IncrementFpTotalFailedVotes(fp.GetBtcPkHex())
if !errors.Is(err, ErrFinalityProviderShutDown) {
fp.reportCriticalErr(err)
}
continue
}
if res == nil {
// this can happen when a finality signature is not needed
// either if the block is already submitted or the signature
// is already submitted
continue
}
fp.logger.Info(
"successfully submitted a finality signature to the consumer chain",
zap.String("consumer_id", string(fp.GetChainID())),
zap.String("pk", fp.GetBtcPkHex()),
zap.Uint64("height", b.Height),
zap.String("tx_hash", res.TxHash),
)

case targetBlock := <-fp.laggingTargetChan:
res, err := fp.tryFastSync(targetBlock)
fp.isLagging.Store(false)
Expand Down Expand Up @@ -272,10 +222,84 @@ func (fp *FinalityProviderInstance) finalitySigSubmissionLoop() {
case <-fp.quit:
fp.logger.Info("the finality signature submission loop is closing")
return
default:
pollerBlocks := fp.getAllBlocksFromChan()
if len(pollerBlocks) == 0 {
continue
}
targetHeight := pollerBlocks[len(pollerBlocks)-1].Height
fp.logger.Debug("the finality-provider received new block(s), start processing",
zap.String("pk", fp.GetBtcPkHex()),
zap.Uint64("start_height", pollerBlocks[0].Height),
zap.Uint64("end_height", targetHeight),
)
res, err := fp.retrySubmitFinalitySignatureUntilBlocksFinalized(pollerBlocks)
if err != nil {
fp.metrics.IncrementFpTotalFailedVotes(fp.GetBtcPkHex())
if !errors.Is(err, ErrFinalityProviderShutDown) {
fp.reportCriticalErr(err)
}
continue
}
if res == nil {
// this can happen when a finality signature is not needed
// either if the block is already submitted or the signature
// is already submitted
continue
}
fp.logger.Info(
"successfully submitted the finality signature to the consumer chain",
zap.String("consumer_id", string(fp.GetChainID())),
zap.String("pk", fp.GetBtcPkHex()),
zap.Uint64("start_height", pollerBlocks[0].Height),
zap.Uint64("end_height", targetHeight),
zap.String("tx_hash", res.TxHash),
)
}
}
}

func (fp *FinalityProviderInstance) getAllBlocksFromChan() []*types.BlockInfo {
var pollerBlocks []*types.BlockInfo
for {
select {
case b := <-fp.poller.GetBlockInfoChan():
shouldProcess, err := fp.shouldProcessBlock(b)
if err != nil {
if !errors.Is(err, ErrFinalityProviderShutDown) {
fp.reportCriticalErr(err)
}
break
}
if shouldProcess {
pollerBlocks = append(pollerBlocks, b)
}
default:
return pollerBlocks
}
}
}

func (fp *FinalityProviderInstance) shouldProcessBlock(b *types.BlockInfo) (bool, error) {
// check whether the block has been processed before
if fp.hasProcessed(b.Height) {
return false, nil
}
// check whether the finality provider has voting power
hasVp, err := fp.hasVotingPower(b.Height)
if err != nil {
return false, err
}
if !hasVp {
// the finality provider does not have voting power
// and it will never will at this block
fp.MustSetLastProcessedHeight(b.Height)
fp.metrics.IncrementFpTotalBlocksWithoutVotingPower(fp.GetBtcPkHex())
return false, nil
}
return true, nil
}

func (fp *FinalityProviderInstance) randomnessCommitmentLoop(startHeight uint64) {
defer fp.wg.Done()

Expand Down Expand Up @@ -471,23 +495,29 @@ func (fp *FinalityProviderInstance) checkLagging(currentBlockHeight uint64) bool
return currentBlockHeight >= fp.GetLastProcessedHeight()+fp.cfg.FastSyncGap
}

// retrySubmitFinalitySignatureUntilBlockFinalized periodically tries to submit finality signature until success or the block is finalized
// retrySubmitFinalitySignatureUntilBlocksFinalized periodically tries to submit finality signature until success or the block is finalized
// error will be returned if maximum retries have been reached or the query to the consumer chain fails
func (fp *FinalityProviderInstance) retrySubmitFinalitySignatureUntilBlockFinalized(targetBlock *types.BlockInfo) (*types.TxResponse, error) {
func (fp *FinalityProviderInstance) retrySubmitFinalitySignatureUntilBlocksFinalized(targetBlocks []*types.BlockInfo) (*types.TxResponse, error) {
var failedCycles uint32

targetHeight := targetBlocks[len(targetBlocks)-1].Height
// we break the for loop if the block is finalized or the signature is successfully submitted
// error will be returned if maximum retries have been reached or the query to the consumer chain fails
for {
// error will be returned if max retries have been reached
res, err := fp.SubmitFinalitySignature(targetBlock)
var res *types.TxResponse
var err error
if len(targetBlocks) == 1 {
res, err = fp.SubmitFinalitySignature(targetBlocks[0])
} else {
res, err = fp.SubmitBatchFinalitySignatures(targetBlocks)
}
if err != nil {

fp.logger.Debug(
"failed to submit finality signature to the consumer chain",
zap.String("pk", fp.GetBtcPkHex()),
zap.Uint32("current_failures", failedCycles),
zap.Uint64("target_block_height", targetBlock.Height),
zap.Uint64("target_start_height", targetBlocks[0].Height),
zap.Uint64("target_end_height", targetHeight),
zap.Error(err),
)

Expand All @@ -510,21 +540,20 @@ func (fp *FinalityProviderInstance) retrySubmitFinalitySignatureUntilBlockFinali
select {
case <-time.After(fp.cfg.SubmissionRetryInterval):
// periodically query the index block to be later checked whether it is Finalized
finalized, err := fp.consumerCon.QueryIsBlockFinalized(targetBlock.Height)
finalized, err := fp.consumerCon.QueryIsBlockFinalized(targetHeight)
if err != nil {
return nil, fmt.Errorf("failed to query block finalization at height %v: %w", targetBlock.Height, err)
return nil, fmt.Errorf("failed to query block finalization at height %v: %w", targetHeight, err)
}
if finalized {
fp.logger.Debug(
"the block is already finalized, skip submission",
zap.String("pk", fp.GetBtcPkHex()),
zap.Uint64("target_height", targetBlock.Height),
zap.Uint64("target_height", targetHeight),
)
// TODO: returning nil here is to safely break the loop
// the error still exists
return nil, nil
}

case <-fp.quit:
fp.logger.Debug("the finality-provider instance is closing", zap.String("pk", fp.GetBtcPkHex()))
return nil, ErrFinalityProviderShutDown
Expand Down
15 changes: 15 additions & 0 deletions itest/babylon/babylon_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,21 @@ func TestFinalityProviderLifeCycle(t *testing.T) {
lastVotedHeight := tm.WaitForFpVoteCast(t, fpIns)
tm.CheckBlockFinalization(t, lastVotedHeight, 1)
t.Logf("the block at height %v is finalized", lastVotedHeight)

// stop the FP for several blocks and disable fast sync, and then restart FP
// finality signature submission should get into the default case
var n uint = 3
tm.FpConfig.FastSyncInterval = 0
// finality signature submission would take about 5 seconds
// set the poll interval to 2 seconds to make sure the poller channel has multiple blocks
tm.FpConfig.PollerConfig.PollInterval = 2 * time.Second
tm.StopAndRestartFpAfterNBlocks(t, n, fpIns)

// wait for finality signature submission to run two times
time.Sleep(12 * time.Second)
lastProcessedHeight := fpIns.GetLastProcessedHeight()
require.True(t, lastProcessedHeight >= lastVotedHeight+uint64(n))
t.Logf("the last processed height is %v", lastProcessedHeight)
}

// TestDoubleSigning tests the attack scenario where the finality-provider
Expand Down
2 changes: 2 additions & 0 deletions itest/opstackl2/op_test_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,8 @@ func createBaseFpConfig(fpHomeDir string, index int, logger *zap.Logger) *fpcfg.
cfg.RandomnessCommitInterval = 2 * time.Second
cfg.NumPubRand = 64
cfg.MinRandHeightGap = 1000
cfg.FastSyncGap = 60
cfg.FastSyncLimit = 100
return cfg
}

Expand Down
Loading