diff --git a/internal/blocksync/pool.go b/internal/blocksync/pool.go index 6f87703e8..0cb9eab30 100644 --- a/internal/blocksync/pool.go +++ b/internal/blocksync/pool.go @@ -327,8 +327,11 @@ func (pool *BlockPool) AddBlock(peerID types.NodeID, block *types.Block, extComm if peer != nil { peer.decrPending(blockSize) } - } else if setBlockResult < 0 { - err := errors.New("bpr requester peer is different from original peer") + + // Increment the number of consecutive successful block syncs for the peer + pool.peerManager.IncrementBlockSyncs(peerID) + } else { + err := errors.New("requester is different or block already exists") pool.sendError(err, peerID) return fmt.Errorf("%w (peer: %s, requester: %s, block height: %d)", err, peerID, requester.getPeerID(), block.Height) } @@ -358,7 +361,6 @@ func (pool *BlockPool) SetPeerRange(peerID types.NodeID, base int64, height int6 blockSyncPeers := pool.peerManager.GetBlockSyncPeers() if len(blockSyncPeers) > 0 && !blockSyncPeers[peerID] { - pool.logger.Info(fmt.Sprintf("Skip adding peer %s for blocksync, num of blocksync peers: %d, num of pool peers: %d", peerID, len(blockSyncPeers), len(pool.peers))) return } diff --git a/internal/blocksync/reactor.go b/internal/blocksync/reactor.go index d2a5f1a73..becaef5f2 100644 --- a/internal/blocksync/reactor.go +++ b/internal/blocksync/reactor.go @@ -258,6 +258,10 @@ func (r *Reactor) handleMessage(ctx context.Context, envelope *p2p.Envelope, blo return r.respondToPeer(ctx, msg, envelope.From, blockSyncCh) case *bcproto.BlockResponse: block, err := types.BlockFromProto(msg.Block) + + r.logger.Info("received block response from peer", + "peer", envelope.From, + "height", block.Height) if err != nil { r.logger.Error("failed to convert block from proto", "peer", envelope.From, @@ -495,6 +499,7 @@ func (r *Reactor) poolRoutine(ctx context.Context, stateSynced bool, blockSyncCh var ( trySyncTicker = time.NewTicker(trySyncIntervalMS * time.Millisecond) switchToConsensusTicker = time.NewTicker(switchToConsensusIntervalSeconds * time.Second) + lastApplyBlockTime = time.Now() blocksSynced = uint64(0) @@ -695,7 +700,11 @@ func (r *Reactor) poolRoutine(ctx context.Context, stateSynced bool, blockSyncCh // TODO: Same thing for app - but we would need a way to get the hash // without persisting the state. + r.logger.Info(fmt.Sprintf("Requesting block %d from peer took %s", first.Height, time.Since(lastApplyBlockTime))) + startTime := time.Now() state, err = r.blockExec.ApplyBlock(ctx, state, firstID, first, nil) + r.logger.Info(fmt.Sprintf("ApplyBlock %d took %s", first.Height, time.Since(startTime))) + lastApplyBlockTime = time.Now() if err != nil { panic(fmt.Sprintf("failed to process committed block (%d:%X): %v", first.Height, first.Hash(), err)) } diff --git a/internal/p2p/peermanager.go b/internal/p2p/peermanager.go index 7962a29ab..dc5214aa9 100644 --- a/internal/p2p/peermanager.go +++ b/internal/p2p/peermanager.go @@ -25,8 +25,6 @@ import ( const ( // retryNever is returned by retryDelay() when retries are disabled. retryNever time.Duration = math.MaxInt64 - // DefaultMutableScore is the default score for a peer during initialization - DefaultMutableScore int64 = 10 ) // PeerStatus is a peer status. @@ -47,9 +45,10 @@ const ( type PeerScore uint8 const ( - PeerScoreUnconditional PeerScore = math.MaxUint8 // unconditional peers - PeerScorePersistent PeerScore = PeerScoreUnconditional - 1 // persistent peers - MaxPeerScoreNotPersistent PeerScore = PeerScorePersistent - 1 + PeerScoreUnconditional PeerScore = math.MaxUint8 // unconditional peers, 255 + PeerScorePersistent PeerScore = PeerScoreUnconditional - 1 // persistent peers, 254 + MaxPeerScoreNotPersistent PeerScore = PeerScorePersistent - 1 // not persistent peers, 253 + DefaultMutableScore PeerScore = MaxPeerScoreNotPersistent - 10 // mutable score, 243 ) // PeerUpdate is a peer update event sent via PeerUpdates. @@ -598,6 +597,7 @@ func (m *PeerManager) DialFailed(ctx context.Context, address NodeAddress) error addressInfo.LastDialFailure = time.Now().UTC() addressInfo.DialFailures++ + peer.ConsecSuccessfulBlocks = 0 // We need to invalidate the cache after score changed m.store.ranked = nil if err := m.store.Set(peer); err != nil { @@ -845,7 +845,15 @@ func (m *PeerManager) Disconnected(ctx context.Context, peerID types.NodeID) { // Update score and invalidate cache if a peer got disconnected if _, ok := m.store.peers[peerID]; ok { - m.store.peers[peerID].NumOfDisconnections++ + // check for potential overflow + if m.store.peers[peerID].NumOfDisconnections < math.MaxInt64 { + m.store.peers[peerID].NumOfDisconnections++ + } else { + fmt.Printf("Warning: NumOfDisconnections for peer %s has reached its maximum value\n", peerID) + m.store.peers[peerID].NumOfDisconnections = 0 + } + + m.store.peers[peerID].ConsecSuccessfulBlocks = 0 m.store.ranked = nil } @@ -992,16 +1000,16 @@ func (m *PeerManager) processPeerEvent(ctx context.Context, pu PeerUpdate) { return } - if _, ok := m.store.peers[pu.NodeID]; !ok { - m.store.peers[pu.NodeID] = &peerInfo{} - } - switch pu.Status { case PeerStatusBad: m.store.peers[pu.NodeID].MutableScore-- case PeerStatusGood: m.store.peers[pu.NodeID].MutableScore++ } + + if _, ok := m.store.peers[pu.NodeID]; !ok { + m.store.peers[pu.NodeID] = &peerInfo{} + } // Invalidate the cache after score changed m.store.ranked = nil } @@ -1350,7 +1358,9 @@ type peerInfo struct { Height int64 FixedScore PeerScore // mainly for tests - MutableScore int64 // updated by router + MutableScore PeerScore // updated by router + + ConsecSuccessfulBlocks int64 } // peerInfoFromProto converts a Protobuf PeerInfo message to a peerInfo, @@ -1408,6 +1418,7 @@ func (p *peerInfo) Copy() peerInfo { // Score calculates a score for the peer. Higher-scored peers will be // preferred over lower scores. func (p *peerInfo) Score() PeerScore { + // Use predetermined scores if set if p.FixedScore > 0 { return p.FixedScore } @@ -1415,20 +1426,27 @@ func (p *peerInfo) Score() PeerScore { return PeerScoreUnconditional } - score := p.MutableScore + score := int64(p.MutableScore) if p.Persistent || p.BlockSync { score = int64(PeerScorePersistent) } + // Add points for block sync performance + score += p.ConsecSuccessfulBlocks / 5 + + // Penalize for dial failures with time decay for _, addr := range p.AddressInfo { - // DialFailures is reset when dials succeed, so this - // is either the number of dial failures or 0. - score -= int64(addr.DialFailures) + failureScore := float64(addr.DialFailures) * math.Exp(-0.1*float64(time.Since(addr.LastDialFailure).Hours())) + score -= int64(failureScore) } - // We consider lowering the score for every 3 disconnection events - score -= p.NumOfDisconnections / 3 + // Penalize for disconnections with time decay + timeSinceLastDisconnect := time.Since(p.LastConnected) + decayFactor := math.Exp(-0.1 * timeSinceLastDisconnect.Hours()) + effectiveDisconnections := int64(float64(p.NumOfDisconnections) * decayFactor) + score -= effectiveDisconnections / 3 + // Cap score for non-persistent peers if !p.Persistent && score > int64(MaxPeerScoreNotPersistent) { score = int64(MaxPeerScoreNotPersistent) } @@ -1535,3 +1553,13 @@ func (m *PeerManager) MarkReadyConnected(nodeId types.NodeID) { m.ready[nodeId] = true m.connected[nodeId] = true } + +func (m *PeerManager) IncrementBlockSyncs(peerID types.NodeID) { + m.mtx.Lock() + defer m.mtx.Unlock() + + if peer, ok := m.store.peers[peerID]; ok { + peer.ConsecSuccessfulBlocks++ + m.store.ranked = nil + } +} diff --git a/internal/p2p/peermanager_scoring_test.go b/internal/p2p/peermanager_scoring_test.go index ecceb6288..3c56220f8 100644 --- a/internal/p2p/peermanager_scoring_test.go +++ b/internal/p2p/peermanager_scoring_test.go @@ -2,11 +2,12 @@ package p2p import ( "context" - "github.com/tendermint/tendermint/libs/log" "strings" "testing" "time" + "github.com/tendermint/tendermint/libs/log" + "github.com/stretchr/testify/require" dbm "github.com/tendermint/tm-db" @@ -44,7 +45,7 @@ func TestPeerScoring(t *testing.T) { NodeID: id, Status: PeerStatusGood, }) - require.EqualValues(t, defaultScore+int64(i), peerManager.Scores()[id]) + require.EqualValues(t, defaultScore+PeerScore(i), peerManager.Scores()[id]) } // watch the corresponding decreases respond to update for i := 1; i < 10; i++ { @@ -52,18 +53,21 @@ func TestPeerScoring(t *testing.T) { NodeID: id, Status: PeerStatusBad, }) - require.EqualValues(t, DefaultMutableScore+int64(9)-int64(i), peerManager.Scores()[id]) + require.EqualValues(t, DefaultMutableScore+PeerScore(9)-PeerScore(i), peerManager.Scores()[id]) } // Dial failure should decrease score - _ = peerManager.DialFailed(ctx, NodeAddress{NodeID: id, Protocol: "memory"}) - require.EqualValues(t, DefaultMutableScore-1, peerManager.Scores()[id]) + addr := NodeAddress{NodeID: id, Protocol: "memory"} + _ = peerManager.DialFailed(ctx, addr) + _ = peerManager.DialFailed(ctx, addr) + _ = peerManager.DialFailed(ctx, addr) + require.EqualValues(t, DefaultMutableScore-2, peerManager.Scores()[id]) // Disconnect every 3 times should also decrease score for i := 1; i < 7; i++ { peerManager.Disconnected(ctx, id) } - require.EqualValues(t, DefaultMutableScore-3, peerManager.Scores()[id]) + require.EqualValues(t, DefaultMutableScore-2, peerManager.Scores()[id]) }) t.Run("AsynchronousIncrement", func(t *testing.T) { start := peerManager.Scores()[id] @@ -92,18 +96,18 @@ func TestPeerScoring(t *testing.T) { "startAt=%d score=%d", start, peerManager.Scores()[id]) }) t.Run("TestNonPersistantPeerUpperBound", func(t *testing.T) { - start := int64(peerManager.Scores()[id] + 1) - for i := start; i <= int64(PeerScorePersistent)+start; i++ { - peerManager.processPeerEvent(ctx, PeerUpdate{ - NodeID: id, - Status: PeerStatusGood, - }) + // Reset peer state to remove any previous penalties + peerManager.store.peers[id] = &peerInfo{ + ID: id, + MutableScore: DefaultMutableScore, + } - if i >= int64(PeerScorePersistent) { - require.EqualValues(t, MaxPeerScoreNotPersistent, peerManager.Scores()[id]) - } else { - require.EqualValues(t, i, peerManager.Scores()[id]) - } + // Add successful blocks to increase score + for i := 0; i < 100; i++ { + peerManager.IncrementBlockSyncs(id) } + + // Score should be capped at MaxPeerScoreNotPersistent + require.EqualValues(t, MaxPeerScoreNotPersistent, peerManager.Scores()[id]) }) } diff --git a/internal/p2p/peermanager_test.go b/internal/p2p/peermanager_test.go index ad258dfc4..b86386b79 100644 --- a/internal/p2p/peermanager_test.go +++ b/internal/p2p/peermanager_test.go @@ -172,7 +172,7 @@ func TestNewPeerManager_Persistence(t *testing.T) { require.Equal(t, map[types.NodeID]p2p.PeerScore{ aID: p2p.PeerScorePersistent, bID: 1, - cID: 10, + cID: p2p.DefaultMutableScore, }, peerManager.Scores()) // Creating a new peer manager with the same database should retain the @@ -198,7 +198,7 @@ func TestNewPeerManager_Persistence(t *testing.T) { peerManager.DialFailed(ctx, bAddresses[0]) require.Equal(t, map[types.NodeID]p2p.PeerScore{ aID: 0, - bID: p2p.PeerScorePersistent - 1, + bID: p2p.PeerScorePersistent, cID: 1, }, peerManager.Scores()) } @@ -242,7 +242,7 @@ func TestNewPeerManager_Unconditional(t *testing.T) { require.Equal(t, map[types.NodeID]p2p.PeerScore{ aID: p2p.PeerScoreUnconditional, bID: 1, - cID: 10, + cID: p2p.DefaultMutableScore, }, peerManager.Scores()) // Creating a new peer manager with the same database should retain the @@ -680,7 +680,7 @@ func TestPeerManager_TryDialNext_UpgradeReservesPeer(t *testing.T) { c := p2p.NodeAddress{Protocol: "memory", NodeID: types.NodeID(strings.Repeat("c", 40))} peerManager, err := p2p.NewPeerManager(log.NewNopLogger(), selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{ - PeerScores: map[types.NodeID]p2p.PeerScore{b.NodeID: 11, c.NodeID: 11}, + PeerScores: map[types.NodeID]p2p.PeerScore{b.NodeID: p2p.DefaultMutableScore + 1, c.NodeID: p2p.DefaultMutableScore + 1}, MaxConnected: 1, MaxConnectedUpgrade: 2, }, p2p.NopMetrics()) @@ -846,13 +846,17 @@ func TestPeerManager_DialFailed_UnreservePeer(t *testing.T) { c := p2p.NodeAddress{Protocol: "memory", NodeID: types.NodeID(strings.Repeat("c", 40))} peerManager, err := p2p.NewPeerManager(log.NewNopLogger(), selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{ - PeerScores: map[types.NodeID]p2p.PeerScore{b.NodeID: 11, c.NodeID: 11}, + PeerScores: map[types.NodeID]p2p.PeerScore{ + a.NodeID: p2p.DefaultMutableScore - 1, // Set lower score for a to make it upgradeable + b.NodeID: p2p.DefaultMutableScore + 1, // Higher score for b to attempt upgrade + c.NodeID: p2p.DefaultMutableScore + 1, // Same high score for c to attempt upgrade after b fails + }, MaxConnected: 1, MaxConnectedUpgrade: 2, }, p2p.NopMetrics()) require.NoError(t, err) - // Add a and connect to it. + // Add and connect to peer a (lower scored) added, err := peerManager.Add(a) require.NoError(t, err) require.True(t, added) @@ -861,25 +865,21 @@ func TestPeerManager_DialFailed_UnreservePeer(t *testing.T) { require.Equal(t, a, dial) require.NoError(t, peerManager.Dialed(a)) - // Add b and start dialing it. This will claim a for upgrading. + // Add both higher scored peers b and c added, err = peerManager.Add(b) require.NoError(t, err) require.True(t, added) - dial, err = peerManager.TryDialNext() - require.NoError(t, err) - require.Equal(t, b, dial) - - // Adding c and dialing it will fail, even though it could upgrade a and we - // have free upgrade slots, because a is the only connected peer that can be - // upgraded and b is already trying to upgrade it. - added, err = peerManager.Add(c) + added, err = peerManager.Add(c) // Add c before attempting upgrade require.NoError(t, err) require.True(t, added) + + // Attempt to dial b for upgrade dial, err = peerManager.TryDialNext() require.NoError(t, err) - require.Empty(t, dial) + require.Equal(t, b, dial) - // Failing b's dial will now make c available for dialing. + // When b's dial fails, the upgrade slot should be freed + // allowing c to attempt upgrade of the same peer (a) require.NoError(t, peerManager.DialFailed(ctx, b)) dial, err = peerManager.TryDialNext() require.NoError(t, err) @@ -963,11 +963,16 @@ func TestPeerManager_Dialed_MaxConnectedUpgrade(t *testing.T) { peerManager, err := p2p.NewPeerManager(log.NewNopLogger(), selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{ MaxConnected: 2, MaxConnectedUpgrade: 1, - PeerScores: map[types.NodeID]p2p.PeerScore{c.NodeID: 11, d.NodeID: 11}, + PeerScores: map[types.NodeID]p2p.PeerScore{ + a.NodeID: p2p.DefaultMutableScore - 1, // Lower score for a + b.NodeID: p2p.DefaultMutableScore - 1, // Lower score for b + c.NodeID: p2p.DefaultMutableScore + 1, // Higher score for c to upgrade + d.NodeID: p2p.DefaultMutableScore + 1, // Higher score for d to upgrade + }, }, p2p.NopMetrics()) require.NoError(t, err) - // Dialing a and b is fine. + // Connect to lower scored peers a and b added, err := peerManager.Add(a) require.NoError(t, err) require.True(t, added) @@ -978,20 +983,24 @@ func TestPeerManager_Dialed_MaxConnectedUpgrade(t *testing.T) { require.True(t, added) require.NoError(t, peerManager.Dialed(b)) - // Starting an upgrade of c should be fine. + // Add both higher scored peers c and d added, err = peerManager.Add(c) require.NoError(t, err) require.True(t, added) + added, err = peerManager.Add(d) + require.NoError(t, err) + require.True(t, added) + + // Start upgrade with c dial, err := peerManager.TryDialNext() require.NoError(t, err) require.Equal(t, c, dial) require.NoError(t, peerManager.Dialed(c)) - // Trying to mark d dialed should fail, since there are no more upgrade - // slots and a/b haven't been evicted yet. - added, err = peerManager.Add(d) + // Try to dial d - should fail since we're at upgrade capacity + dial, err = peerManager.TryDialNext() require.NoError(t, err) - require.True(t, added) + require.Zero(t, dial) require.Error(t, peerManager.Dialed(d)) } @@ -1013,7 +1022,7 @@ func TestPeerManager_Dialed_Upgrade(t *testing.T) { peerManager, err := p2p.NewPeerManager(log.NewNopLogger(), selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{ MaxConnected: 1, MaxConnectedUpgrade: 2, - PeerScores: map[types.NodeID]p2p.PeerScore{b.NodeID: 11, c.NodeID: 11}, + PeerScores: map[types.NodeID]p2p.PeerScore{b.NodeID: p2p.DefaultMutableScore + 1, c.NodeID: p2p.DefaultMutableScore + 1}, }, p2p.NopMetrics()) require.NoError(t, err) @@ -1237,8 +1246,8 @@ func TestPeerManager_Accepted_MaxConnectedUpgrade(t *testing.T) { peerManager, err := p2p.NewPeerManager(log.NewNopLogger(), selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{ PeerScores: map[types.NodeID]p2p.PeerScore{ - c.NodeID: 11, - d.NodeID: 12, + c.NodeID: p2p.DefaultMutableScore + 1, + d.NodeID: p2p.DefaultMutableScore + 2, }, MaxConnected: 1, MaxConnectedUpgrade: 1, @@ -1285,8 +1294,8 @@ func TestPeerManager_Accepted_Upgrade(t *testing.T) { peerManager, err := p2p.NewPeerManager(log.NewNopLogger(), selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{ PeerScores: map[types.NodeID]p2p.PeerScore{ - b.NodeID: 11, - c.NodeID: 11, + b.NodeID: p2p.DefaultMutableScore + 1, + c.NodeID: p2p.DefaultMutableScore + 1, }, MaxConnected: 1, MaxConnectedUpgrade: 2, @@ -1328,8 +1337,8 @@ func TestPeerManager_Accepted_UpgradeDialing(t *testing.T) { peerManager, err := p2p.NewPeerManager(log.NewNopLogger(), selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{ PeerScores: map[types.NodeID]p2p.PeerScore{ - b.NodeID: 11, - c.NodeID: 11, + b.NodeID: p2p.DefaultMutableScore + 1, + c.NodeID: p2p.DefaultMutableScore + 1, }, MaxConnected: 1, MaxConnectedUpgrade: 2, @@ -1504,7 +1513,7 @@ func TestPeerManager_EvictNext_WakeOnUpgradeDialed(t *testing.T) { peerManager, err := p2p.NewPeerManager(log.NewNopLogger(), selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{ MaxConnected: 1, MaxConnectedUpgrade: 1, - PeerScores: map[types.NodeID]p2p.PeerScore{b.NodeID: 11}, + PeerScores: map[types.NodeID]p2p.PeerScore{b.NodeID: p2p.DefaultMutableScore + 1}, }, p2p.NopMetrics()) require.NoError(t, err) @@ -1545,7 +1554,7 @@ func TestPeerManager_EvictNext_WakeOnUpgradeAccepted(t *testing.T) { peerManager, err := p2p.NewPeerManager(log.NewNopLogger(), selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{ MaxConnected: 1, MaxConnectedUpgrade: 1, - PeerScores: map[types.NodeID]p2p.PeerScore{b.NodeID: 11}, + PeerScores: map[types.NodeID]p2p.PeerScore{b.NodeID: p2p.DefaultMutableScore + 1}, }, p2p.NopMetrics()) require.NoError(t, err) diff --git a/internal/state/execution.go b/internal/state/execution.go index bebac2640..b95623845 100644 --- a/internal/state/execution.go +++ b/internal/state/execution.go @@ -332,6 +332,7 @@ func (blockExec *BlockExecutor) ApplyBlock( defer commitSpan.End() } // Lock mempool, commit app state, update mempoool. + commitStart := time.Now() retainHeight, err := blockExec.Commit(ctx, state, block, fBlockRes.TxResults) if err != nil { return state, fmt.Errorf("commit failed for application: %w", err) @@ -339,6 +340,11 @@ func (blockExec *BlockExecutor) ApplyBlock( if commitSpan != nil { commitSpan.End() } + if time.Since(commitStart) > 1000*time.Millisecond { + blockExec.logger.Info("commit in blockExec", + "duration", time.Since(commitStart), + "height", block.Height) + } // Update evpool with the latest state. blockExec.evpool.Update(ctx, state, block.Evidence)