From 17aadad2f625e6ddabff89fda27ced55fbd12e01 Mon Sep 17 00:00:00 2001 From: dapplion <35266934+dapplion@users.noreply.github.com> Date: Mon, 15 Jul 2024 20:31:35 +0200 Subject: [PATCH] Transition block lookup sync to range sync --- beacon_node/beacon_chain/src/test_utils.rs | 2 +- .../src/network_beacon_processor/mod.rs | 5 +- .../network/src/sync/block_lookups/mod.rs | 60 +++++++++--- .../sync/block_lookups/single_block_lookup.rs | 10 +- .../network/src/sync/block_lookups/tests.rs | 50 ++++++++-- beacon_node/network/src/sync/manager.rs | 92 +++++++++++++++++-- .../network/src/sync/network_context.rs | 6 ++ .../network/src/sync/range_sync/range.rs | 3 + 8 files changed, 197 insertions(+), 31 deletions(-) diff --git a/beacon_node/beacon_chain/src/test_utils.rs b/beacon_node/beacon_chain/src/test_utils.rs index eb2d78685b2..fa4d929098f 100644 --- a/beacon_node/beacon_chain/src/test_utils.rs +++ b/beacon_node/beacon_chain/src/test_utils.rs @@ -2604,7 +2604,7 @@ pub struct MakeAttestationOptions { pub fn build_log(level: slog::Level, enabled: bool) -> Logger { let decorator = TermDecorator::new().build(); let drain = FullFormat::new(decorator).build().fuse(); - let drain = Async::new(drain).build().fuse(); + let drain = Async::new(drain).chan_size(10_000).build().fuse(); if enabled { Logger::root(drain.filter_level(level).fuse(), o!()) diff --git a/beacon_node/network/src/network_beacon_processor/mod.rs b/beacon_node/network/src/network_beacon_processor/mod.rs index 7f551c544c7..5fa93cda483 100644 --- a/beacon_node/network/src/network_beacon_processor/mod.rs +++ b/beacon_node/network/src/network_beacon_processor/mod.rs @@ -23,6 +23,7 @@ use std::sync::Arc; use std::time::Duration; use store::MemoryStore; use task_executor::TaskExecutor; +use tokio::sync::mpsc::UnboundedSender; use tokio::sync::mpsc::{self, error::TrySendError}; use types::*; @@ -751,7 +752,7 @@ impl NetworkBeaconProcessor { /// Send a message to `sync_tx`. /// /// Creates a log if there is an internal error. - fn send_sync_message(&self, message: SyncMessage) { + pub(crate) fn send_sync_message(&self, message: SyncMessage) { self.sync_tx.send(message).unwrap_or_else(|e| { debug!(self.log, "Could not send message to the sync service"; "error" => %e) @@ -779,6 +780,7 @@ impl NetworkBeaconProcessor> { // processor (but not much else). pub fn null_for_testing( network_globals: Arc>, + sync_tx: UnboundedSender>, chain: Arc>>, executor: TaskExecutor, log: Logger, @@ -791,7 +793,6 @@ impl NetworkBeaconProcessor> { } = <_>::default(); let (network_tx, _network_rx) = mpsc::unbounded_channel(); - let (sync_tx, _sync_rx) = mpsc::unbounded_channel(); let network_beacon_processor = Self { beacon_processor_send: beacon_processor_tx, diff --git a/beacon_node/network/src/sync/block_lookups/mod.rs b/beacon_node/network/src/sync/block_lookups/mod.rs index 7a5cda20692..df9d6cb57fc 100644 --- a/beacon_node/network/src/sync/block_lookups/mod.rs +++ b/beacon_node/network/src/sync/block_lookups/mod.rs @@ -28,6 +28,7 @@ use super::network_context::{PeerGroup, RpcResponseError, SyncNetworkContext}; use crate::metrics; use crate::sync::block_lookups::common::ResponseType; use crate::sync::block_lookups::parent_chain::find_oldest_fork_ancestor; +use crate::sync::SyncMessage; use beacon_chain::block_verification_types::AsBlock; use beacon_chain::data_availability_checker::AvailabilityCheckErrorCategory; use beacon_chain::{AvailabilityProcessingStatus, BeaconChainTypes, BlockError}; @@ -53,7 +54,10 @@ mod tests; /// The maximum depth we will search for a parent block. In principle we should have sync'd any /// canonical chain to its head once the peer connects. A chain should not appear where it's depth /// is further back than the most recent head slot. -pub(crate) const PARENT_DEPTH_TOLERANCE: usize = SLOT_IMPORT_TOLERANCE * 2; +/// +/// Have the same value as range's sync tolerance to consider a peer synced. Once sync lookup +/// reaches the maximum depth it will force trigger range sync. +pub(crate) const PARENT_DEPTH_TOLERANCE: usize = SLOT_IMPORT_TOLERANCE; const FAILED_CHAINS_CACHE_EXPIRY_SECONDS: u64 = 60; pub const SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS: u8 = 4; @@ -252,22 +256,52 @@ impl BlockLookups { // blocks on top of A forming A -> C. The malicious peer forces us to fetch C // from it, which will result in parent A hitting the chain_too_long error. Then // the valid chain A -> B is dropped too. - if let Ok(block_to_drop) = find_oldest_fork_ancestor(parent_chains, chain_idx) { - // Drop all lookups descending from the child of the too long parent chain - if let Some((lookup_id, lookup)) = self + // + // `find_oldest_fork_ancestor` should never return Err, unwrapping to tip for + // complete-ness + let parent_chain_tip = parent_chain.tip; + let block_to_drop = + find_oldest_fork_ancestor(parent_chains, chain_idx).unwrap_or(parent_chain_tip); + // Drop all lookups descending from the child of the too long parent chain + if let Some((lookup_id, lookup)) = self + .single_block_lookups + .iter() + .find(|(_, l)| l.block_root() == block_to_drop) + { + // If a lookup chain is too long, we can't distinguish a valid chain from a + // malicious one. We must attempt to sync this chain to not lose liveness. If + // the chain grows too long, we stop lookup sync and transition this head to + // forward range sync. We need to tell range sync which head to sync to, and + // from which peers. The lookup of the very tip of this chain may contain zero + // peers if it's the parent-child lookup. So we do a bit of a trick here: + // - Tell range sync to sync to the tip's root (if available, else its ancestor) + // - But use all peers in the ancestor lookup, which should have at least one + // peer, and its peer set is a strict superset of the tip's lookup. + let (remote_head_root, remote_head_slot) = match self .single_block_lookups .iter() - .find(|(_, l)| l.block_root() == block_to_drop) + .find(|(_, l)| l.block_root() == parent_chain_tip) { - for &peer_id in lookup.all_peers() { - cx.report_peer( - peer_id, - PeerAction::LowToleranceError, - "chain_too_long", - ); + Some((_, tip_lookup)) => { + (parent_chain_tip, tip_lookup.peek_downloaded_block_slot()) } - self.drop_lookup_and_children(*lookup_id); - } + None => (lookup.block_root(), lookup.peek_downloaded_block_slot()), + }; + + cx.send_sync_message(SyncMessage::AddPeersForceRangeSync { + peers: lookup.all_peers().copied().collect(), + head_slot: remote_head_slot, + head_root: remote_head_root, + }); + + // Do not downscore peers here. Because we can't distinguish a valid chain from + // a malicious one we may penalize honest peers for attempting to discover us a + // valid chain. Until blocks_by_range allows to specify a tip, for example with + // https://github.com/ethereum/consensus-specs/pull/3845 we will have poor + // attributability. A peer can send us garbage blocks over blocks_by_root, and + // then correct blocks via blocks_by_range. + + self.drop_lookup_and_children(*lookup_id); } return false; diff --git a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs index 4ae55d5aafe..90abca6953b 100644 --- a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs +++ b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs @@ -16,7 +16,7 @@ use std::time::{Duration, Instant}; use store::Hash256; use strum::IntoStaticStr; use types::blob_sidecar::FixedBlobSidecarList; -use types::{DataColumnSidecarList, EthSpec, SignedBeaconBlock}; +use types::{DataColumnSidecarList, EthSpec, SignedBeaconBlock, Slot}; // Dedicated enum for LookupResult to force its usage #[must_use = "LookupResult must be handled with on_lookup_result"] @@ -92,6 +92,14 @@ impl SingleBlockLookup { } } + /// Return the slot of this lookup's block if it's currently cached as `AwaitingProcessing` + pub fn peek_downloaded_block_slot(&self) -> Option { + self.block_request_state + .state + .peek_downloaded_data() + .map(|block| block.slot()) + } + /// Get the block root that is being requested. pub fn block_root(&self) -> Hash256 { self.block_root diff --git a/beacon_node/network/src/sync/block_lookups/tests.rs b/beacon_node/network/src/sync/block_lookups/tests.rs index 4c63943e3f8..8ce74b850c6 100644 --- a/beacon_node/network/src/sync/block_lookups/tests.rs +++ b/beacon_node/network/src/sync/block_lookups/tests.rs @@ -1,5 +1,6 @@ use crate::network_beacon_processor::NetworkBeaconProcessor; use crate::sync::manager::{BlockProcessType, SyncManager}; +use crate::sync::range_sync::RangeSyncType; use crate::sync::sampling::SamplingConfig; use crate::sync::{SamplingId, SyncMessage}; use crate::NetworkMessage; @@ -77,6 +78,8 @@ struct TestRig { network_rx: mpsc::UnboundedReceiver>, /// Stores all `NetworkMessage`s received from `network_recv`. (e.g. outgoing RPC requests) network_rx_queue: Vec>, + /// Receiver for `SyncMessage` from the network + sync_rx: mpsc::UnboundedReceiver>, /// To send `SyncMessage`. For sending RPC responses or block processing results to sync. sync_manager: SyncManager, /// To manipulate sync state and peer connection status @@ -130,8 +133,7 @@ impl TestRig { let chain = harness.chain.clone(); let (network_tx, network_rx) = mpsc::unbounded_channel(); - // TODO(das): make the generation of the ENR use the deterministic rng to have consistent - // column assignments + let (sync_tx, sync_rx) = mpsc::unbounded_channel::>(); let globals = Arc::new(NetworkGlobals::new_test_globals( Vec::new(), &log, @@ -139,13 +141,12 @@ impl TestRig { )); let (beacon_processor, beacon_processor_rx) = NetworkBeaconProcessor::null_for_testing( globals, + sync_tx, chain.clone(), harness.runtime.task_executor.clone(), log.clone(), ); - let (_sync_send, sync_recv) = mpsc::unbounded_channel::>(); - let fork_name = chain.spec.fork_name_at_slot::(chain.slot().unwrap()); // All current tests expect synced and EL online state @@ -159,13 +160,15 @@ impl TestRig { beacon_processor_rx_queue: vec![], network_rx, network_rx_queue: vec![], + sync_rx, rng, network_globals: beacon_processor.network_globals.clone(), sync_manager: SyncManager::new( chain, network_tx, beacon_processor.into(), - sync_recv, + // Pass empty recv not tied to any tx + mpsc::unbounded_channel().1, SamplingConfig::Custom { required_successes: vec![SAMPLING_REQUIRED_SUCCESSES], }, @@ -232,6 +235,13 @@ impl TestRig { self.send_sync_message(SyncMessage::SampleBlock(block_root, block_slot)) } + /// Drain all sync messages in the sync_rx attached to the beacon processor + fn drain_sync_rx(&mut self) { + while let Ok(sync_message) = self.sync_rx.try_recv() { + self.send_sync_message(sync_message); + } + } + fn rand_block(&mut self) -> SignedBeaconBlock { self.rand_block_and_blobs(NumBlobs::None).0 } @@ -288,6 +298,10 @@ impl TestRig { self.sync_manager.active_parent_lookups().len() } + fn active_range_sync_chain(&self) -> (RangeSyncType, Slot, Slot) { + self.sync_manager.get_range_sync_chains().unwrap().unwrap() + } + fn assert_single_lookups_count(&self, count: usize) { assert_eq!( self.active_single_lookups_count(), @@ -1665,7 +1679,18 @@ fn test_parent_lookup_too_deep_grow_ancestor() { ) } - rig.expect_penalty(peer_id, "chain_too_long"); + // Should create a new syncing chain + rig.drain_sync_rx(); + assert_eq!( + rig.active_range_sync_chain(), + ( + RangeSyncType::Head, + Slot::new(0), + Slot::new(PARENT_DEPTH_TOLERANCE as u64 - 1) + ) + ); + // Should not penalize peer, but network is not clear because of the blocks_by_range requests + rig.expect_no_penalty_for(peer_id); rig.assert_failed_chain(chain_hash); } @@ -1689,7 +1714,18 @@ fn test_parent_lookup_too_deep_grow_tip() { ); } - rig.expect_penalty(peer_id, "chain_too_long"); + // Should create a new syncing chain + rig.drain_sync_rx(); + assert_eq!( + rig.active_range_sync_chain(), + ( + RangeSyncType::Head, + Slot::new(0), + Slot::new(PARENT_DEPTH_TOLERANCE as u64 - 2) + ) + ); + // Should not penalize peer, but network is not clear because of the blocks_by_range requests + rig.expect_no_penalty_for(peer_id); rig.assert_failed_chain(tip.canonical_root()); } diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index d6ce14adb16..67ee8c5a1f5 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -90,6 +90,15 @@ pub enum SyncMessage { /// A useful peer has been discovered. AddPeer(PeerId, SyncInfo), + /// Force trigger range sync for a set of peers given a head they claim to have imported. Used + /// by block lookup to trigger range sync if a parent chain grows too large. + AddPeersForceRangeSync { + peers: Vec, + head_root: Hash256, + /// Sync lookup may not know the Slot of this head. However this situation is very rare. + head_slot: Option, + }, + /// A block has been received from the RPC. RpcBlock { request_id: SyncRequestId, @@ -318,6 +327,13 @@ impl SyncManager { .collect() } + #[cfg(test)] + pub(crate) fn get_range_sync_chains( + &self, + ) -> Result, &'static str> { + self.range_sync.state() + } + #[cfg(test)] pub(crate) fn get_failed_chains(&mut self) -> Vec { self.block_lookups.get_failed_chains() @@ -361,16 +377,72 @@ impl SyncManager { let sync_type = remote_sync_type(&local, &remote, &self.chain); // update the state of the peer. - let should_add = self.update_peer_sync_state(&peer_id, &local, &remote, &sync_type); - - if matches!(sync_type, PeerSyncType::Advanced) && should_add { - self.range_sync - .add_peer(&mut self.network, local, peer_id, remote); + let is_still_connected = self.update_peer_sync_state(&peer_id, &local, &remote, &sync_type); + if is_still_connected { + match sync_type { + PeerSyncType::Behind => {} // Do nothing + PeerSyncType::Advanced => { + self.range_sync + .add_peer(&mut self.network, local, peer_id, remote); + } + PeerSyncType::FullySynced => { + // Sync considers this peer close enough to the head to not trigger range sync. + // Range sync handles well syncing large ranges of blocks, of a least a few blocks. + // However this peer may be in a fork that we should sync but we have not discovered + // yet. If the head of the peer is unknown, attempt block lookup first. If the + // unknown head turns out to be on a longer fork, it will trigger range sync. + // + // A peer should always be considered `Advanced` if its finalized root is + // unknown and ahead of ours, so we don't check for that root here. + // + // TODO: This fork-choice check is potentially duplicated, review code + if !self.chain.block_is_known_to_fork_choice(&remote.head_root) { + self.handle_unknown_block_root(peer_id, remote.head_root); + } + } + } } self.update_sync_state(); } + fn add_peers_force_range_sync( + &mut self, + peers: &[PeerId], + head_root: Hash256, + head_slot: Option, + ) { + let status = self.chain.status_message(); + let local = SyncInfo { + head_slot: status.head_slot, + head_root: status.head_root, + finalized_epoch: status.finalized_epoch, + finalized_root: status.finalized_root, + }; + + let head_slot = head_slot.unwrap_or_else(|| { + debug!(self.log, + "On add peers force range sync assuming local head_slot"; + "local_head_slot" => local.head_slot, + "head_root" => ?head_root + ); + local.head_slot + }); + + let remote = SyncInfo { + head_slot, + head_root, + // Set finalized to same as local to trigger Head sync + finalized_epoch: local.finalized_epoch, + finalized_root: local.finalized_root, + }; + + for peer_id in peers { + self.range_sync + .add_peer(&mut self.network, local.clone(), *peer_id, remote.clone()); + } + } + /// Handles RPC errors related to requests that were emitted from the sync manager. fn inject_error(&mut self, peer_id: PeerId, request_id: SyncRequestId, error: RPCError) { trace!(self.log, "Sync manager received a failed RPC"); @@ -446,8 +518,7 @@ impl SyncManager { } /// Updates the syncing state of a peer. - /// Return whether the peer should be used for range syncing or not, according to its - /// connection status. + /// Return true if the peer is still connected and known to the peers DB fn update_peer_sync_state( &mut self, peer_id: &PeerId, @@ -651,6 +722,13 @@ impl SyncManager { SyncMessage::AddPeer(peer_id, info) => { self.add_peer(peer_id, info); } + SyncMessage::AddPeersForceRangeSync { + peers, + head_root, + head_slot, + } => { + self.add_peers_force_range_sync(&peers, head_root, head_slot); + } SyncMessage::RpcBlock { request_id, peer_id, diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index 1cf028dbcd8..ec92f05b955 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -7,6 +7,7 @@ pub use self::requests::{BlocksByRootSingleRequest, DataColumnsByRootSingleBlock use super::block_sidecar_coupling::RangeBlockComponentsRequest; use super::manager::BlockProcessType; use super::range_sync::{BatchId, ByRangeRequestType, ChainId}; +use super::SyncMessage; use crate::metrics; use crate::network_beacon_processor::NetworkBeaconProcessor; use crate::service::NetworkMessage; @@ -234,6 +235,11 @@ impl SyncNetworkContext { } } + pub fn send_sync_message(&mut self, sync_message: SyncMessage) { + self.network_beacon_processor + .send_sync_message(sync_message); + } + /// Returns the ids of all the requests made to the given peer_id. pub fn peer_disconnected(&mut self, peer_id: &PeerId) -> Vec { let failed_range_ids = diff --git a/beacon_node/network/src/sync/range_sync/range.rs b/beacon_node/network/src/sync/range_sync/range.rs index beb04fac28b..2f2e4cd10f9 100644 --- a/beacon_node/network/src/sync/range_sync/range.rs +++ b/beacon_node/network/src/sync/range_sync/range.rs @@ -387,6 +387,7 @@ where #[cfg(test)] mod tests { use crate::network_beacon_processor::NetworkBeaconProcessor; + use crate::sync::SyncMessage; use crate::NetworkMessage; use super::*; @@ -689,6 +690,7 @@ mod tests { log.new(o!("component" => "range")), ); let (network_tx, network_rx) = mpsc::unbounded_channel(); + let (sync_tx, _sync_rx) = mpsc::unbounded_channel::>(); let globals = Arc::new(NetworkGlobals::new_test_globals( Vec::new(), &log, @@ -697,6 +699,7 @@ mod tests { let (network_beacon_processor, beacon_processor_rx) = NetworkBeaconProcessor::null_for_testing( globals.clone(), + sync_tx, chain.clone(), harness.runtime.task_executor.clone(), log.clone(),