diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index 344e91711c4..5d02be2b4c1 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -362,6 +362,16 @@ impl SyncManager { self.sampling.get_request_status(block_root, index) } + #[cfg(test)] + pub(crate) fn range_sync_state(&self) -> super::range_sync::SyncChainStatus { + self.range_sync.state() + } + + #[cfg(test)] + pub(crate) fn update_execution_engine_state(&mut self, state: EngineState) { + self.handle_new_execution_engine_state(state); + } + fn network_globals(&self) -> &NetworkGlobals { self.network.network_globals() } diff --git a/beacon_node/network/src/sync/range_sync/block_storage.rs b/beacon_node/network/src/sync/range_sync/block_storage.rs deleted file mode 100644 index df49543a6b6..00000000000 --- a/beacon_node/network/src/sync/range_sync/block_storage.rs +++ /dev/null @@ -1,13 +0,0 @@ -use beacon_chain::{BeaconChain, BeaconChainTypes}; -use types::Hash256; - -/// Trait that helps maintain RangeSync's implementation split from the BeaconChain -pub trait BlockStorage { - fn is_block_known(&self, block_root: &Hash256) -> bool; -} - -impl BlockStorage for BeaconChain { - fn is_block_known(&self, block_root: &Hash256) -> bool { - self.block_is_known_to_fork_choice(block_root) - } -} diff --git a/beacon_node/network/src/sync/range_sync/chain_collection.rs b/beacon_node/network/src/sync/range_sync/chain_collection.rs index 1217fbf8fed..c030d0a19e8 100644 --- a/beacon_node/network/src/sync/range_sync/chain_collection.rs +++ b/beacon_node/network/src/sync/range_sync/chain_collection.rs @@ -3,12 +3,11 @@ //! Each chain type is stored in it's own map. A variety of helper functions are given along with //! this struct to simplify the logic of the other layers of sync. -use super::block_storage::BlockStorage; use super::chain::{ChainId, ProcessingResult, RemoveChain, SyncingChain}; use super::sync_type::RangeSyncType; use crate::metrics; use crate::sync::network_context::SyncNetworkContext; -use beacon_chain::BeaconChainTypes; +use beacon_chain::{BeaconChain, BeaconChainTypes}; use fnv::FnvHashMap; use lighthouse_network::PeerId; use lighthouse_network::SyncInfo; @@ -37,10 +36,13 @@ pub enum RangeSyncState { Idle, } +pub type SyncChainStatus = + Result, &'static str>; + /// A collection of finalized and head chains currently being processed. -pub struct ChainCollection { +pub struct ChainCollection { /// The beacon chain for processing. - beacon_chain: Arc, + beacon_chain: Arc>, /// The set of finalized chains being synced. finalized_chains: FnvHashMap>, /// The set of head chains being synced. @@ -51,8 +53,8 @@ pub struct ChainCollection { log: slog::Logger, } -impl ChainCollection { - pub fn new(beacon_chain: Arc, log: slog::Logger) -> Self { +impl ChainCollection { + pub fn new(beacon_chain: Arc>, log: slog::Logger) -> Self { ChainCollection { beacon_chain, finalized_chains: FnvHashMap::default(), @@ -213,9 +215,7 @@ impl ChainCollection { } } - pub fn state( - &self, - ) -> Result, &'static str> { + pub fn state(&self) -> SyncChainStatus { match self.state { RangeSyncState::Finalized(ref syncing_id) => { let chain = self @@ -409,7 +409,8 @@ impl ChainCollection { let log_ref = &self.log; let is_outdated = |target_slot: &Slot, target_root: &Hash256| { - target_slot <= &local_finalized_slot || beacon_chain.is_block_known(target_root) + target_slot <= &local_finalized_slot + || beacon_chain.block_is_known_to_fork_choice(target_root) }; // Retain only head peers that remain relevant diff --git a/beacon_node/network/src/sync/range_sync/mod.rs b/beacon_node/network/src/sync/range_sync/mod.rs index d0f2f9217eb..8f881fba90f 100644 --- a/beacon_node/network/src/sync/range_sync/mod.rs +++ b/beacon_node/network/src/sync/range_sync/mod.rs @@ -2,7 +2,6 @@ //! peers. mod batch; -mod block_storage; mod chain; mod chain_collection; mod range; @@ -13,5 +12,7 @@ pub use batch::{ ByRangeRequestType, }; pub use chain::{BatchId, ChainId, EPOCHS_PER_BATCH}; +#[cfg(test)] +pub use chain_collection::SyncChainStatus; pub use range::RangeSync; pub use sync_type::RangeSyncType; diff --git a/beacon_node/network/src/sync/range_sync/range.rs b/beacon_node/network/src/sync/range_sync/range.rs index 0ef99838dee..78679403bb4 100644 --- a/beacon_node/network/src/sync/range_sync/range.rs +++ b/beacon_node/network/src/sync/range_sync/range.rs @@ -39,9 +39,8 @@ //! Each chain is downloaded in batches of blocks. The batched blocks are processed sequentially //! and further batches are requested as current blocks are being processed. -use super::block_storage::BlockStorage; use super::chain::{BatchId, ChainId, RemoveChain, SyncingChain}; -use super::chain_collection::ChainCollection; +use super::chain_collection::{ChainCollection, SyncChainStatus}; use super::sync_type::RangeSyncType; use crate::metrics; use crate::status::ToStatusMessage; @@ -56,7 +55,7 @@ use lru_cache::LRUTimeCache; use slog::{crit, debug, trace, warn}; use std::collections::HashMap; use std::sync::Arc; -use types::{Epoch, EthSpec, Hash256, Slot}; +use types::{Epoch, EthSpec, Hash256}; /// For how long we store failed finalized chains to prevent retries. const FAILED_CHAINS_EXPIRY_SECONDS: u64 = 30; @@ -64,27 +63,26 @@ const FAILED_CHAINS_EXPIRY_SECONDS: u64 = 30; /// The primary object dealing with long range/batch syncing. This contains all the active and /// non-active chains that need to be processed before the syncing is considered complete. This /// holds the current state of the long range sync. -pub struct RangeSync> { +pub struct RangeSync { /// The beacon chain for processing. - beacon_chain: Arc, + beacon_chain: Arc>, /// Last known sync info of our useful connected peers. We use this information to create Head /// chains after all finalized chains have ended. awaiting_head_peers: HashMap, /// A collection of chains that need to be downloaded. This stores any head or finalized chains /// that need to be downloaded. - chains: ChainCollection, + chains: ChainCollection, /// Chains that have failed and are stored to prevent being retried. failed_chains: LRUTimeCache, /// The syncing logger. log: slog::Logger, } -impl RangeSync +impl RangeSync where - C: BlockStorage + ToStatusMessage, T: BeaconChainTypes, { - pub fn new(beacon_chain: Arc, log: slog::Logger) -> Self { + pub fn new(beacon_chain: Arc>, log: slog::Logger) -> Self { RangeSync { beacon_chain: beacon_chain.clone(), chains: ChainCollection::new(beacon_chain, log.clone()), @@ -96,9 +94,7 @@ where } } - pub fn state( - &self, - ) -> Result, &'static str> { + pub fn state(&self) -> SyncChainStatus { self.chains.state() } @@ -382,465 +378,3 @@ where } } } - -#[cfg(test)] -mod tests { - use crate::network_beacon_processor::NetworkBeaconProcessor; - use crate::sync::SyncMessage; - use crate::NetworkMessage; - - use super::*; - use crate::sync::network_context::{BlockOrBlob, RangeRequestId}; - use beacon_chain::builder::Witness; - use beacon_chain::eth1_chain::CachingEth1Backend; - use beacon_chain::parking_lot::RwLock; - use beacon_chain::test_utils::{BeaconChainHarness, EphemeralHarnessType}; - use beacon_chain::EngineState; - use beacon_processor::WorkEvent as BeaconWorkEvent; - use lighthouse_network::service::api_types::SyncRequestId; - use lighthouse_network::{ - rpc::StatusMessage, service::api_types::AppRequestId, NetworkConfig, NetworkGlobals, - }; - use slog::{o, Drain}; - use slot_clock::TestingSlotClock; - use std::collections::HashSet; - use store::MemoryStore; - use tokio::sync::mpsc; - use types::{FixedBytesExtended, ForkName, MinimalEthSpec as E}; - - #[derive(Debug)] - struct FakeStorage { - known_blocks: RwLock>, - status: RwLock, - } - - impl Default for FakeStorage { - fn default() -> Self { - FakeStorage { - known_blocks: RwLock::new(HashSet::new()), - status: RwLock::new(StatusMessage { - fork_digest: [0; 4], - finalized_root: Hash256::zero(), - finalized_epoch: 0usize.into(), - head_root: Hash256::zero(), - head_slot: 0usize.into(), - }), - } - } - } - - impl FakeStorage { - fn remember_block(&self, block_root: Hash256) { - self.known_blocks.write().insert(block_root); - } - - #[allow(dead_code)] - fn forget_block(&self, block_root: &Hash256) { - self.known_blocks.write().remove(block_root); - } - } - - impl BlockStorage for FakeStorage { - fn is_block_known(&self, block_root: &store::Hash256) -> bool { - self.known_blocks.read().contains(block_root) - } - } - - impl ToStatusMessage for FakeStorage { - fn status_message(&self) -> StatusMessage { - self.status.read().clone() - } - } - - type TestBeaconChainType = - Witness, E, MemoryStore, MemoryStore>; - - fn build_log(level: slog::Level, enabled: bool) -> slog::Logger { - let decorator = slog_term::TermDecorator::new().build(); - let drain = slog_term::FullFormat::new(decorator).build().fuse(); - let drain = slog_async::Async::new(drain).build().fuse(); - - if enabled { - slog::Logger::root(drain.filter_level(level).fuse(), o!()) - } else { - slog::Logger::root(drain.filter(|_| false).fuse(), o!()) - } - } - - #[allow(unused)] - struct TestRig { - log: slog::Logger, - /// To check what does sync send to the beacon processor. - beacon_processor_rx: mpsc::Receiver>, - /// To set up different scenarios where sync is told about known/unknown blocks. - chain: Arc, - /// Needed by range to handle communication with the network. - cx: SyncNetworkContext, - /// To check what the network receives from Range. - network_rx: mpsc::UnboundedReceiver>, - /// To modify what the network declares about various global variables, in particular about - /// the sync state of a peer. - globals: Arc>, - } - - impl RangeSync { - fn assert_state(&self, expected_state: RangeSyncType) { - assert_eq!( - self.state() - .expect("State is ok") - .expect("Range is syncing") - .0, - expected_state - ) - } - - #[allow(dead_code)] - fn assert_not_syncing(&self) { - assert!( - self.state().expect("State is ok").is_none(), - "Range should not be syncing." - ); - } - } - - impl TestRig { - fn local_info(&self) -> SyncInfo { - let StatusMessage { - fork_digest: _, - finalized_root, - finalized_epoch, - head_root, - head_slot, - } = self.chain.status.read().clone(); - SyncInfo { - head_slot, - head_root, - finalized_epoch, - finalized_root, - } - } - - /// Reads an BlocksByRange request to a given peer from the network receiver channel. - #[track_caller] - fn grab_request( - &mut self, - expected_peer: &PeerId, - fork_name: ForkName, - ) -> (AppRequestId, Option) { - let block_req_id = if let Ok(NetworkMessage::SendRequest { - peer_id, - request: _, - request_id, - }) = self.network_rx.try_recv() - { - assert_eq!(&peer_id, expected_peer); - request_id - } else { - panic!("Should have sent a batch request to the peer") - }; - let blob_req_id = if fork_name.deneb_enabled() { - if let Ok(NetworkMessage::SendRequest { - peer_id, - request: _, - request_id, - }) = self.network_rx.try_recv() - { - assert_eq!(&peer_id, expected_peer); - Some(request_id) - } else { - panic!("Should have sent a batch request to the peer") - } - } else { - None - }; - (block_req_id, blob_req_id) - } - - fn complete_range_block_and_blobs_response( - &mut self, - block_req: AppRequestId, - blob_req_opt: Option, - ) -> (ChainId, BatchId, Id) { - if blob_req_opt.is_some() { - match block_req { - AppRequestId::Sync(SyncRequestId::RangeBlockAndBlobs { id }) => { - let _ = self - .cx - .range_block_and_blob_response(id, BlockOrBlob::Block(None)); - let response = self - .cx - .range_block_and_blob_response(id, BlockOrBlob::Blob(None)) - .unwrap(); - let (chain_id, batch_id) = - TestRig::unwrap_range_request_id(response.sender_id); - (chain_id, batch_id, id) - } - other => panic!("unexpected request {:?}", other), - } - } else { - match block_req { - AppRequestId::Sync(SyncRequestId::RangeBlockAndBlobs { id }) => { - let response = self - .cx - .range_block_and_blob_response(id, BlockOrBlob::Block(None)) - .unwrap(); - let (chain_id, batch_id) = - TestRig::unwrap_range_request_id(response.sender_id); - (chain_id, batch_id, id) - } - other => panic!("unexpected request {:?}", other), - } - } - } - - fn unwrap_range_request_id(sender_id: RangeRequestId) -> (ChainId, BatchId) { - if let RangeRequestId::RangeSync { chain_id, batch_id } = sender_id { - (chain_id, batch_id) - } else { - panic!("expected RangeSync request: {:?}", sender_id) - } - } - - /// Produce a head peer - fn head_peer( - &self, - ) -> ( - PeerId, - SyncInfo, /* Local info */ - SyncInfo, /* Remote info */ - ) { - let local_info = self.local_info(); - - // Get a peer with an advanced head - let head_root = Hash256::random(); - let head_slot = local_info.head_slot + 1; - let remote_info = SyncInfo { - head_root, - head_slot, - ..local_info - }; - let peer_id = PeerId::random(); - (peer_id, local_info, remote_info) - } - - fn finalized_peer( - &self, - ) -> ( - PeerId, - SyncInfo, /* Local info */ - SyncInfo, /* Remote info */ - ) { - let local_info = self.local_info(); - - let finalized_root = Hash256::random(); - let finalized_epoch = local_info.finalized_epoch + 2; - let head_slot = finalized_epoch.start_slot(E::slots_per_epoch()); - let head_root = Hash256::random(); - let remote_info = SyncInfo { - finalized_epoch, - finalized_root, - head_slot, - head_root, - }; - - let peer_id = PeerId::random(); - (peer_id, local_info, remote_info) - } - - #[track_caller] - fn expect_empty_processor(&mut self) { - match self.beacon_processor_rx.try_recv() { - Ok(work) => { - panic!( - "Expected empty processor. Instead got {}", - work.work_type_str() - ); - } - Err(e) => match e { - mpsc::error::TryRecvError::Empty => {} - mpsc::error::TryRecvError::Disconnected => unreachable!("bad coded test?"), - }, - } - } - - #[track_caller] - fn expect_chain_segment(&mut self) { - match self.beacon_processor_rx.try_recv() { - Ok(work) => { - assert_eq!(work.work_type(), beacon_processor::WorkType::ChainSegment); - } - other => panic!("Expected chain segment process, found {:?}", other), - } - } - } - - fn range(log_enabled: bool) -> (TestRig, RangeSync) { - let log = build_log(slog::Level::Trace, log_enabled); - // Initialise a new beacon chain - let harness = BeaconChainHarness::>::builder(E) - .default_spec() - .logger(log.clone()) - .deterministic_keypairs(1) - .fresh_ephemeral_store() - .build(); - let chain = harness.chain; - - let fake_store = Arc::new(FakeStorage::default()); - let range_sync = RangeSync::::new( - fake_store.clone(), - log.new(o!("component" => "range")), - ); - let (network_tx, network_rx) = mpsc::unbounded_channel(); - let (sync_tx, _sync_rx) = mpsc::unbounded_channel::>(); - let network_config = Arc::new(NetworkConfig::default()); - let globals = Arc::new(NetworkGlobals::new_test_globals( - Vec::new(), - &log, - network_config, - chain.spec.clone(), - )); - let (network_beacon_processor, beacon_processor_rx) = - NetworkBeaconProcessor::null_for_testing( - globals.clone(), - sync_tx, - chain.clone(), - harness.runtime.task_executor.clone(), - log.clone(), - ); - let cx = SyncNetworkContext::new( - network_tx, - Arc::new(network_beacon_processor), - chain, - log.new(o!("component" => "network_context")), - ); - let test_rig = TestRig { - log, - beacon_processor_rx, - chain: fake_store, - cx, - network_rx, - globals, - }; - (test_rig, range_sync) - } - - #[test] - fn head_chain_removed_while_finalized_syncing() { - // NOTE: this is a regression test. - let (mut rig, mut range) = range(false); - - // Get a peer with an advanced head - let (head_peer, local_info, remote_info) = rig.head_peer(); - range.add_peer(&mut rig.cx, local_info, head_peer, remote_info); - range.assert_state(RangeSyncType::Head); - - let fork = rig - .cx - .chain - .spec - .fork_name_at_epoch(rig.cx.chain.epoch().unwrap()); - - // Sync should have requested a batch, grab the request. - let _ = rig.grab_request(&head_peer, fork); - - // Now get a peer with an advanced finalized epoch. - let (finalized_peer, local_info, remote_info) = rig.finalized_peer(); - range.add_peer(&mut rig.cx, local_info, finalized_peer, remote_info); - range.assert_state(RangeSyncType::Finalized); - - // Sync should have requested a batch, grab the request - let _ = rig.grab_request(&finalized_peer, fork); - - // Fail the head chain by disconnecting the peer. - range.remove_peer(&mut rig.cx, &head_peer); - range.assert_state(RangeSyncType::Finalized); - } - - #[test] - fn state_update_while_purging() { - // NOTE: this is a regression test. - let (mut rig, mut range) = range(true); - - // Get a peer with an advanced head - let (head_peer, local_info, head_info) = rig.head_peer(); - let head_peer_root = head_info.head_root; - range.add_peer(&mut rig.cx, local_info, head_peer, head_info); - range.assert_state(RangeSyncType::Head); - - let fork = rig - .cx - .chain - .spec - .fork_name_at_epoch(rig.cx.chain.epoch().unwrap()); - - // Sync should have requested a batch, grab the request. - let _ = rig.grab_request(&head_peer, fork); - - // Now get a peer with an advanced finalized epoch. - let (finalized_peer, local_info, remote_info) = rig.finalized_peer(); - let finalized_peer_root = remote_info.finalized_root; - range.add_peer(&mut rig.cx, local_info, finalized_peer, remote_info); - range.assert_state(RangeSyncType::Finalized); - - // Sync should have requested a batch, grab the request - let _ = rig.grab_request(&finalized_peer, fork); - - // Now the chain knows both chains target roots. - rig.chain.remember_block(head_peer_root); - rig.chain.remember_block(finalized_peer_root); - - // Add an additional peer to the second chain to make range update it's status - let (finalized_peer, local_info, remote_info) = rig.finalized_peer(); - range.add_peer(&mut rig.cx, local_info, finalized_peer, remote_info); - } - - #[test] - fn pause_and_resume_on_ee_offline() { - let (mut rig, mut range) = range(true); - let fork = rig - .cx - .chain - .spec - .fork_name_at_epoch(rig.cx.chain.epoch().unwrap()); - - // add some peers - let (peer1, local_info, head_info) = rig.head_peer(); - range.add_peer(&mut rig.cx, local_info, peer1, head_info); - let (block_req, blob_req_opt) = rig.grab_request(&peer1, fork); - - let (chain1, batch1, id1) = - rig.complete_range_block_and_blobs_response(block_req, blob_req_opt); - - // make the ee offline - rig.cx.update_execution_engine_state(EngineState::Offline); - - // send the response to the request - range.blocks_by_range_response(&mut rig.cx, peer1, chain1, batch1, id1, vec![]); - - // the beacon processor shouldn't have received any work - rig.expect_empty_processor(); - - // while the ee is offline, more peers might arrive. Add a new finalized peer. - let (peer2, local_info, finalized_info) = rig.finalized_peer(); - range.add_peer(&mut rig.cx, local_info, peer2, finalized_info); - let (block_req, blob_req_opt) = rig.grab_request(&peer2, fork); - - let (chain2, batch2, id2) = - rig.complete_range_block_and_blobs_response(block_req, blob_req_opt); - - // send the response to the request - range.blocks_by_range_response(&mut rig.cx, peer2, chain2, batch2, id2, vec![]); - - // the beacon processor shouldn't have received any work - rig.expect_empty_processor(); - - // make the beacon processor available again. - rig.cx.update_execution_engine_state(EngineState::Online); - - // now resume range, we should have two processing requests in the beacon processor. - range.resume(&mut rig.cx); - - rig.expect_chain_segment(); - rig.expect_chain_segment(); - } -} diff --git a/beacon_node/network/src/sync/range_sync/sync_type.rs b/beacon_node/network/src/sync/range_sync/sync_type.rs index d6ffd4a5dfb..4ff7e393101 100644 --- a/beacon_node/network/src/sync/range_sync/sync_type.rs +++ b/beacon_node/network/src/sync/range_sync/sync_type.rs @@ -1,10 +1,9 @@ //! Contains logic about identifying which Sync to perform given PeerSyncInfo of ourselves and //! of a remote. +use beacon_chain::{BeaconChain, BeaconChainTypes}; use lighthouse_network::SyncInfo; -use super::block_storage::BlockStorage; - /// The type of Range sync that should be done relative to our current state. #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum RangeSyncType { @@ -17,8 +16,8 @@ pub enum RangeSyncType { impl RangeSyncType { /// Determines the type of sync given our local `PeerSyncInfo` and the remote's /// `PeerSyncInfo`. - pub fn new( - chain: &C, + pub fn new( + chain: &BeaconChain, local_info: &SyncInfo, remote_info: &SyncInfo, ) -> RangeSyncType { @@ -29,7 +28,7 @@ impl RangeSyncType { // not seen the finalized hash before. if remote_info.finalized_epoch > local_info.finalized_epoch - && !chain.is_block_known(&remote_info.finalized_root) + && !chain.block_is_known_to_fork_choice(&remote_info.finalized_root) { RangeSyncType::Finalized } else { diff --git a/beacon_node/network/src/sync/tests/lookups.rs b/beacon_node/network/src/sync/tests/lookups.rs index 9f2c9ef66f0..94aacad3e81 100644 --- a/beacon_node/network/src/sync/tests/lookups.rs +++ b/beacon_node/network/src/sync/tests/lookups.rs @@ -83,6 +83,7 @@ impl TestRig { .logger(log.clone()) .deterministic_keypairs(1) .fresh_ephemeral_store() + .mock_execution_layer() .testing_slot_clock(TestingSlotClock::new( Slot::new(0), Duration::from_secs(0), @@ -144,7 +145,7 @@ impl TestRig { } } - fn test_setup() -> Self { + pub fn test_setup() -> Self { Self::test_setup_with_config(None) } @@ -168,11 +169,11 @@ impl TestRig { } } - fn log(&self, msg: &str) { + pub fn log(&self, msg: &str) { info!(self.log, "TEST_RIG"; "msg" => msg); } - fn after_deneb(&self) -> bool { + pub fn after_deneb(&self) -> bool { matches!(self.fork_name, ForkName::Deneb | ForkName::Electra) } @@ -238,7 +239,7 @@ impl TestRig { (parent, block, parent_root, block_root) } - fn send_sync_message(&mut self, sync_message: SyncMessage) { + pub fn send_sync_message(&mut self, sync_message: SyncMessage) { self.sync_manager.handle_message(sync_message); } @@ -369,7 +370,7 @@ impl TestRig { self.expect_empty_network(); } - fn new_connected_peer(&mut self) -> PeerId { + pub fn new_connected_peer(&mut self) -> PeerId { self.network_globals .peers .write() @@ -811,7 +812,7 @@ impl TestRig { } } - fn peer_disconnected(&mut self, peer_id: PeerId) { + pub fn peer_disconnected(&mut self, peer_id: PeerId) { self.send_sync_message(SyncMessage::Disconnect(peer_id)); } @@ -827,7 +828,7 @@ impl TestRig { } } - fn pop_received_network_event) -> Option>( + pub fn pop_received_network_event) -> Option>( &mut self, predicate_transform: F, ) -> Result { @@ -847,7 +848,7 @@ impl TestRig { } } - fn pop_received_processor_event) -> Option>( + pub fn pop_received_processor_event) -> Option>( &mut self, predicate_transform: F, ) -> Result { @@ -871,6 +872,16 @@ impl TestRig { } } + pub fn expect_empty_processor(&mut self) { + self.drain_processor_rx(); + if !self.beacon_processor_rx_queue.is_empty() { + panic!( + "Expected processor to be empty, but has events: {:?}", + self.beacon_processor_rx_queue + ); + } + } + fn find_block_lookup_request( &mut self, for_block: Hash256, @@ -2173,7 +2184,8 @@ fn custody_lookup_happy_path() { mod deneb_only { use super::*; use beacon_chain::{ - block_verification_types::RpcBlock, data_availability_checker::AvailabilityCheckError, + block_verification_types::{AsBlock, RpcBlock}, + data_availability_checker::AvailabilityCheckError, }; use ssz_types::VariableList; use std::collections::VecDeque; diff --git a/beacon_node/network/src/sync/tests/range.rs b/beacon_node/network/src/sync/tests/range.rs index 8b137891791..6faa8b72472 100644 --- a/beacon_node/network/src/sync/tests/range.rs +++ b/beacon_node/network/src/sync/tests/range.rs @@ -1 +1,273 @@ +use super::*; +use crate::status::ToStatusMessage; +use crate::sync::manager::SLOT_IMPORT_TOLERANCE; +use crate::sync::range_sync::RangeSyncType; +use crate::sync::SyncMessage; +use beacon_chain::test_utils::{AttestationStrategy, BlockStrategy}; +use beacon_chain::EngineState; +use lighthouse_network::rpc::{RequestType, StatusMessage}; +use lighthouse_network::service::api_types::{AppRequestId, Id, SyncRequestId}; +use lighthouse_network::{PeerId, SyncInfo}; +use std::time::Duration; +use types::{EthSpec, Hash256, MinimalEthSpec as E, SignedBeaconBlock, Slot}; +const D: Duration = Duration::new(0, 0); + +impl TestRig { + /// Produce a head peer with an advanced head + fn add_head_peer(&mut self) -> PeerId { + self.add_head_peer_with_root(Hash256::random()) + } + + /// Produce a head peer with an advanced head + fn add_head_peer_with_root(&mut self, head_root: Hash256) -> PeerId { + let local_info = self.local_info(); + self.add_peer(SyncInfo { + head_root, + head_slot: local_info.head_slot + 1 + Slot::new(SLOT_IMPORT_TOLERANCE as u64), + ..local_info + }) + } + + // Produce a finalized peer with an advanced finalized epoch + fn add_finalized_peer(&mut self) -> PeerId { + self.add_finalized_peer_with_root(Hash256::random()) + } + + // Produce a finalized peer with an advanced finalized epoch + fn add_finalized_peer_with_root(&mut self, finalized_root: Hash256) -> PeerId { + let local_info = self.local_info(); + let finalized_epoch = local_info.finalized_epoch + 2; + self.add_peer(SyncInfo { + finalized_epoch, + finalized_root, + head_slot: finalized_epoch.start_slot(E::slots_per_epoch()), + head_root: Hash256::random(), + }) + } + + fn local_info(&self) -> SyncInfo { + let StatusMessage { + fork_digest: _, + finalized_root, + finalized_epoch, + head_root, + head_slot, + } = self.harness.chain.status_message(); + SyncInfo { + head_slot, + head_root, + finalized_epoch, + finalized_root, + } + } + + fn add_peer(&mut self, remote_info: SyncInfo) -> PeerId { + // Create valid peer known to network globals + let peer_id = self.new_connected_peer(); + // Send peer to sync + self.send_sync_message(SyncMessage::AddPeer(peer_id, remote_info.clone())); + peer_id + } + + fn assert_state(&self, state: RangeSyncType) { + assert_eq!( + self.sync_manager + .range_sync_state() + .expect("State is ok") + .expect("Range should be syncing") + .0, + state, + "not expected range sync state" + ); + } + + #[track_caller] + fn expect_chain_segment(&mut self) { + self.pop_received_processor_event(|ev| { + (ev.work_type() == beacon_processor::WorkType::ChainSegment).then_some(()) + }) + .unwrap_or_else(|e| panic!("Expect ChainSegment work event: {e:?}")); + } + + fn update_execution_engine_state(&mut self, state: EngineState) { + self.log(&format!("execution engine state updated: {state:?}")); + self.sync_manager.update_execution_engine_state(state); + } + + fn find_blocks_by_range_request(&mut self, target_peer_id: &PeerId) -> (Id, Option) { + let block_req_id = self + .pop_received_network_event(|ev| match ev { + NetworkMessage::SendRequest { + peer_id, + request: RequestType::BlocksByRange(_), + request_id: AppRequestId::Sync(SyncRequestId::RangeBlockAndBlobs { id }), + } if peer_id == target_peer_id => Some(*id), + _ => None, + }) + .expect("Should have a blocks by range request"); + + let blob_req_id = if self.after_deneb() { + Some( + self.pop_received_network_event(|ev| match ev { + NetworkMessage::SendRequest { + peer_id, + request: RequestType::BlobsByRange(_), + request_id: AppRequestId::Sync(SyncRequestId::RangeBlockAndBlobs { id }), + } if peer_id == target_peer_id => Some(*id), + _ => None, + }) + .expect("Should have a blobs by range request"), + ) + } else { + None + }; + + (block_req_id, blob_req_id) + } + + fn find_and_complete_blocks_by_range_request(&mut self, target_peer_id: PeerId) { + let (blocks_req_id, blobs_req_id) = self.find_blocks_by_range_request(&target_peer_id); + + // Complete the request with a single stream termination + self.log(&format!( + "Completing BlocksByRange request {blocks_req_id} with empty stream" + )); + self.send_sync_message(SyncMessage::RpcBlock { + request_id: SyncRequestId::RangeBlockAndBlobs { id: blocks_req_id }, + peer_id: target_peer_id, + beacon_block: None, + seen_timestamp: D, + }); + + if let Some(blobs_req_id) = blobs_req_id { + // Complete the request with a single stream termination + self.log(&format!( + "Completing BlobsByRange request {blobs_req_id} with empty stream" + )); + self.send_sync_message(SyncMessage::RpcBlob { + request_id: SyncRequestId::RangeBlockAndBlobs { id: blobs_req_id }, + peer_id: target_peer_id, + blob_sidecar: None, + seen_timestamp: D, + }); + } + } + + async fn create_canonical_block(&mut self) -> SignedBeaconBlock { + self.harness.advance_slot(); + + let block_root = self + .harness + .extend_chain( + 1, + BlockStrategy::OnCanonicalHead, + AttestationStrategy::AllValidators, + ) + .await; + self.harness + .chain + .store + .get_full_block(&block_root) + .unwrap() + .unwrap() + } + + async fn remember_block(&mut self, block: SignedBeaconBlock) { + self.harness + .process_block(block.slot(), block.canonical_root(), (block.into(), None)) + .await + .unwrap(); + } +} + +#[test] +fn head_chain_removed_while_finalized_syncing() { + // NOTE: this is a regression test. + // Added in PR https://github.com/sigp/lighthouse/pull/2821 + let mut rig = TestRig::test_setup(); + + // Get a peer with an advanced head + let head_peer = rig.add_head_peer(); + rig.assert_state(RangeSyncType::Head); + + // Sync should have requested a batch, grab the request. + let _ = rig.find_blocks_by_range_request(&head_peer); + + // Now get a peer with an advanced finalized epoch. + let finalized_peer = rig.add_finalized_peer(); + rig.assert_state(RangeSyncType::Finalized); + + // Sync should have requested a batch, grab the request + let _ = rig.find_blocks_by_range_request(&finalized_peer); + + // Fail the head chain by disconnecting the peer. + rig.peer_disconnected(head_peer); + rig.assert_state(RangeSyncType::Finalized); +} + +#[tokio::test] +async fn state_update_while_purging() { + // NOTE: this is a regression test. + // Added in PR https://github.com/sigp/lighthouse/pull/2827 + let mut rig = TestRig::test_setup(); + + // Create blocks on a separate harness + let mut rig_2 = TestRig::test_setup(); + // Need to create blocks that can be inserted into the fork-choice and fit the "known + // conditions" below. + let head_peer_block = rig_2.create_canonical_block().await; + let head_peer_root = head_peer_block.canonical_root(); + let finalized_peer_block = rig_2.create_canonical_block().await; + let finalized_peer_root = finalized_peer_block.canonical_root(); + + // Get a peer with an advanced head + let head_peer = rig.add_head_peer_with_root(head_peer_root); + rig.assert_state(RangeSyncType::Head); + + // Sync should have requested a batch, grab the request. + let _ = rig.find_blocks_by_range_request(&head_peer); + + // Now get a peer with an advanced finalized epoch. + let finalized_peer = rig.add_finalized_peer_with_root(finalized_peer_root); + rig.assert_state(RangeSyncType::Finalized); + + // Sync should have requested a batch, grab the request + let _ = rig.find_blocks_by_range_request(&finalized_peer); + + // Now the chain knows both chains target roots. + rig.remember_block(head_peer_block).await; + rig.remember_block(finalized_peer_block).await; + + // Add an additional peer to the second chain to make range update it's status + rig.add_finalized_peer(); +} + +#[test] +fn pause_and_resume_on_ee_offline() { + let mut rig = TestRig::test_setup(); + + // add some peers + let peer1 = rig.add_head_peer(); + // make the ee offline + rig.update_execution_engine_state(EngineState::Offline); + // send the response to the request + rig.find_and_complete_blocks_by_range_request(peer1); + // the beacon processor shouldn't have received any work + rig.expect_empty_processor(); + + // while the ee is offline, more peers might arrive. Add a new finalized peer. + let peer2 = rig.add_finalized_peer(); + + // send the response to the request + rig.find_and_complete_blocks_by_range_request(peer2); + // the beacon processor shouldn't have received any work + rig.expect_empty_processor(); + // make the beacon processor available again. + // update_execution_engine_state implicitly calls resume + // now resume range, we should have two processing requests in the beacon processor. + rig.update_execution_engine_state(EngineState::Online); + + rig.expect_chain_segment(); + rig.expect_chain_segment(); +}