From 71c5388461df8e48bafb2e1ecd732eb29957ce03 Mon Sep 17 00:00:00 2001 From: Lion - dapplion <35266934+dapplion@users.noreply.github.com> Date: Wed, 9 Oct 2024 00:18:41 +0300 Subject: [PATCH] Transition block lookup sync to range sync (#6122) * Transition block lookup sync to range sync * Log unexpected state * Merge remote-tracking branch 'sigp/unstable' into lookup-to-range * Add docs * Merge remote-tracking branch 'sigp/unstable' into lookup-to-range --- beacon_node/beacon_chain/src/test_utils.rs | 4 +- .../src/network_beacon_processor/mod.rs | 5 +- .../network/src/sync/block_lookups/mod.rs | 67 ++++++++++--- .../sync/block_lookups/single_block_lookup.rs | 10 +- .../network/src/sync/block_lookups/tests.rs | 48 +++++++++- beacon_node/network/src/sync/manager.rs | 93 +++++++++++++++++-- .../network/src/sync/network_context.rs | 6 ++ .../network/src/sync/range_sync/range.rs | 3 + 8 files changed, 206 insertions(+), 30 deletions(-) diff --git a/beacon_node/beacon_chain/src/test_utils.rs b/beacon_node/beacon_chain/src/test_utils.rs index 344820c6a24..9be3b4cc2f9 100644 --- a/beacon_node/beacon_chain/src/test_utils.rs +++ b/beacon_node/beacon_chain/src/test_utils.rs @@ -2790,12 +2790,12 @@ pub fn build_log(level: slog::Level, logger_type: LoggerType) -> Logger { match logger_type { LoggerType::Test => { let drain = FullFormat::new(TermDecorator::new().build()).build().fuse(); - let drain = Async::new(drain).build().fuse(); + let drain = Async::new(drain).chan_size(10_000).build().fuse(); Logger::root(drain.filter_level(level).fuse(), o!()) } LoggerType::CI => { let drain = FullFormat::new(ci_decorator()).build().fuse(); - let drain = Async::new(drain).build().fuse(); + let drain = Async::new(drain).chan_size(10_000).build().fuse(); Logger::root(drain.filter_level(level).fuse(), o!()) } LoggerType::Null => { diff --git a/beacon_node/network/src/network_beacon_processor/mod.rs b/beacon_node/network/src/network_beacon_processor/mod.rs index 5ec6140828b..04571e181d7 
100644 --- a/beacon_node/network/src/network_beacon_processor/mod.rs +++ b/beacon_node/network/src/network_beacon_processor/mod.rs @@ -25,6 +25,7 @@ use std::sync::Arc; use std::time::Duration; use store::MemoryStore; use task_executor::TaskExecutor; +use tokio::sync::mpsc::UnboundedSender; use tokio::sync::mpsc::{self, error::TrySendError}; use types::*; @@ -831,7 +832,7 @@ impl NetworkBeaconProcessor { /// Send a message to `sync_tx`. /// /// Creates a log if there is an internal error. - fn send_sync_message(&self, message: SyncMessage) { + pub(crate) fn send_sync_message(&self, message: SyncMessage) { self.sync_tx.send(message).unwrap_or_else(|e| { debug!(self.log, "Could not send message to the sync service"; "error" => %e) @@ -859,6 +860,7 @@ impl NetworkBeaconProcessor> { // processor (but not much else). pub fn null_for_testing( network_globals: Arc>, + sync_tx: UnboundedSender>, chain: Arc>>, executor: TaskExecutor, log: Logger, @@ -871,7 +873,6 @@ impl NetworkBeaconProcessor> { } = <_>::default(); let (network_tx, _network_rx) = mpsc::unbounded_channel(); - let (sync_tx, _sync_rx) = mpsc::unbounded_channel(); let network_beacon_processor = Self { beacon_processor_send: beacon_processor_tx, diff --git a/beacon_node/network/src/sync/block_lookups/mod.rs b/beacon_node/network/src/sync/block_lookups/mod.rs index a9dbf11fd06..a89f533ecc6 100644 --- a/beacon_node/network/src/sync/block_lookups/mod.rs +++ b/beacon_node/network/src/sync/block_lookups/mod.rs @@ -28,6 +28,7 @@ use super::network_context::{PeerGroup, RpcResponseError, SyncNetworkContext}; use crate::metrics; use crate::sync::block_lookups::common::ResponseType; use crate::sync::block_lookups::parent_chain::find_oldest_fork_ancestor; +use crate::sync::SyncMessage; use beacon_chain::block_verification_types::AsBlock; use beacon_chain::data_availability_checker::{ AvailabilityCheckError, AvailabilityCheckErrorCategory, @@ -55,7 +56,10 @@ mod tests; /// The maximum depth we will search for a parent 
block. In principle we should have sync'd any /// canonical chain to its head once the peer connects. A chain should not appear where it's depth /// is further back than the most recent head slot. -pub(crate) const PARENT_DEPTH_TOLERANCE: usize = SLOT_IMPORT_TOLERANCE * 2; +/// +/// Has the same value as range sync's tolerance to consider a peer synced. Once sync lookup +/// reaches the maximum depth it will force trigger range sync. +pub(crate) const PARENT_DEPTH_TOLERANCE: usize = SLOT_IMPORT_TOLERANCE; const FAILED_CHAINS_CACHE_EXPIRY_SECONDS: u64 = 60; pub const SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS: u8 = 4; @@ -254,22 +258,59 @@ impl BlockLookups { // blocks on top of A forming A -> C. The malicious peer forces us to fetch C // from it, which will result in parent A hitting the chain_too_long error. Then // the valid chain A -> B is dropped too. - if let Ok(block_to_drop) = find_oldest_fork_ancestor(parent_chains, chain_idx) { - // Drop all lookups descending from the child of the too long parent chain - if let Some((lookup_id, lookup)) = self + // + // `find_oldest_fork_ancestor` should never return Err, unwrapping to tip for + // completeness + let parent_chain_tip = parent_chain.tip; + let block_to_drop = + find_oldest_fork_ancestor(parent_chains, chain_idx).unwrap_or(parent_chain_tip); + // Drop all lookups descending from the child of the too long parent chain + if let Some((lookup_id, lookup)) = self + .single_block_lookups + .iter() + .find(|(_, l)| l.block_root() == block_to_drop) + { + // If a lookup chain is too long, we can't distinguish a valid chain from a + // malicious one. We must attempt to sync this chain to not lose liveness. If + // the chain grows too long, we stop lookup sync and transition this head to + // forward range sync. We need to tell range sync which head to sync to, and + // from which peers. The lookup of the very tip of this chain may contain zero + // peers if it's the parent-child lookup. 
So we do a bit of a trick here: + // - Tell range sync to sync to the tip's root (if available, else its ancestor) + // - But use all peers in the ancestor lookup, which should have at least one + // peer, and its peer set is a strict superset of the tip's lookup. + if let Some((_, tip_lookup)) = self .single_block_lookups .iter() - .find(|(_, l)| l.block_root() == block_to_drop) + .find(|(_, l)| l.block_root() == parent_chain_tip) { - for &peer_id in lookup.all_peers() { - cx.report_peer( - peer_id, - PeerAction::LowToleranceError, - "chain_too_long", - ); - } - self.drop_lookup_and_children(*lookup_id); + cx.send_sync_message(SyncMessage::AddPeersForceRangeSync { + peers: lookup.all_peers().copied().collect(), + head_slot: tip_lookup.peek_downloaded_block_slot(), + head_root: parent_chain_tip, + }); + } else { + // Should never happen, log error and continue the lookup drop + error!(self.log, "Unable to transition lookup to range sync"; + "error" => "Parent chain tip lookup not found", + "block_root" => ?parent_chain_tip + ); } + + // Do not downscore peers here. Because we can't distinguish a valid chain from + // a malicious one we may penalize honest peers for attempting to discover us a + // valid chain. Until blocks_by_range allows to specify a tip, for example with + // https://github.com/ethereum/consensus-specs/pull/3845 we will have poor + // attributability. A peer can send us garbage blocks over blocks_by_root, and + // then correct blocks via blocks_by_range. 
+ + self.drop_lookup_and_children(*lookup_id); + } else { + // Should never happen + error!(self.log, "Unable to transition lookup to range sync"; + "error" => "Block to drop lookup not found", + "block_root" => ?block_to_drop + ); } return false; diff --git a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs index 73ffcd43845..4e7268a72ac 100644 --- a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs +++ b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs @@ -15,7 +15,7 @@ use std::time::{Duration, Instant}; use store::Hash256; use strum::IntoStaticStr; use types::blob_sidecar::FixedBlobSidecarList; -use types::{DataColumnSidecarList, EthSpec, SignedBeaconBlock}; +use types::{DataColumnSidecarList, EthSpec, SignedBeaconBlock, Slot}; // Dedicated enum for LookupResult to force its usage #[must_use = "LookupResult must be handled with on_lookup_result"] @@ -91,6 +91,14 @@ impl SingleBlockLookup { } } + /// Return the slot of this lookup's block if it's currently cached as `AwaitingProcessing` + pub fn peek_downloaded_block_slot(&self) -> Option { + self.block_request_state + .state + .peek_downloaded_data() + .map(|block| block.slot()) + } + /// Get the block root that is being requested. 
pub fn block_root(&self) -> Hash256 { self.block_root diff --git a/beacon_node/network/src/sync/block_lookups/tests.rs b/beacon_node/network/src/sync/block_lookups/tests.rs index cd4609e1473..0ed624fc0db 100644 --- a/beacon_node/network/src/sync/block_lookups/tests.rs +++ b/beacon_node/network/src/sync/block_lookups/tests.rs @@ -1,6 +1,7 @@ use crate::network_beacon_processor::NetworkBeaconProcessor; use crate::sync::manager::{BlockProcessType, SyncManager}; use crate::sync::peer_sampling::SamplingConfig; +use crate::sync::range_sync::RangeSyncType; use crate::sync::{SamplingId, SyncMessage}; use crate::NetworkMessage; use std::sync::Arc; @@ -78,6 +79,8 @@ struct TestRig { network_rx: mpsc::UnboundedReceiver>, /// Stores all `NetworkMessage`s received from `network_recv`. (e.g. outgoing RPC requests) network_rx_queue: Vec>, + /// Receiver for `SyncMessage` from the network + sync_rx: mpsc::UnboundedReceiver>, /// To send `SyncMessage`. For sending RPC responses or block processing results to sync. 
sync_manager: SyncManager, /// To manipulate sync state and peer connection status @@ -137,6 +140,7 @@ impl TestRig { let chain = harness.chain.clone(); let (network_tx, network_rx) = mpsc::unbounded_channel(); + let (sync_tx, sync_rx) = mpsc::unbounded_channel::>(); // TODO(das): make the generation of the ENR use the deterministic rng to have consistent // column assignments let network_config = Arc::new(NetworkConfig::default()); @@ -148,13 +152,12 @@ impl TestRig { )); let (beacon_processor, beacon_processor_rx) = NetworkBeaconProcessor::null_for_testing( globals, + sync_tx, chain.clone(), harness.runtime.task_executor.clone(), log.clone(), ); - let (_sync_send, sync_recv) = mpsc::unbounded_channel::>(); - let fork_name = chain.spec.fork_name_at_slot::(chain.slot().unwrap()); // All current tests expect synced and EL online state @@ -168,13 +171,15 @@ impl TestRig { beacon_processor_rx_queue: vec![], network_rx, network_rx_queue: vec![], + sync_rx, rng, network_globals: beacon_processor.network_globals.clone(), sync_manager: SyncManager::new( chain, network_tx, beacon_processor.into(), - sync_recv, + // Pass empty recv not tied to any tx + mpsc::unbounded_channel().1, SamplingConfig::Custom { required_successes: vec![SAMPLING_REQUIRED_SUCCESSES], }, @@ -237,6 +242,13 @@ impl TestRig { self.send_sync_message(SyncMessage::SampleBlock(block_root, block_slot)) } + /// Drain all sync messages in the sync_rx attached to the beacon processor + fn drain_sync_rx(&mut self) { + while let Ok(sync_message) = self.sync_rx.try_recv() { + self.send_sync_message(sync_message); + } + } + fn rand_block(&mut self) -> SignedBeaconBlock { self.rand_block_and_blobs(NumBlobs::None).0 } @@ -293,6 +305,10 @@ impl TestRig { self.sync_manager.active_parent_lookups().len() } + fn active_range_sync_chain(&self) -> (RangeSyncType, Slot, Slot) { + self.sync_manager.get_range_sync_chains().unwrap().unwrap() + } + fn assert_single_lookups_count(&self, count: usize) { assert_eq!( 
self.active_single_lookups_count(), @@ -1696,7 +1712,18 @@ fn test_parent_lookup_too_deep_grow_ancestor() { ) } - rig.expect_penalty(peer_id, "chain_too_long"); + // Should create a new syncing chain + rig.drain_sync_rx(); + assert_eq!( + rig.active_range_sync_chain(), + ( + RangeSyncType::Head, + Slot::new(0), + Slot::new(PARENT_DEPTH_TOLERANCE as u64 - 1) + ) + ); + // Should not penalize peer, but network is not clear because of the blocks_by_range requests + rig.expect_no_penalty_for(peer_id); rig.assert_failed_chain(chain_hash); } @@ -1723,7 +1750,18 @@ fn test_parent_lookup_too_deep_grow_tip() { ); } - rig.expect_penalty(peer_id, "chain_too_long"); + // Should create a new syncing chain + rig.drain_sync_rx(); + assert_eq!( + rig.active_range_sync_chain(), + ( + RangeSyncType::Head, + Slot::new(0), + Slot::new(PARENT_DEPTH_TOLERANCE as u64 - 2) + ) + ); + // Should not penalize peer, but network is not clear because of the blocks_by_range requests + rig.expect_no_penalty_for(peer_id); rig.assert_failed_chain(tip.canonical_root()); } diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index 708c4308b80..a2544b82b5f 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -94,6 +94,15 @@ pub enum SyncMessage { /// A useful peer has been discovered. AddPeer(PeerId, SyncInfo), + /// Force trigger range sync for a set of peers given a head they claim to have imported. Used + /// by block lookup to trigger range sync if a parent chain grows too large. + AddPeersForceRangeSync { + peers: Vec, + head_root: Hash256, + /// Sync lookup may not know the Slot of this head. However this situation is very rare. + head_slot: Option, + }, + /// A block has been received from the RPC. 
RpcBlock { request_id: SyncRequestId, @@ -322,6 +331,13 @@ impl SyncManager { .collect() } + #[cfg(test)] + pub(crate) fn get_range_sync_chains( + &self, + ) -> Result, &'static str> { + self.range_sync.state() + } + #[cfg(test)] pub(crate) fn get_failed_chains(&mut self) -> Vec { self.block_lookups.get_failed_chains() @@ -376,11 +392,30 @@ impl SyncManager { let sync_type = remote_sync_type(&local, &remote, &self.chain); // update the state of the peer. - let should_add = self.update_peer_sync_state(&peer_id, &local, &remote, &sync_type); - - if matches!(sync_type, PeerSyncType::Advanced) && should_add { - self.range_sync - .add_peer(&mut self.network, local, peer_id, remote); + let is_still_connected = self.update_peer_sync_state(&peer_id, &local, &remote, &sync_type); + if is_still_connected { + match sync_type { + PeerSyncType::Behind => {} // Do nothing + PeerSyncType::Advanced => { + self.range_sync + .add_peer(&mut self.network, local, peer_id, remote); + } + PeerSyncType::FullySynced => { + // Sync considers this peer close enough to the head to not trigger range sync. + // Range sync handles well syncing large ranges of blocks, of at least a few blocks. + // However this peer may be in a fork that we should sync but we have not discovered + // yet. If the head of the peer is unknown, attempt block lookup first. If the + // unknown head turns out to be on a longer fork, it will trigger range sync. + // + // A peer should always be considered `Advanced` if its finalized root is + // unknown and ahead of ours, so we don't check for that root here. + // + // TODO: This fork-choice check is potentially duplicated, review code + if !self.chain.block_is_known_to_fork_choice(&remote.head_root) { + self.handle_unknown_block_root(peer_id, remote.head_root); + } + } + } } self.update_sync_state(); @@ -391,6 +426,44 @@ impl SyncManager { } } + /// Trigger range sync for a set of peers that claim to have imported a head unknown to us. 
+ fn add_peers_force_range_sync( + &mut self, + peers: &[PeerId], + head_root: Hash256, + head_slot: Option, + ) { + let status = self.chain.status_message(); + let local = SyncInfo { + head_slot: status.head_slot, + head_root: status.head_root, + finalized_epoch: status.finalized_epoch, + finalized_root: status.finalized_root, + }; + + let head_slot = head_slot.unwrap_or_else(|| { + debug!(self.log, + "On add peers force range sync assuming local head_slot"; + "local_head_slot" => local.head_slot, + "head_root" => ?head_root + ); + local.head_slot + }); + + let remote = SyncInfo { + head_slot, + head_root, + // Set finalized to same as local to trigger Head sync + finalized_epoch: local.finalized_epoch, + finalized_root: local.finalized_root, + }; + + for peer_id in peers { + self.range_sync + .add_peer(&mut self.network, local.clone(), *peer_id, remote.clone()); + } + } + /// Handles RPC errors related to requests that were emitted from the sync manager. fn inject_error(&mut self, peer_id: PeerId, request_id: SyncRequestId, error: RPCError) { trace!(self.log, "Sync manager received a failed RPC"); @@ -476,8 +549,7 @@ impl SyncManager { } /// Updates the syncing state of a peer. - /// Return whether the peer should be used for range syncing or not, according to its - /// connection status. 
+ /// Return true if the peer is still connected and known to the peers DB fn update_peer_sync_state( &mut self, peer_id: &PeerId, @@ -686,6 +758,13 @@ impl SyncManager { SyncMessage::AddPeer(peer_id, info) => { self.add_peer(peer_id, info); } + SyncMessage::AddPeersForceRangeSync { + peers, + head_root, + head_slot, + } => { + self.add_peers_force_range_sync(&peers, head_root, head_slot); + } SyncMessage::RpcBlock { request_id, peer_id, diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index 492b703f8a2..b67c0bf2dd8 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -7,6 +7,7 @@ pub use self::requests::{BlocksByRootSingleRequest, DataColumnsByRootSingleBlock use super::block_sidecar_coupling::RangeBlockComponentsRequest; use super::manager::BlockProcessType; use super::range_sync::{BatchId, ByRangeRequestType, ChainId}; +use super::SyncMessage; use crate::metrics; use crate::network_beacon_processor::NetworkBeaconProcessor; use crate::service::NetworkMessage; @@ -249,6 +250,11 @@ impl SyncNetworkContext { } } + pub fn send_sync_message(&mut self, sync_message: SyncMessage) { + self.network_beacon_processor + .send_sync_message(sync_message); + } + /// Returns the ids of all the requests made to the given peer_id. 
pub fn peer_disconnected(&mut self, peer_id: &PeerId) -> Vec { let failed_range_ids = diff --git a/beacon_node/network/src/sync/range_sync/range.rs b/beacon_node/network/src/sync/range_sync/range.rs index b88253c9e81..0ef99838dee 100644 --- a/beacon_node/network/src/sync/range_sync/range.rs +++ b/beacon_node/network/src/sync/range_sync/range.rs @@ -386,6 +386,7 @@ where #[cfg(test)] mod tests { use crate::network_beacon_processor::NetworkBeaconProcessor; + use crate::sync::SyncMessage; use crate::NetworkMessage; use super::*; @@ -690,6 +691,7 @@ mod tests { log.new(o!("component" => "range")), ); let (network_tx, network_rx) = mpsc::unbounded_channel(); + let (sync_tx, _sync_rx) = mpsc::unbounded_channel::>(); let network_config = Arc::new(NetworkConfig::default()); let globals = Arc::new(NetworkGlobals::new_test_globals( Vec::new(), @@ -700,6 +702,7 @@ mod tests { let (network_beacon_processor, beacon_processor_rx) = NetworkBeaconProcessor::null_for_testing( globals.clone(), + sync_tx, chain.clone(), harness.runtime.task_executor.clone(), log.clone(),