From 2b522d478062586de4d3386fa189aadf14f99030 Mon Sep 17 00:00:00 2001 From: Alexandru Gheorghe <49718502+alexggh@users.noreply.github.com> Date: Wed, 11 Dec 2024 10:21:05 +0200 Subject: [PATCH] BACKPORT-CONFLICT --- .../network/approval-distribution/src/lib.rs | 146 ++++++++++++-- .../approval-distribution/src/tests.rs | 189 ++++++++++++++++++ prdoc/pr_6696.prdoc | 15 ++ 3 files changed, 337 insertions(+), 13 deletions(-) create mode 100644 prdoc/pr_6696.prdoc diff --git a/polkadot/node/network/approval-distribution/src/lib.rs b/polkadot/node/network/approval-distribution/src/lib.rs index ec0a842517fa..c99e582efec1 100644 --- a/polkadot/node/network/approval-distribution/src/lib.rs +++ b/polkadot/node/network/approval-distribution/src/lib.rs @@ -301,7 +301,7 @@ impl Default for AggressionConfig { fn default() -> Self { AggressionConfig { l1_threshold: Some(16), - l2_threshold: Some(28), + l2_threshold: Some(64), resend_unfinalized_period: Some(8), } } @@ -491,6 +491,15 @@ struct BlockEntry { /// Approval entries for whole block. These also contain all approvals in the case of multiple /// candidates being claimed by assignments. approval_entries: HashMap<(ValidatorIndex, CandidateBitfield), ApprovalEntry>, +<<<<<<< HEAD +======= + /// The block vrf story. + vrf_story: RelayVRFStory, + /// The block slot. + slot: Slot, + /// Backing off from re-sending messages to peers. + last_resent_at_block_number: Option, +>>>>>>> 85dd228 (Make approval-distribution aggression a bit more robust and less spammy (#6696)) } impl BlockEntry { @@ -786,6 +795,13 @@ impl State { candidates, session: meta.session, approval_entries: HashMap::new(), +<<<<<<< HEAD +======= + candidates_metadata: meta.candidates, + vrf_story: meta.vrf_story, + slot: meta.slot, + last_resent_at_block_number: None, +>>>>>>> 85dd228 (Make approval-distribution aggression a bit more robust and less spammy (#6696)) }); self.topologies.inc_session_refs(meta.session); @@ -1168,7 +1184,38 @@ impl State { self.enable_aggression(ctx, Resend::No, metrics).await; } +<<<<<<< HEAD async fn import_and_circulate_assignment( +======= + // When finality is lagging as a last resort nodes start sending the messages they have + // multiples times. This means it is safe to accept duplicate messages without punishing the + // peer and reduce the reputation and can end up banning the Peer, which in turn will create + // more no-shows. + fn accept_duplicates_from_validators( + blocks_by_number: &BTreeMap>, + topologies: &SessionGridTopologies, + aggression_config: &AggressionConfig, + entry: &BlockEntry, + peer: PeerId, + ) -> bool { + let topology = topologies.get_topology(entry.session); + let min_age = blocks_by_number.iter().next().map(|(num, _)| num); + let max_age = blocks_by_number.iter().rev().next().map(|(num, _)| num); + + // Return if we don't have at least 1 block. + let (min_age, max_age) = match (min_age, max_age) { + (Some(min), Some(max)) => (*min, *max), + _ => return false, + }; + + let age = max_age.saturating_sub(min_age); + + aggression_config.should_trigger_aggression(age) && + topology.map(|topology| topology.is_validator(&peer)).unwrap_or(false) + } + + async fn import_and_circulate_assignment( +>>>>>>> 85dd228 (Make approval-distribution aggression a bit more robust and less spammy (#6696)) &mut self, ctx: &mut Context, metrics: &Metrics, @@ -1239,6 +1286,7 @@ impl State { if peer_knowledge.contains(&message_subject, message_kind) { // wasn't included before if !peer_knowledge.received.insert(message_subject.clone(), message_kind) { +<<<<<<< HEAD gum::debug!( target: LOG_TARGET, ?peer_id, @@ -1249,10 +1297,31 @@ impl State { modify_reputation( &mut self.reputation, ctx.sender(), +======= + if !Self::accept_duplicates_from_validators( + &self.blocks_by_number, + &self.topologies, + &self.aggression_config, + entry, +>>>>>>> 85dd228 (Make approval-distribution aggression a bit more robust and less spammy (#6696)) peer_id, - COST_DUPLICATE_MESSAGE, - ) - .await; + ) { + gum::debug!( + target: LOG_TARGET, + ?peer_id, + ?message_subject, + "Duplicate assignment", + ); + + modify_reputation( + &mut self.reputation, + network_sender, + peer_id, + COST_DUPLICATE_MESSAGE, + ) + .await; + } + metrics.on_assignment_duplicate(); } else { gum::trace!( @@ -1530,6 +1599,9 @@ impl State { assignments_knowledge_key: &Vec<(MessageSubject, MessageKind)>, approval_knowledge_key: &(MessageSubject, MessageKind), entry: &mut BlockEntry, + blocks_by_number: &BTreeMap>, + topologies: &SessionGridTopologies, + aggression_config: &AggressionConfig, reputation: &mut ReputationAggregator, peer_id: PeerId, metrics: &Metrics, @@ -1557,6 +1629,7 @@ impl State { .received .insert(approval_knowledge_key.0.clone(), approval_knowledge_key.1) { +<<<<<<< HEAD gum::trace!( target: LOG_TARGET, ?peer_id, @@ -1567,10 +1640,29 @@ impl State { modify_reputation( reputation, ctx.sender(), +======= + if !Self::accept_duplicates_from_validators( + blocks_by_number, + topologies, + aggression_config, + entry, +>>>>>>> 85dd228 (Make approval-distribution aggression a bit more robust and less spammy (#6696)) peer_id, - COST_DUPLICATE_MESSAGE, - ) - .await; + ) { + gum::trace!( + target: LOG_TARGET, + ?peer_id, + ?approval_knowledge_key, + "Duplicate approval", + ); + modify_reputation( + reputation, + network_sender, + peer_id, + COST_DUPLICATE_MESSAGE, + ) + .await; + } metrics.on_approval_duplicate(); } return false @@ -1669,6 +1761,9 @@ impl State { &assignments_knowledge_keys, &approval_knwowledge_key, entry, + &self.blocks_by_number, + &self.topologies, + &self.aggression_config, &mut self.reputation, peer_id, metrics, @@ -2056,18 +2151,43 @@ impl State { &self.topologies, |block_entry| { let block_age = max_age - block_entry.number; + // We want to resend only for blocks of min_age, there is no point in + // resending for blocks newer than that, because we are just going to create load + // and not gain anything. + let diff_from_min_age = block_entry.number - min_age; + + // We want to back-off on resending for blocks that have been resent recently, to + // give time for nodes to process all the extra messages, if we still have not + // finalized we are going to resend again after unfinalized_period * 2 since the + // last resend. + let blocks_since_last_sent = block_entry + .last_resent_at_block_number + .map(|last_resent_at_block_number| max_age - last_resent_at_block_number); + + let can_resend_at_this_age = blocks_since_last_sent + .zip(config.resend_unfinalized_period) + .map(|(blocks_since_last_sent, unfinalized_period)| { + blocks_since_last_sent >= unfinalized_period * 2 + }) + .unwrap_or(true); if resend == Resend::Yes && - config - .resend_unfinalized_period - .as_ref() - .map_or(false, |p| block_age > 0 && block_age % p == 0) - { + config.resend_unfinalized_period.as_ref().map_or(false, |p| { + block_age > 0 && + block_age % p == 0 && diff_from_min_age == 0 && + can_resend_at_this_age + }) { // Retry sending to all peers. for (_, knowledge) in block_entry.known_by.iter_mut() { knowledge.sent = Knowledge::default(); } - + block_entry.last_resent_at_block_number = Some(max_age); + gum::debug!( + target: LOG_TARGET, + block_number = ?block_entry.number, + ?max_age, + "Aggression enabled with resend for block", + ); true } else { false diff --git a/polkadot/node/network/approval-distribution/src/tests.rs b/polkadot/node/network/approval-distribution/src/tests.rs index 2d08807f97b6..81f140b77aa5 100644 --- a/polkadot/node/network/approval-distribution/src/tests.rs +++ b/polkadot/node/network/approval-distribution/src/tests.rs @@ -818,6 +818,141 @@ fn peer_sending_us_the_same_we_just_sent_them_is_ok() { }); } +#[test] +fn peer_sending_us_duplicates_while_aggression_enabled_is_ok() { + let parent_hash = Hash::repeat_byte(0xFF); + let hash = Hash::repeat_byte(0xAA); + + let peers = make_peers_and_authority_ids(8); + let peer_a = peers.first().unwrap().0; + + let _ = test_harness( + Arc::new(MockAssignmentCriteria { tranche: Ok(0) }), + Arc::new(SystemClock {}), + state_without_reputation_delay(), + |mut virtual_overseer| async move { + let overseer = &mut virtual_overseer; + let peer = &peer_a; + setup_peer_with_view(overseer, peer, view![], ValidationVersion::V3).await; + + let peers_with_optional_peer_id = peers + .iter() + .map(|(peer_id, authority)| (Some(*peer_id), authority.clone())) + .collect_vec(); + // Setup a topology where peer_a is neighbor to current node. + setup_gossip_topology( + overseer, + make_gossip_topology(1, &peers_with_optional_peer_id, &[0], &[2], 1), + ) + .await; + + // new block `hash` with 1 candidates + let meta = BlockApprovalMeta { + hash, + parent_hash, + number: 1, + candidates: vec![Default::default(); 1], + slot: 1.into(), + session: 1, + vrf_story: RelayVRFStory(Default::default()), + }; + let msg = ApprovalDistributionMessage::NewBlocks(vec![meta]); + overseer_send(overseer, msg).await; + + // import an assignment related to `hash` locally + let validator_index = ValidatorIndex(0); + let candidate_indices: CandidateBitfield = + vec![0 as CandidateIndex].try_into().unwrap(); + let candidate_bitfields = vec![CoreIndex(0)].try_into().unwrap(); + let cert = fake_assignment_cert_v2(hash, validator_index, candidate_bitfields); + overseer_send( + overseer, + ApprovalDistributionMessage::DistributeAssignment( + cert.clone().into(), + candidate_indices.clone(), + ), + ) + .await; + + // update peer view to include the hash + overseer_send( + overseer, + ApprovalDistributionMessage::NetworkBridgeUpdate( + NetworkBridgeEvent::PeerViewChange(*peer, view![hash]), + ), + ) + .await; + + // we should send them the assignment + assert_matches!( + overseer_recv(overseer).await, + AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::SendValidationMessage( + peers, + Versioned::V3(protocol_v3::ValidationProtocol::ApprovalDistribution( + protocol_v3::ApprovalDistributionMessage::Assignments(assignments) + )) + )) => { + assert_eq!(peers.len(), 1); + assert_eq!(assignments.len(), 1); + } + ); + + // but if someone else is sending it the same assignment + // the peer could send us it as well + let assignments = vec![(cert, candidate_indices)]; + let msg = protocol_v3::ApprovalDistributionMessage::Assignments(assignments); + send_message_from_peer_v3(overseer, peer, msg.clone()).await; + + assert!( + overseer.recv().timeout(TIMEOUT).await.is_none(), + "we should not punish the peer" + ); + + // send the assignments again + send_message_from_peer_v3(overseer, peer, msg.clone()).await; + + // now we should + expect_reputation_change(overseer, peer, COST_DUPLICATE_MESSAGE).await; + + // Peers will be continously punished for sending duplicates until approval-distribution + // aggression kicks, at which point they aren't anymore. + let mut parent_hash = hash; + for level in 0..16 { + // As long as the lag is bellow l1 aggression, punish peers for duplicates. + send_message_from_peer_v3(overseer, peer, msg.clone()).await; + expect_reputation_change(overseer, peer, COST_DUPLICATE_MESSAGE).await; + + let number = 1 + level + 1; // first block had number 1 + let hash = BlakeTwo256::hash_of(&(parent_hash, number)); + let meta = BlockApprovalMeta { + hash, + parent_hash, + number, + candidates: vec![], + slot: (level as u64).into(), + session: 1, + vrf_story: RelayVRFStory(Default::default()), + }; + + let msg = ApprovalDistributionMessage::ApprovalCheckingLagUpdate(level + 1); + overseer_send(overseer, msg).await; + + let msg = ApprovalDistributionMessage::NewBlocks(vec![meta]); + overseer_send(overseer, msg).await; + + parent_hash = hash; + } + + // send the assignments again, we should not punish the peer because aggression is + // enabled. + send_message_from_peer_v3(overseer, peer, msg).await; + + assert!(overseer.recv().timeout(TIMEOUT).await.is_none(), "no message should be sent"); + virtual_overseer + }, + ); +} + #[test] fn import_approval_happy_path_v1_v2_peers() { let peers = make_peers_and_authority_ids(15); @@ -3488,6 +3623,60 @@ fn resends_messages_periodically() { assert_eq!(sent_assignments, assignments); } ); +<<<<<<< HEAD +======= + }; + + let mut number = 1; + for _ in 0..10 { + // Add blocks until resend is done. + { + let mut parent_hash = hash; + for level in 0..4 { + number = number + 1; + let hash = BlakeTwo256::hash_of(&(parent_hash, number)); + let meta = BlockApprovalMeta { + hash, + parent_hash, + number, + candidates: vec![], + slot: (level as u64).into(), + session: 1, + vrf_story: RelayVRFStory(Default::default()), + }; + + let msg = ApprovalDistributionMessage::ApprovalCheckingLagUpdate(2); + overseer_send(overseer, msg).await; + let msg = ApprovalDistributionMessage::NewBlocks(vec![meta]); + overseer_send(overseer, msg).await; + + parent_hash = hash; + } + } + + let mut expected_y = vec![50, 51, 52, 53]; + + // Expect messages sent only to topology peers, one by one. + for _ in 0..expected_y.len() { + assert_matches!( + overseer_recv(overseer).await, + AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::SendValidationMessage( + sent_peers, + Versioned::V1(protocol_v1::ValidationProtocol::ApprovalDistribution( + protocol_v1::ApprovalDistributionMessage::Assignments(sent_assignments) + )) + )) => { + assert_eq!(sent_peers.len(), 1); + let expected_pos = expected_y.iter() + .position(|&i| &peers[i].0 == &sent_peers[0]) + .unwrap(); + + expected_y.remove(expected_pos); + assert_eq!(sent_assignments, assignments); + } + ); + } +>>>>>>> 85dd228 (Make approval-distribution aggression a bit more robust and less spammy (#6696)) } } diff --git a/prdoc/pr_6696.prdoc b/prdoc/pr_6696.prdoc new file mode 100644 index 000000000000..c5c73f831886 --- /dev/null +++ b/prdoc/pr_6696.prdoc @@ -0,0 +1,15 @@ +# Schema: Polkadot SDK PRDoc Schema (prdoc) v1.0.0 +# See doc at https://raw.githubusercontent.com/paritytech/polkadot-sdk/master/prdoc/schema_user.json + +title: Make approval-distribution aggression a bit more robust and less spammy + +doc: + - audience: Node Dev + description: | + The problem with the current implementation of approval-distribution aggression is that is too spammy, + and can overload the nodes, so make it less spammy by moving back the moment we trigger L2 aggression + and make resend enable only for the latest unfinalized block. + +crates: + - name: polkadot-approval-distribution + bump: minor