diff --git a/consensus/polybft/mocks_test.go b/consensus/polybft/mocks_test.go index c2e8c6424b..de1434ea51 100644 --- a/consensus/polybft/mocks_test.go +++ b/consensus/polybft/mocks_test.go @@ -395,6 +395,12 @@ func (tp *syncerMock) Sync(func(*types.FullBlock) bool) error { return args.Error(0) } +func (tp *syncerMock) IsSyncing() bool { + args := tp.Called() + + return args.Bool(0) +} + func init() { // setup custom hash header func setupHeaderHashFunc() diff --git a/consensus/polybft/polybft.go b/consensus/polybft/polybft.go index 0b85c59661..f9a46b63f5 100644 --- a/consensus/polybft/polybft.go +++ b/consensus/polybft/polybft.go @@ -599,12 +599,24 @@ func (p *Polybft) startConsensusProtocol() { if ev.Source == "syncer" && ev.NewChain[0].Number >= p.blockchain.CurrentHeader().Number { p.logger.Info("sync block notification received", "block height", ev.NewChain[0].Number, "current height", p.blockchain.CurrentHeader().Number) - syncerBlockCh <- struct{}{} + + select { + case syncerBlockCh <- struct{}{}: + default: + } } } } }() + // wait until he stops syncing + p.logger.Info("waiting to stop syncing so that we can try to join consensus if node is a validator") + + for p.syncer.IsSyncing() { + } + + p.logger.Info("node synced up on start. Trying to join consensus if validator") + var ( sequenceCh <-chan struct{} stopSequence func() diff --git a/e2e-polybft/e2e/consensus_test.go b/e2e-polybft/e2e/consensus_test.go index c7e6d82bd1..0599a4d564 100644 --- a/e2e-polybft/e2e/consensus_test.go +++ b/e2e-polybft/e2e/consensus_test.go @@ -1,6 +1,7 @@ package e2e import ( + "bytes" "fmt" "math/big" "path" @@ -59,22 +60,40 @@ func TestE2E_Consensus_Basic_WithNonValidators(t *testing.T) { }) t.Run("sync protocol, drop single validator node", func(t *testing.T) { + validatorSrv := cluster.Servers[0] + validatorAcc, err := sidechain.GetAccountFromDir(validatorSrv.DataDir()) + require.NoError(t, err) + // query the current block number, as it is a starting point for the test - currentBlockNum, err := cluster.Servers[0].JSONRPC().Eth().BlockNumber() + currentBlockNum, err := validatorSrv.JSONRPC().Eth().BlockNumber() require.NoError(t, err) + // wait for 2 epochs to elapse, before we stop the node + require.NoError(t, cluster.WaitForBlock(currentBlockNum+2*epochSize, 2*time.Minute)) + // stop one node - node := cluster.Servers[0] - node.Stop() + validatorSrv.Stop() + + // check what is the current block on the running nodes + currentBlockNum, err = cluster.Servers[1].JSONRPC().Eth().BlockNumber() + require.NoError(t, err) // wait for 2 epochs to elapse, so that rest of the network progresses require.NoError(t, cluster.WaitForBlock(currentBlockNum+2*epochSize, 2*time.Minute)) // start the node again - node.Start() + validatorSrv.Start() // wait 2 more epochs to elapse and make sure that stopped node managed to catch up require.NoError(t, cluster.WaitForBlock(currentBlockNum+4*epochSize, 2*time.Minute)) + + // wait until the validator mines one block to check if he is back in consensus + require.NoError(t, cluster.WaitUntil(3*time.Minute, 2*time.Second, func() bool { + latestBlock, err := cluster.Servers[0].JSONRPC().Eth().GetBlockByNumber(ethgo.Latest, false) + require.NoError(t, err) + + return bytes.Equal(validatorAcc.Address().Bytes(), latestBlock.Miner.Bytes()) + })) }) t.Run("sync protocol, drop single non-validator node", func(t *testing.T) { @@ -93,7 +112,12 @@ func TestE2E_Consensus_Basic_WithNonValidators(t *testing.T) { node.Start() // wait 2 more epochs to elapse and make sure that stopped node managed to catch up - require.NoError(t, cluster.WaitForBlock(currentBlockNum+4*epochSize, 2*time.Minute)) + blockToWait := currentBlockNum + 4*epochSize + require.NoError(t, cluster.WaitForBlock(blockToWait, 2*time.Minute)) + + latestBlockOnDroppedNode, err := node.JSONRPC().Eth().GetBlockByNumber(ethgo.Latest, false) + require.NoError(t, err) + require.GreaterOrEqual(t, latestBlockOnDroppedNode.Number, blockToWait) }) } diff --git a/syncer/syncer.go b/syncer/syncer.go index 5c20b52624..a7aa1ec922 100644 --- a/syncer/syncer.go +++ b/syncer/syncer.go @@ -272,6 +272,11 @@ func (s *syncer) bulkSyncWithPeer(peerID peer.ID, peerLatestBlock uint64, } } +// IsSyncing indicates if node is syncing with peer +func (s *syncer) IsSyncing() bool { + return s.GetSyncProgression() != nil +} + func updateMetrics(fullBlock *types.FullBlock) { metrics.SetGauge([]string{syncerMetrics, "tx_num"}, float32(len(fullBlock.Block.Transactions))) metrics.SetGauge([]string{syncerMetrics, "receipts_num"}, float32(len(fullBlock.Receipts))) diff --git a/syncer/types.go b/syncer/types.go index 9e3f81f36e..c8643d08ea 100644 --- a/syncer/types.go +++ b/syncer/types.go @@ -70,6 +70,8 @@ type Syncer interface { HasSyncPeer() bool // Sync starts routine to sync blocks Sync(func(*types.FullBlock) bool) error + // IsSyncing indicates if syncer is syncing with the best peer + IsSyncing() bool } type Progression interface {