From 4c6d75515623e22a3310773ae7bda1979bcc269f Mon Sep 17 00:00:00 2001 From: dapplion <35266934+dapplion@users.noreply.github.com> Date: Mon, 7 Oct 2024 00:20:33 +0300 Subject: [PATCH 1/6] Write range sync tests in external event-driven form --- beacon_node/network/src/sync/manager.rs | 10 + .../src/sync/range_sync/chain_collection.rs | 7 +- .../network/src/sync/range_sync/mod.rs | 2 + .../network/src/sync/range_sync/range.rs | 470 +----------------- beacon_node/network/src/sync/tests/lookups.rs | 29 +- beacon_node/network/src/sync/tests/range.rs | 339 +++++++++++++ 6 files changed, 378 insertions(+), 479 deletions(-) diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index 344e91711c4..5d02be2b4c1 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -362,6 +362,16 @@ impl SyncManager { self.sampling.get_request_status(block_root, index) } + #[cfg(test)] + pub(crate) fn range_sync_state(&self) -> super::range_sync::SyncChainStatus { + self.range_sync.state() + } + + #[cfg(test)] + pub(crate) fn update_execution_engine_state(&mut self, state: EngineState) { + self.handle_new_execution_engine_state(state); + } + fn network_globals(&self) -> &NetworkGlobals { self.network.network_globals() } diff --git a/beacon_node/network/src/sync/range_sync/chain_collection.rs b/beacon_node/network/src/sync/range_sync/chain_collection.rs index 1217fbf8fed..d337c2505f4 100644 --- a/beacon_node/network/src/sync/range_sync/chain_collection.rs +++ b/beacon_node/network/src/sync/range_sync/chain_collection.rs @@ -37,6 +37,9 @@ pub enum RangeSyncState { Idle, } +pub type SyncChainStatus = + Result, &'static str>; + /// A collection of finalized and head chains currently being processed. pub struct ChainCollection { /// The beacon chain for processing. @@ -213,9 +216,7 @@ impl ChainCollection { } } - pub fn state( - &self, - ) -> Result, &'static str> { + pub fn state(&self) -> SyncChainStatus { match self.state { RangeSyncState::Finalized(ref syncing_id) => { let chain = self diff --git a/beacon_node/network/src/sync/range_sync/mod.rs b/beacon_node/network/src/sync/range_sync/mod.rs index d0f2f9217eb..3f8cf400c68 100644 --- a/beacon_node/network/src/sync/range_sync/mod.rs +++ b/beacon_node/network/src/sync/range_sync/mod.rs @@ -12,6 +12,8 @@ pub use batch::{ BatchConfig, BatchInfo, BatchOperationOutcome, BatchProcessingResult, BatchState, ByRangeRequestType, }; +pub use block_storage::BlockStorage; pub use chain::{BatchId, ChainId, EPOCHS_PER_BATCH}; +pub use chain_collection::SyncChainStatus; pub use range::RangeSync; pub use sync_type::RangeSyncType; diff --git a/beacon_node/network/src/sync/range_sync/range.rs b/beacon_node/network/src/sync/range_sync/range.rs index 0ef99838dee..0e9166c95cd 100644 --- a/beacon_node/network/src/sync/range_sync/range.rs +++ b/beacon_node/network/src/sync/range_sync/range.rs @@ -41,7 +41,7 @@ use super::block_storage::BlockStorage; use super::chain::{BatchId, ChainId, RemoveChain, SyncingChain}; -use super::chain_collection::ChainCollection; +use super::chain_collection::{ChainCollection, SyncChainStatus}; use super::sync_type::RangeSyncType; use crate::metrics; use crate::status::ToStatusMessage; @@ -56,7 +56,7 @@ use lru_cache::LRUTimeCache; use slog::{crit, debug, trace, warn}; use std::collections::HashMap; use std::sync::Arc; -use types::{Epoch, EthSpec, Hash256, Slot}; +use types::{Epoch, EthSpec, Hash256}; /// For how long we store failed finalized chains to prevent retries. const FAILED_CHAINS_EXPIRY_SECONDS: u64 = 30; @@ -96,9 +96,7 @@ where } } - pub fn state( - &self, - ) -> Result, &'static str> { + pub fn state(&self) -> SyncChainStatus { self.chains.state() } @@ -382,465 +380,3 @@ where } } } - -#[cfg(test)] -mod tests { - use crate::network_beacon_processor::NetworkBeaconProcessor; - use crate::sync::SyncMessage; - use crate::NetworkMessage; - - use super::*; - use crate::sync::network_context::{BlockOrBlob, RangeRequestId}; - use beacon_chain::builder::Witness; - use beacon_chain::eth1_chain::CachingEth1Backend; - use beacon_chain::parking_lot::RwLock; - use beacon_chain::test_utils::{BeaconChainHarness, EphemeralHarnessType}; - use beacon_chain::EngineState; - use beacon_processor::WorkEvent as BeaconWorkEvent; - use lighthouse_network::service::api_types::SyncRequestId; - use lighthouse_network::{ - rpc::StatusMessage, service::api_types::AppRequestId, NetworkConfig, NetworkGlobals, - }; - use slog::{o, Drain}; - use slot_clock::TestingSlotClock; - use std::collections::HashSet; - use store::MemoryStore; - use tokio::sync::mpsc; - use types::{FixedBytesExtended, ForkName, MinimalEthSpec as E}; - - #[derive(Debug)] - struct FakeStorage { - known_blocks: RwLock>, - status: RwLock, - } - - impl Default for FakeStorage { - fn default() -> Self { - FakeStorage { - known_blocks: RwLock::new(HashSet::new()), - status: RwLock::new(StatusMessage { - fork_digest: [0; 4], - finalized_root: Hash256::zero(), - finalized_epoch: 0usize.into(), - head_root: Hash256::zero(), - head_slot: 0usize.into(), - }), - } - } - } - - impl FakeStorage { - fn remember_block(&self, block_root: Hash256) { - self.known_blocks.write().insert(block_root); - } - - #[allow(dead_code)] - fn forget_block(&self, block_root: &Hash256) { - self.known_blocks.write().remove(block_root); - } - } - - impl BlockStorage for FakeStorage { - fn is_block_known(&self, block_root: &store::Hash256) -> bool { - self.known_blocks.read().contains(block_root) - } - } - - impl ToStatusMessage for FakeStorage { - fn status_message(&self) -> StatusMessage { - self.status.read().clone() - } - } - - type TestBeaconChainType = - Witness, E, MemoryStore, MemoryStore>; - - fn build_log(level: slog::Level, enabled: bool) -> slog::Logger { - let decorator = slog_term::TermDecorator::new().build(); - let drain = slog_term::FullFormat::new(decorator).build().fuse(); - let drain = slog_async::Async::new(drain).build().fuse(); - - if enabled { - slog::Logger::root(drain.filter_level(level).fuse(), o!()) - } else { - slog::Logger::root(drain.filter(|_| false).fuse(), o!()) - } - } - - #[allow(unused)] - struct TestRig { - log: slog::Logger, - /// To check what does sync send to the beacon processor. - beacon_processor_rx: mpsc::Receiver>, - /// To set up different scenarios where sync is told about known/unknown blocks. - chain: Arc, - /// Needed by range to handle communication with the network. - cx: SyncNetworkContext, - /// To check what the network receives from Range. - network_rx: mpsc::UnboundedReceiver>, - /// To modify what the network declares about various global variables, in particular about - /// the sync state of a peer. - globals: Arc>, - } - - impl RangeSync { - fn assert_state(&self, expected_state: RangeSyncType) { - assert_eq!( - self.state() - .expect("State is ok") - .expect("Range is syncing") - .0, - expected_state - ) - } - - #[allow(dead_code)] - fn assert_not_syncing(&self) { - assert!( - self.state().expect("State is ok").is_none(), - "Range should not be syncing." - ); - } - } - - impl TestRig { - fn local_info(&self) -> SyncInfo { - let StatusMessage { - fork_digest: _, - finalized_root, - finalized_epoch, - head_root, - head_slot, - } = self.chain.status.read().clone(); - SyncInfo { - head_slot, - head_root, - finalized_epoch, - finalized_root, - } - } - - /// Reads an BlocksByRange request to a given peer from the network receiver channel. - #[track_caller] - fn grab_request( - &mut self, - expected_peer: &PeerId, - fork_name: ForkName, - ) -> (AppRequestId, Option) { - let block_req_id = if let Ok(NetworkMessage::SendRequest { - peer_id, - request: _, - request_id, - }) = self.network_rx.try_recv() - { - assert_eq!(&peer_id, expected_peer); - request_id - } else { - panic!("Should have sent a batch request to the peer") - }; - let blob_req_id = if fork_name.deneb_enabled() { - if let Ok(NetworkMessage::SendRequest { - peer_id, - request: _, - request_id, - }) = self.network_rx.try_recv() - { - assert_eq!(&peer_id, expected_peer); - Some(request_id) - } else { - panic!("Should have sent a batch request to the peer") - } - } else { - None - }; - (block_req_id, blob_req_id) - } - - fn complete_range_block_and_blobs_response( - &mut self, - block_req: AppRequestId, - blob_req_opt: Option, - ) -> (ChainId, BatchId, Id) { - if blob_req_opt.is_some() { - match block_req { - AppRequestId::Sync(SyncRequestId::RangeBlockAndBlobs { id }) => { - let _ = self - .cx - .range_block_and_blob_response(id, BlockOrBlob::Block(None)); - let response = self - .cx - .range_block_and_blob_response(id, BlockOrBlob::Blob(None)) - .unwrap(); - let (chain_id, batch_id) = - TestRig::unwrap_range_request_id(response.sender_id); - (chain_id, batch_id, id) - } - other => panic!("unexpected request {:?}", other), - } - } else { - match block_req { - AppRequestId::Sync(SyncRequestId::RangeBlockAndBlobs { id }) => { - let response = self - .cx - .range_block_and_blob_response(id, BlockOrBlob::Block(None)) - .unwrap(); - let (chain_id, batch_id) = - TestRig::unwrap_range_request_id(response.sender_id); - (chain_id, batch_id, id) - } - other => panic!("unexpected request {:?}", other), - } - } - } - - fn unwrap_range_request_id(sender_id: RangeRequestId) -> (ChainId, BatchId) { - if let RangeRequestId::RangeSync { chain_id, batch_id } = sender_id { - (chain_id, batch_id) - } else { - panic!("expected RangeSync request: {:?}", sender_id) - } - } - - /// Produce a head peer - fn head_peer( - &self, - ) -> ( - PeerId, - SyncInfo, /* Local info */ - SyncInfo, /* Remote info */ - ) { - let local_info = self.local_info(); - - // Get a peer with an advanced head - let head_root = Hash256::random(); - let head_slot = local_info.head_slot + 1; - let remote_info = SyncInfo { - head_root, - head_slot, - ..local_info - }; - let peer_id = PeerId::random(); - (peer_id, local_info, remote_info) - } - - fn finalized_peer( - &self, - ) -> ( - PeerId, - SyncInfo, /* Local info */ - SyncInfo, /* Remote info */ - ) { - let local_info = self.local_info(); - - let finalized_root = Hash256::random(); - let finalized_epoch = local_info.finalized_epoch + 2; - let head_slot = finalized_epoch.start_slot(E::slots_per_epoch()); - let head_root = Hash256::random(); - let remote_info = SyncInfo { - finalized_epoch, - finalized_root, - head_slot, - head_root, - }; - - let peer_id = PeerId::random(); - (peer_id, local_info, remote_info) - } - - #[track_caller] - fn expect_empty_processor(&mut self) { - match self.beacon_processor_rx.try_recv() { - Ok(work) => { - panic!( - "Expected empty processor. Instead got {}", - work.work_type_str() - ); - } - Err(e) => match e { - mpsc::error::TryRecvError::Empty => {} - mpsc::error::TryRecvError::Disconnected => unreachable!("bad coded test?"), - }, - } - } - - #[track_caller] - fn expect_chain_segment(&mut self) { - match self.beacon_processor_rx.try_recv() { - Ok(work) => { - assert_eq!(work.work_type(), beacon_processor::WorkType::ChainSegment); - } - other => panic!("Expected chain segment process, found {:?}", other), - } - } - } - - fn range(log_enabled: bool) -> (TestRig, RangeSync) { - let log = build_log(slog::Level::Trace, log_enabled); - // Initialise a new beacon chain - let harness = BeaconChainHarness::>::builder(E) - .default_spec() - .logger(log.clone()) - .deterministic_keypairs(1) - .fresh_ephemeral_store() - .build(); - let chain = harness.chain; - - let fake_store = Arc::new(FakeStorage::default()); - let range_sync = RangeSync::::new( - fake_store.clone(), - log.new(o!("component" => "range")), - ); - let (network_tx, network_rx) = mpsc::unbounded_channel(); - let (sync_tx, _sync_rx) = mpsc::unbounded_channel::>(); - let network_config = Arc::new(NetworkConfig::default()); - let globals = Arc::new(NetworkGlobals::new_test_globals( - Vec::new(), - &log, - network_config, - chain.spec.clone(), - )); - let (network_beacon_processor, beacon_processor_rx) = - NetworkBeaconProcessor::null_for_testing( - globals.clone(), - sync_tx, - chain.clone(), - harness.runtime.task_executor.clone(), - log.clone(), - ); - let cx = SyncNetworkContext::new( - network_tx, - Arc::new(network_beacon_processor), - chain, - log.new(o!("component" => "network_context")), - ); - let test_rig = TestRig { - log, - beacon_processor_rx, - chain: fake_store, - cx, - network_rx, - globals, - }; - (test_rig, range_sync) - } - - #[test] - fn head_chain_removed_while_finalized_syncing() { - // NOTE: this is a regression test. - let (mut rig, mut range) = range(false); - - // Get a peer with an advanced head - let (head_peer, local_info, remote_info) = rig.head_peer(); - range.add_peer(&mut rig.cx, local_info, head_peer, remote_info); - range.assert_state(RangeSyncType::Head); - - let fork = rig - .cx - .chain - .spec - .fork_name_at_epoch(rig.cx.chain.epoch().unwrap()); - - // Sync should have requested a batch, grab the request. - let _ = rig.grab_request(&head_peer, fork); - - // Now get a peer with an advanced finalized epoch. - let (finalized_peer, local_info, remote_info) = rig.finalized_peer(); - range.add_peer(&mut rig.cx, local_info, finalized_peer, remote_info); - range.assert_state(RangeSyncType::Finalized); - - // Sync should have requested a batch, grab the request - let _ = rig.grab_request(&finalized_peer, fork); - - // Fail the head chain by disconnecting the peer. - range.remove_peer(&mut rig.cx, &head_peer); - range.assert_state(RangeSyncType::Finalized); - } - - #[test] - fn state_update_while_purging() { - // NOTE: this is a regression test. - let (mut rig, mut range) = range(true); - - // Get a peer with an advanced head - let (head_peer, local_info, head_info) = rig.head_peer(); - let head_peer_root = head_info.head_root; - range.add_peer(&mut rig.cx, local_info, head_peer, head_info); - range.assert_state(RangeSyncType::Head); - - let fork = rig - .cx - .chain - .spec - .fork_name_at_epoch(rig.cx.chain.epoch().unwrap()); - - // Sync should have requested a batch, grab the request. - let _ = rig.grab_request(&head_peer, fork); - - // Now get a peer with an advanced finalized epoch. - let (finalized_peer, local_info, remote_info) = rig.finalized_peer(); - let finalized_peer_root = remote_info.finalized_root; - range.add_peer(&mut rig.cx, local_info, finalized_peer, remote_info); - range.assert_state(RangeSyncType::Finalized); - - // Sync should have requested a batch, grab the request - let _ = rig.grab_request(&finalized_peer, fork); - - // Now the chain knows both chains target roots. - rig.chain.remember_block(head_peer_root); - rig.chain.remember_block(finalized_peer_root); - - // Add an additional peer to the second chain to make range update it's status - let (finalized_peer, local_info, remote_info) = rig.finalized_peer(); - range.add_peer(&mut rig.cx, local_info, finalized_peer, remote_info); - } - - #[test] - fn pause_and_resume_on_ee_offline() { - let (mut rig, mut range) = range(true); - let fork = rig - .cx - .chain - .spec - .fork_name_at_epoch(rig.cx.chain.epoch().unwrap()); - - // add some peers - let (peer1, local_info, head_info) = rig.head_peer(); - range.add_peer(&mut rig.cx, local_info, peer1, head_info); - let (block_req, blob_req_opt) = rig.grab_request(&peer1, fork); - - let (chain1, batch1, id1) = - rig.complete_range_block_and_blobs_response(block_req, blob_req_opt); - - // make the ee offline - rig.cx.update_execution_engine_state(EngineState::Offline); - - // send the response to the request - range.blocks_by_range_response(&mut rig.cx, peer1, chain1, batch1, id1, vec![]); - - // the beacon processor shouldn't have received any work - rig.expect_empty_processor(); - - // while the ee is offline, more peers might arrive. Add a new finalized peer. - let (peer2, local_info, finalized_info) = rig.finalized_peer(); - range.add_peer(&mut rig.cx, local_info, peer2, finalized_info); - let (block_req, blob_req_opt) = rig.grab_request(&peer2, fork); - - let (chain2, batch2, id2) = - rig.complete_range_block_and_blobs_response(block_req, blob_req_opt); - - // send the response to the request - range.blocks_by_range_response(&mut rig.cx, peer2, chain2, batch2, id2, vec![]); - - // the beacon processor shouldn't have received any work - rig.expect_empty_processor(); - - // make the beacon processor available again. - rig.cx.update_execution_engine_state(EngineState::Online); - - // now resume range, we should have two processing requests in the beacon processor. - range.resume(&mut rig.cx); - - rig.expect_chain_segment(); - rig.expect_chain_segment(); - } -} diff --git a/beacon_node/network/src/sync/tests/lookups.rs b/beacon_node/network/src/sync/tests/lookups.rs index 9f2c9ef66f0..c7c1a854474 100644 --- a/beacon_node/network/src/sync/tests/lookups.rs +++ b/beacon_node/network/src/sync/tests/lookups.rs @@ -144,7 +144,7 @@ impl TestRig { } } - fn test_setup() -> Self { + pub fn test_setup() -> Self { Self::test_setup_with_config(None) } @@ -168,11 +168,11 @@ impl TestRig { } } - fn log(&self, msg: &str) { + pub fn log(&self, msg: &str) { info!(self.log, "TEST_RIG"; "msg" => msg); } - fn after_deneb(&self) -> bool { + pub fn after_deneb(&self) -> bool { matches!(self.fork_name, ForkName::Deneb | ForkName::Electra) } @@ -238,7 +238,7 @@ impl TestRig { (parent, block, parent_root, block_root) } - fn send_sync_message(&mut self, sync_message: SyncMessage) { + pub fn send_sync_message(&mut self, sync_message: SyncMessage) { self.sync_manager.handle_message(sync_message); } @@ -369,7 +369,7 @@ impl TestRig { self.expect_empty_network(); } - fn new_connected_peer(&mut self) -> PeerId { + pub fn new_connected_peer(&mut self) -> PeerId { self.network_globals .peers .write() @@ -811,7 +811,7 @@ impl TestRig { } } - fn peer_disconnected(&mut self, peer_id: PeerId) { + pub fn peer_disconnected(&mut self, peer_id: PeerId) { self.send_sync_message(SyncMessage::Disconnect(peer_id)); } @@ -827,7 +827,7 @@ impl TestRig { } } - fn pop_received_network_event) -> Option>( + pub fn pop_received_network_event) -> Option>( &mut self, predicate_transform: F, ) -> Result { @@ -847,7 +847,7 @@ impl TestRig { } } - fn pop_received_processor_event) -> Option>( + pub fn pop_received_processor_event) -> Option>( &mut self, predicate_transform: F, ) -> Result { @@ -871,6 +871,16 @@ impl TestRig { } } + pub fn expect_empty_processor(&mut self) { + self.drain_processor_rx(); + if !self.beacon_processor_rx_queue.is_empty() { + panic!( + "Expected processor to be empty, but has events: {:?}", + self.beacon_processor_rx_queue + ); + } + } + fn find_block_lookup_request( &mut self, for_block: Hash256, @@ -2173,7 +2183,8 @@ fn custody_lookup_happy_path() { mod deneb_only { use super::*; use beacon_chain::{ - block_verification_types::RpcBlock, data_availability_checker::AvailabilityCheckError, + block_verification_types::{AsBlock, RpcBlock}, + data_availability_checker::AvailabilityCheckError, }; use ssz_types::VariableList; use std::collections::VecDeque; diff --git a/beacon_node/network/src/sync/tests/range.rs b/beacon_node/network/src/sync/tests/range.rs index 8b137891791..c28f10b71fa 100644 --- a/beacon_node/network/src/sync/tests/range.rs +++ b/beacon_node/network/src/sync/tests/range.rs @@ -1 +1,340 @@ +use super::*; +use crate::status::ToStatusMessage; +use crate::sync::manager::SLOT_IMPORT_TOLERANCE; +use crate::sync::range_sync::{BatchId, BlockStorage, RangeSyncType}; +use crate::sync::{ChainId, SyncMessage}; +use beacon_chain::parking_lot::RwLock; +use beacon_chain::test_utils::BlockStrategy; +use beacon_chain::EngineState; +use bls::FixedBytesExtended; +use lighthouse_network::rpc::{RequestType, StatusMessage}; +use lighthouse_network::service::api_types::{AppRequestId, Id, SyncRequestId}; +use lighthouse_network::{PeerId, SyncInfo}; +use std::collections::HashSet; +use std::sync::Arc; +use std::time::Duration; +use types::{EthSpec, Hash256, MinimalEthSpec as E, SignedBeaconBlock, Slot}; +const D: Duration = Duration::new(0, 0); + +#[derive(Debug)] +pub struct FakeStorage { + known_blocks: RwLock>, + status: RwLock, +} + +impl Default for FakeStorage { + fn default() -> Self { + FakeStorage { + known_blocks: RwLock::new(HashSet::new()), + status: RwLock::new(StatusMessage { + fork_digest: [0; 4], + finalized_root: Hash256::zero(), + finalized_epoch: 0usize.into(), + head_root: Hash256::zero(), + head_slot: 0usize.into(), + }), + } + } +} + +impl FakeStorage { + fn remember_block(&self, block_root: Hash256) { + self.known_blocks.write().insert(block_root); + } + + #[allow(dead_code)] + fn forget_block(&self, block_root: &Hash256) { + self.known_blocks.write().remove(block_root); + } +} + +impl BlockStorage for FakeStorage { + fn is_block_known(&self, block_root: &store::Hash256) -> bool { + self.known_blocks.read().contains(block_root) + } +} + +impl ToStatusMessage for FakeStorage { + fn status_message(&self) -> StatusMessage { + self.status.read().clone() + } +} + +type PeerTestInfo = (PeerId, SyncInfo /* Remote info */); + +impl TestRig { + /// Produce a head peer with an advanced head + fn add_head_peer(&mut self) -> PeerTestInfo { + let local_info = self.local_info(); + self.add_peer(SyncInfo { + head_root: Hash256::random(), + head_slot: local_info.head_slot + 1 + Slot::new(SLOT_IMPORT_TOLERANCE as u64), + ..local_info + }) + } + + // Produce a finalized peer with an advanced finalized epoch + fn add_finalized_peer(&mut self) -> PeerTestInfo { + let local_info = self.local_info(); + let finalized_epoch = local_info.finalized_epoch + 2; + self.add_peer(SyncInfo { + finalized_epoch, + finalized_root: Hash256::random(), + head_slot: finalized_epoch.start_slot(E::slots_per_epoch()), + head_root: Hash256::random(), + }) + } + + fn local_info(&self) -> SyncInfo { + let StatusMessage { + fork_digest: _, + finalized_root, + finalized_epoch, + head_root, + head_slot, + } = self.harness.chain.status_message(); + SyncInfo { + head_slot, + head_root, + finalized_epoch, + finalized_root, + } + } + + fn add_peer(&mut self, remote_info: SyncInfo) -> PeerTestInfo { + // Create valid peer known to network globals + let peer_id = self.new_connected_peer(); + // Send peer to sync + self.send_sync_message(SyncMessage::AddPeer(peer_id, remote_info.clone())); + (peer_id, remote_info) + } + + fn assert_state(&self, state: RangeSyncType) { + assert_eq!( + self.sync_manager + .range_sync_state() + .expect("State is ok") + .expect("Range should be syncing") + .0, + state, + "not expected range sync state" + ); + } + + #[track_caller] + fn expect_chain_segment(&mut self) { + self.pop_received_processor_event(|ev| { + (ev.work_type() == beacon_processor::WorkType::ChainSegment).then_some(()) + }) + .unwrap_or_else(|e| panic!("Expect ChainSegment work event")); + } + + fn update_execution_engine_state(&mut self, state: EngineState) { + self.log(&format!("execution engine state updated: {state:?}")); + self.sync_manager.update_execution_engine_state(state); + } + + fn send_blocks_by_range_response( + &mut self, + peer_id: PeerId, + beacon_block: Option>>, + range_blocks_req_id: Id, + ) { + self.log("send_blocks_by_range_response"); + self.send_sync_message(SyncMessage::RpcBlock { + request_id: SyncRequestId::RangeBlockAndBlobs { + id: range_blocks_req_id, + }, + peer_id, + beacon_block, + seen_timestamp: D, + }); + } + + fn send_empty_blocks_by_range_response( + &mut self, + _peer_id: PeerId, + _chain_id: ChainId, + _batch_id: BatchId, + _req_id: Id, + ) { + // Send empty vector of blocks to range sync + // todo!(); + } + + fn complete_range_block_and_blobs_response(&mut self, peer_id: PeerId, req_id: Id) { + // For all active requests associated with a block or blob request ID send the stream + // terminator without blocks + self.send_blocks_by_range_response(peer_id, None, req_id); + } + + fn find_blocks_by_range_request(&mut self, target_peer_id: &PeerId) -> (Id, Option) { + let block_req_id = self + .pop_received_network_event(|ev| match ev { + NetworkMessage::SendRequest { + peer_id, + request: RequestType::BlocksByRange(_), + request_id: AppRequestId::Sync(SyncRequestId::RangeBlockAndBlobs { id }), + } if peer_id == target_peer_id => Some(*id), + _ => None, + }) + .expect("Should have a blocks by range request"); + + let blob_req_id = if self.after_deneb() { + Some( + self.pop_received_network_event(|ev| match ev { + NetworkMessage::SendRequest { + peer_id, + request: RequestType::BlobsByRange(_), + request_id: AppRequestId::Sync(SyncRequestId::RangeBlockAndBlobs { id }), + } if peer_id == target_peer_id => Some(*id), + _ => None, + }) + .expect("Should have a blobs by range request"), + ) + } else { + None + }; + + (block_req_id, blob_req_id) + } + + fn find_and_complete_blocks_by_range_request(&mut self, target_peer_id: PeerId) { + let (blocks_req_id, blobs_req_id) = self.find_blocks_by_range_request(&target_peer_id); + + // Complete the request with a single stream termination + self.log(&format!( + "Completing BlocksByRange request {blocks_req_id} with empty stream" + )); + self.send_sync_message(SyncMessage::RpcBlock { + request_id: SyncRequestId::RangeBlockAndBlobs { id: blocks_req_id }, + peer_id: target_peer_id, + beacon_block: None, + seen_timestamp: D, + }); + + if let Some(blobs_req_id) = blobs_req_id { + // Complete the request with a single stream termination + self.log(&format!( + "Completing BlobsByRange request {blobs_req_id} with empty stream" + )); + self.send_sync_message(SyncMessage::RpcBlob { + request_id: SyncRequestId::RangeBlockAndBlobs { id: blobs_req_id }, + peer_id: target_peer_id, + blob_sidecar: None, + seen_timestamp: D, + }); + } + } + + fn create_remembered_block(&mut self) -> Hash256 { + // Add block to chain storage such that `knows block` returns true + // block_on(self.harness.extend_chain( + // 1, + // BlockStrategy::OnCanonicalHead, + // AttestationStrategy::AllValidators, + // )); + + let block_root = self + .harness + .chain + .canonical_head + .cached_head() + .head_block_root(); + + todo!(); + } + + fn remember_block(&mut self, block: Hash256) { + todo!(); + } +} + +#[test] +fn head_chain_removed_while_finalized_syncing() { + // NOTE: this is a regression test. + let mut rig = TestRig::test_setup(); + + // Get a peer with an advanced head + let (head_peer, _) = rig.add_head_peer(); + rig.assert_state(RangeSyncType::Head); + + // Sync should have requested a batch, grab the request. + let _ = rig.find_blocks_by_range_request(&head_peer); + + // Now get a peer with an advanced finalized epoch. + let (finalized_peer, _) = rig.add_finalized_peer(); + rig.assert_state(RangeSyncType::Finalized); + + // Sync should have requested a batch, grab the request + let _ = rig.find_blocks_by_range_request(&finalized_peer); + + // Fail the head chain by disconnecting the peer. + rig.peer_disconnected(head_peer); + rig.assert_state(RangeSyncType::Finalized); +} + +#[ignore] +#[test] +fn state_update_while_purging() { + // NOTE: this is a regression test. + let mut rig = TestRig::test_setup(); + + // TODO: Need to create blocks that can be inserted into the fork-choice and fit the "known + // conditions" below. + let known_block_root_1 = rig.create_remembered_block(); + let known_block_root_2 = rig.create_remembered_block(); + + // Get a peer with an advanced head + let (head_peer, head_info) = rig.add_head_peer(); + let head_peer_root = head_info.head_root; + rig.assert_state(RangeSyncType::Head); + + // Sync should have requested a batch, grab the request. + let _ = rig.find_blocks_by_range_request(&head_peer); + + // Now get a peer with an advanced finalized epoch. + let (finalized_peer, remote_info) = rig.add_finalized_peer(); + let finalized_peer_root = remote_info.finalized_root; + rig.assert_state(RangeSyncType::Finalized); + + // Sync should have requested a batch, grab the request + let _ = rig.find_blocks_by_range_request(&finalized_peer); + + // Now the chain knows both chains target roots. + rig.remember_block(head_peer_root); + rig.remember_block(finalized_peer_root); + + // Add an additional peer to the second chain to make range update it's status + rig.add_finalized_peer(); +} + +#[test] +fn pause_and_resume_on_ee_offline() { + let mut rig = TestRig::test_setup(); + + // add some peers + let (peer1, _) = rig.add_head_peer(); + // make the ee offline + rig.update_execution_engine_state(EngineState::Offline); + // send the response to the request + rig.find_and_complete_blocks_by_range_request(peer1); + // the beacon processor shouldn't have received any work + rig.expect_empty_processor(); + + // while the ee is offline, more peers might arrive. Add a new finalized peer. + let (peer2, _) = rig.add_finalized_peer(); + + // send the response to the request + rig.find_and_complete_blocks_by_range_request(peer2); + // the beacon processor shouldn't have received any work + rig.expect_empty_processor(); + // make the beacon processor available again. + // update_execution_engine_state implicitly calls resume + // now resume range, we should have two processing requests in the beacon processor. + rig.update_execution_engine_state(EngineState::Online); + + rig.expect_chain_segment(); + rig.expect_chain_segment(); +} From cb698a90032c22224e79aac9d0dccc5846eac3d9 Mon Sep 17 00:00:00 2001 From: dapplion <35266934+dapplion@users.noreply.github.com> Date: Wed, 27 Nov 2024 21:42:17 +0800 Subject: [PATCH 2/6] Fix remaining test --- .../network/src/sync/range_sync/mod.rs | 2 +- beacon_node/network/src/sync/tests/range.rs | 183 ++++++------------ 2 files changed, 60 insertions(+), 125 deletions(-) diff --git a/beacon_node/network/src/sync/range_sync/mod.rs b/beacon_node/network/src/sync/range_sync/mod.rs index 3f8cf400c68..0049fa9c118 100644 --- a/beacon_node/network/src/sync/range_sync/mod.rs +++ b/beacon_node/network/src/sync/range_sync/mod.rs @@ -12,8 +12,8 @@ pub use batch::{ BatchConfig, BatchInfo, BatchOperationOutcome, BatchProcessingResult, BatchState, ByRangeRequestType, }; -pub use block_storage::BlockStorage; pub use chain::{BatchId, ChainId, EPOCHS_PER_BATCH}; +#[cfg(test)] pub use chain_collection::SyncChainStatus; pub use range::RangeSync; pub use sync_type::RangeSyncType; diff --git a/beacon_node/network/src/sync/tests/range.rs b/beacon_node/network/src/sync/tests/range.rs index c28f10b71fa..02f281ab8a6 100644 --- a/beacon_node/network/src/sync/tests/range.rs +++ b/beacon_node/network/src/sync/tests/range.rs @@ -1,86 +1,46 @@ use super::*; use crate::status::ToStatusMessage; use crate::sync::manager::SLOT_IMPORT_TOLERANCE; -use crate::sync::range_sync::{BatchId, BlockStorage, RangeSyncType}; -use crate::sync::{ChainId, SyncMessage}; -use beacon_chain::parking_lot::RwLock; -use beacon_chain::test_utils::BlockStrategy; +use crate::sync::range_sync::RangeSyncType; +use crate::sync::SyncMessage; +use beacon_chain::test_utils::{AttestationStrategy, BlockStrategy}; use beacon_chain::EngineState; -use bls::FixedBytesExtended; use lighthouse_network::rpc::{RequestType, StatusMessage}; use lighthouse_network::service::api_types::{AppRequestId, Id, SyncRequestId}; use lighthouse_network::{PeerId, SyncInfo}; -use std::collections::HashSet; -use std::sync::Arc; use std::time::Duration; use types::{EthSpec, Hash256, MinimalEthSpec as E, SignedBeaconBlock, Slot}; const D: Duration = Duration::new(0, 0); -#[derive(Debug)] -pub struct FakeStorage { - known_blocks: RwLock>, - status: RwLock, -} - -impl Default for FakeStorage { - fn default() -> Self { - FakeStorage { - known_blocks: RwLock::new(HashSet::new()), - status: RwLock::new(StatusMessage { - fork_digest: [0; 4], - finalized_root: Hash256::zero(), - finalized_epoch: 0usize.into(), - head_root: Hash256::zero(), - head_slot: 0usize.into(), - }), - } - } -} - -impl FakeStorage { - fn remember_block(&self, block_root: Hash256) { - self.known_blocks.write().insert(block_root); - } - - #[allow(dead_code)] - fn forget_block(&self, block_root: &Hash256) { - self.known_blocks.write().remove(block_root); - } -} - -impl BlockStorage for FakeStorage { - fn is_block_known(&self, block_root: &store::Hash256) -> bool { - self.known_blocks.read().contains(block_root) - } -} - -impl ToStatusMessage for FakeStorage { - fn status_message(&self) -> StatusMessage { - self.status.read().clone() +impl TestRig { + /// Produce a head peer with an advanced head + fn add_head_peer(&mut self) -> PeerId { + self.add_head_peer_with_root(Hash256::random()) } -} - -type PeerTestInfo = (PeerId, SyncInfo /* Remote info */); -impl TestRig { /// Produce a head peer with an advanced head - fn add_head_peer(&mut self) -> PeerTestInfo { + fn add_head_peer_with_root(&mut self, head_root: Hash256) -> PeerId { let local_info = self.local_info(); self.add_peer(SyncInfo { - head_root: Hash256::random(), + head_root, head_slot: local_info.head_slot + 1 + Slot::new(SLOT_IMPORT_TOLERANCE as u64), ..local_info }) } // Produce a finalized peer with an advanced finalized epoch - fn add_finalized_peer(&mut self) -> PeerTestInfo { + fn add_finalized_peer(&mut self) -> PeerId { + self.add_finalized_peer_with_root(Hash256::random()) + } + + // Produce a finalized peer with an advanced finalized epoch + fn add_finalized_peer_with_root(&mut self, finalized_root: Hash256) -> PeerId { let local_info = self.local_info(); let finalized_epoch = local_info.finalized_epoch + 2; self.add_peer(SyncInfo { finalized_epoch, - finalized_root: Hash256::random(), + finalized_root, head_slot: finalized_epoch.start_slot(E::slots_per_epoch()), head_root: Hash256::random(), }) @@ -102,12 +62,12 @@ impl TestRig { } } - fn add_peer(&mut self, remote_info: SyncInfo) -> PeerTestInfo { + fn add_peer(&mut self, remote_info: SyncInfo) -> PeerId { // Create valid peer known to network globals let peer_id = self.new_connected_peer(); // Send peer to sync self.send_sync_message(SyncMessage::AddPeer(peer_id, remote_info.clone())); - (peer_id, remote_info) + peer_id } fn assert_state(&self, state: RangeSyncType) { @@ -127,7 +87,7 @@ impl TestRig { self.pop_received_processor_event(|ev| { (ev.work_type() == beacon_processor::WorkType::ChainSegment).then_some(()) }) - .unwrap_or_else(|e| panic!("Expect ChainSegment work event")); + .unwrap_or_else(|e| panic!("Expect ChainSegment work event: {e:?}")); } fn update_execution_engine_state(&mut self, state: EngineState) { @@ -135,40 +95,6 @@ impl TestRig { self.sync_manager.update_execution_engine_state(state); } - fn send_blocks_by_range_response( - &mut self, - peer_id: PeerId, - beacon_block: Option>>, - range_blocks_req_id: Id, - ) { - self.log("send_blocks_by_range_response"); - self.send_sync_message(SyncMessage::RpcBlock { - request_id: SyncRequestId::RangeBlockAndBlobs { - id: range_blocks_req_id, - }, - peer_id, - beacon_block, - seen_timestamp: D, - }); - } - - fn send_empty_blocks_by_range_response( - &mut self, - _peer_id: PeerId, - _chain_id: ChainId, - _batch_id: BatchId, - _req_id: Id, - ) { - // Send empty vector of blocks to range sync - // todo!(); - } - - fn complete_range_block_and_blobs_response(&mut self, peer_id: PeerId, req_id: Id) { - // For all active requests associated with a block or blob request ID send the stream - // terminator without blocks - self.send_blocks_by_range_response(peer_id, None, req_id); - } - fn find_blocks_by_range_request(&mut self, target_peer_id: &PeerId) -> (Id, Option) { let block_req_id = self .pop_received_network_event(|ev| match ev { @@ -228,26 +154,34 @@ impl TestRig { } } - fn create_remembered_block(&mut self) -> Hash256 { - // Add block to chain storage such that `knows block` returns true - // block_on(self.harness.extend_chain( - // 1, - // BlockStrategy::OnCanonicalHead, - // AttestationStrategy::AllValidators, - // )); - - let block_root = self - .harness + fn create_canonical_block(&mut self) -> SignedBeaconBlock { + self.harness.advance_slot(); + + let block_root = + tokio::runtime::Runtime::new() + .unwrap() + .block_on(self.harness.extend_chain( + 1, + BlockStrategy::OnCanonicalHead, + AttestationStrategy::AllValidators, + )); + self.harness .chain - .canonical_head - .cached_head() - .head_block_root(); - - todo!(); + .store + .get_full_block(&block_root) + .unwrap() + .unwrap() } - fn remember_block(&mut self, block: Hash256) { - todo!(); + fn remember_block(&mut self, block: SignedBeaconBlock) { + tokio::runtime::Runtime::new() + .unwrap() + .block_on(self.harness.process_block( + block.slot(), + block.canonical_root(), + (block.into(), None), + )) + .unwrap(); } } @@ -257,14 +191,14 @@ fn head_chain_removed_while_finalized_syncing() { let mut rig = TestRig::test_setup(); // Get a peer with an advanced head - let (head_peer, _) = rig.add_head_peer(); + let head_peer = rig.add_head_peer(); rig.assert_state(RangeSyncType::Head); // Sync should have requested a batch, grab the request. let _ = rig.find_blocks_by_range_request(&head_peer); // Now get a peer with an advanced finalized epoch. - let (finalized_peer, _) = rig.add_finalized_peer(); + let finalized_peer = rig.add_finalized_peer(); rig.assert_state(RangeSyncType::Finalized); // Sync should have requested a batch, grab the request @@ -275,36 +209,37 @@ fn head_chain_removed_while_finalized_syncing() { rig.assert_state(RangeSyncType::Finalized); } -#[ignore] #[test] fn state_update_while_purging() { // NOTE: this is a regression test. let mut rig = TestRig::test_setup(); - // TODO: Need to create blocks that can be inserted into the fork-choice and fit the "known + // Create blocks on a separate harness + let mut rig_2 = TestRig::test_setup(); + // Need to create blocks that can be inserted into the fork-choice and fit the "known // conditions" below. - let known_block_root_1 = rig.create_remembered_block(); - let known_block_root_2 = rig.create_remembered_block(); + let head_peer_block = rig_2.create_canonical_block(); + let head_peer_root = head_peer_block.canonical_root(); + let finalized_peer_block = rig_2.create_canonical_block(); + let finalized_peer_root = finalized_peer_block.canonical_root(); // Get a peer with an advanced head - let (head_peer, head_info) = rig.add_head_peer(); - let head_peer_root = head_info.head_root; + let head_peer = rig.add_head_peer_with_root(head_peer_root); rig.assert_state(RangeSyncType::Head); // Sync should have requested a batch, grab the request. let _ = rig.find_blocks_by_range_request(&head_peer); // Now get a peer with an advanced finalized epoch. - let (finalized_peer, remote_info) = rig.add_finalized_peer(); - let finalized_peer_root = remote_info.finalized_root; + let finalized_peer = rig.add_finalized_peer_with_root(finalized_peer_root); rig.assert_state(RangeSyncType::Finalized); // Sync should have requested a batch, grab the request let _ = rig.find_blocks_by_range_request(&finalized_peer); // Now the chain knows both chains target roots. - rig.remember_block(head_peer_root); - rig.remember_block(finalized_peer_root); + rig.remember_block(head_peer_block); + rig.remember_block(finalized_peer_block); // Add an additional peer to the second chain to make range update it's status rig.add_finalized_peer(); @@ -315,7 +250,7 @@ fn pause_and_resume_on_ee_offline() { let mut rig = TestRig::test_setup(); // add some peers - let (peer1, _) = rig.add_head_peer(); + let peer1 = rig.add_head_peer(); // make the ee offline rig.update_execution_engine_state(EngineState::Offline); // send the response to the request @@ -324,7 +259,7 @@ fn pause_and_resume_on_ee_offline() { rig.expect_empty_processor(); // while the ee is offline, more peers might arrive. Add a new finalized peer. - let (peer2, _) = rig.add_finalized_peer(); + let peer2 = rig.add_finalized_peer(); // send the response to the request rig.find_and_complete_blocks_by_range_request(peer2); From 595d63eaff8e7befccbed514715d5cbae516cb20 Mon Sep 17 00:00:00 2001 From: dapplion <35266934+dapplion@users.noreply.github.com> Date: Wed, 27 Nov 2024 21:53:14 +0800 Subject: [PATCH 3/6] Drop unused generics --- .../network/src/sync/range_sync/block_storage.rs | 13 ------------- .../src/sync/range_sync/chain_collection.rs | 14 +++++++------- beacon_node/network/src/sync/range_sync/mod.rs | 1 - beacon_node/network/src/sync/range_sync/range.rs | 12 +++++------- .../network/src/sync/range_sync/sync_type.rs | 9 ++++----- 5 files changed, 16 insertions(+), 33 deletions(-) delete mode 100644 beacon_node/network/src/sync/range_sync/block_storage.rs diff --git a/beacon_node/network/src/sync/range_sync/block_storage.rs b/beacon_node/network/src/sync/range_sync/block_storage.rs deleted file mode 100644 index df49543a6b6..00000000000 --- a/beacon_node/network/src/sync/range_sync/block_storage.rs +++ /dev/null @@ -1,13 +0,0 @@ -use beacon_chain::{BeaconChain, BeaconChainTypes}; -use types::Hash256; - -/// Trait that helps maintain RangeSync's implementation split from the BeaconChain -pub trait BlockStorage { - fn is_block_known(&self, block_root: &Hash256) -> bool; -} - -impl BlockStorage for BeaconChain { - fn is_block_known(&self, block_root: &Hash256) -> bool { - self.block_is_known_to_fork_choice(block_root) - } -} diff --git a/beacon_node/network/src/sync/range_sync/chain_collection.rs b/beacon_node/network/src/sync/range_sync/chain_collection.rs index d337c2505f4..c030d0a19e8 100644 --- a/beacon_node/network/src/sync/range_sync/chain_collection.rs +++ b/beacon_node/network/src/sync/range_sync/chain_collection.rs @@ -3,12 +3,11 @@ //! Each chain type is stored in it's own map. A variety of helper functions are given along with //! this struct to simplify the logic of the other layers of sync. -use super::block_storage::BlockStorage; use super::chain::{ChainId, ProcessingResult, RemoveChain, SyncingChain}; use super::sync_type::RangeSyncType; use crate::metrics; use crate::sync::network_context::SyncNetworkContext; -use beacon_chain::BeaconChainTypes; +use beacon_chain::{BeaconChain, BeaconChainTypes}; use fnv::FnvHashMap; use lighthouse_network::PeerId; use lighthouse_network::SyncInfo; @@ -41,9 +40,9 @@ pub type SyncChainStatus = Result, &'static str>; /// A collection of finalized and head chains currently being processed. -pub struct ChainCollection { +pub struct ChainCollection { /// The beacon chain for processing. - beacon_chain: Arc, + beacon_chain: Arc>, /// The set of finalized chains being synced. finalized_chains: FnvHashMap>, /// The set of head chains being synced. @@ -54,8 +53,8 @@ pub struct ChainCollection { log: slog::Logger, } -impl ChainCollection { - pub fn new(beacon_chain: Arc, log: slog::Logger) -> Self { +impl ChainCollection { + pub fn new(beacon_chain: Arc>, log: slog::Logger) -> Self { ChainCollection { beacon_chain, finalized_chains: FnvHashMap::default(), @@ -410,7 +409,8 @@ impl ChainCollection { let log_ref = &self.log; let is_outdated = |target_slot: &Slot, target_root: &Hash256| { - target_slot <= &local_finalized_slot || beacon_chain.is_block_known(target_root) + target_slot <= &local_finalized_slot + || beacon_chain.block_is_known_to_fork_choice(target_root) }; // Retain only head peers that remain relevant diff --git a/beacon_node/network/src/sync/range_sync/mod.rs b/beacon_node/network/src/sync/range_sync/mod.rs index 0049fa9c118..8f881fba90f 100644 --- a/beacon_node/network/src/sync/range_sync/mod.rs +++ b/beacon_node/network/src/sync/range_sync/mod.rs @@ -2,7 +2,6 @@ //! peers. mod batch; -mod block_storage; mod chain; mod chain_collection; mod range; diff --git a/beacon_node/network/src/sync/range_sync/range.rs b/beacon_node/network/src/sync/range_sync/range.rs index 0e9166c95cd..78679403bb4 100644 --- a/beacon_node/network/src/sync/range_sync/range.rs +++ b/beacon_node/network/src/sync/range_sync/range.rs @@ -39,7 +39,6 @@ //! Each chain is downloaded in batches of blocks. The batched blocks are processed sequentially //! and further batches are requested as current blocks are being processed. -use super::block_storage::BlockStorage; use super::chain::{BatchId, ChainId, RemoveChain, SyncingChain}; use super::chain_collection::{ChainCollection, SyncChainStatus}; use super::sync_type::RangeSyncType; @@ -64,27 +63,26 @@ const FAILED_CHAINS_EXPIRY_SECONDS: u64 = 30; /// The primary object dealing with long range/batch syncing. This contains all the active and /// non-active chains that need to be processed before the syncing is considered complete. This /// holds the current state of the long range sync. -pub struct RangeSync> { +pub struct RangeSync { /// The beacon chain for processing. - beacon_chain: Arc, + beacon_chain: Arc>, /// Last known sync info of our useful connected peers. We use this information to create Head /// chains after all finalized chains have ended. awaiting_head_peers: HashMap, /// A collection of chains that need to be downloaded. This stores any head or finalized chains /// that need to be downloaded. - chains: ChainCollection, + chains: ChainCollection, /// Chains that have failed and are stored to prevent being retried. failed_chains: LRUTimeCache, /// The syncing logger. log: slog::Logger, } -impl RangeSync +impl RangeSync where - C: BlockStorage + ToStatusMessage, T: BeaconChainTypes, { - pub fn new(beacon_chain: Arc, log: slog::Logger) -> Self { + pub fn new(beacon_chain: Arc>, log: slog::Logger) -> Self { RangeSync { beacon_chain: beacon_chain.clone(), chains: ChainCollection::new(beacon_chain, log.clone()), diff --git a/beacon_node/network/src/sync/range_sync/sync_type.rs b/beacon_node/network/src/sync/range_sync/sync_type.rs index d6ffd4a5dfb..4ff7e393101 100644 --- a/beacon_node/network/src/sync/range_sync/sync_type.rs +++ b/beacon_node/network/src/sync/range_sync/sync_type.rs @@ -1,10 +1,9 @@ //! Contains logic about identifying which Sync to perform given PeerSyncInfo of ourselves and //! of a remote. +use beacon_chain::{BeaconChain, BeaconChainTypes}; use lighthouse_network::SyncInfo; -use super::block_storage::BlockStorage; - /// The type of Range sync that should be done relative to our current state. #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum RangeSyncType { @@ -17,8 +16,8 @@ pub enum RangeSyncType { impl RangeSyncType { /// Determines the type of sync given our local `PeerSyncInfo` and the remote's /// `PeerSyncInfo`. - pub fn new( - chain: &C, + pub fn new( + chain: &BeaconChain, local_info: &SyncInfo, remote_info: &SyncInfo, ) -> RangeSyncType { @@ -29,7 +28,7 @@ impl RangeSyncType { // not seen the finalized hash before. if remote_info.finalized_epoch > local_info.finalized_epoch - && !chain.is_block_known(&remote_info.finalized_root) + && !chain.block_is_known_to_fork_choice(&remote_info.finalized_root) { RangeSyncType::Finalized } else { From 28da07bdd5c427b6ec920fe4a1e585f1a55e46a5 Mon Sep 17 00:00:00 2001 From: dapplion <35266934+dapplion@users.noreply.github.com> Date: Sat, 14 Dec 2024 20:31:24 +0800 Subject: [PATCH 4/6] Add reference to test author --- beacon_node/network/src/sync/tests/range.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/beacon_node/network/src/sync/tests/range.rs b/beacon_node/network/src/sync/tests/range.rs index 02f281ab8a6..f8dee9c8149 100644 --- a/beacon_node/network/src/sync/tests/range.rs +++ b/beacon_node/network/src/sync/tests/range.rs @@ -188,6 +188,7 @@ impl TestRig { #[test] fn head_chain_removed_while_finalized_syncing() { // NOTE: this is a regression test. + // Added in PR https://github.com/sigp/lighthouse/pull/2821 let mut rig = TestRig::test_setup(); // Get a peer with an advanced head @@ -212,6 +213,7 @@ fn head_chain_removed_while_finalized_syncing() { #[test] fn state_update_while_purging() { // NOTE: this is a regression test. + // Added in PR https://github.com/sigp/lighthouse/pull/2827 let mut rig = TestRig::test_setup(); // Create blocks on a separate harness From 2ac8e035355fb52f30c6df119a382af41552bfdd Mon Sep 17 00:00:00 2001 From: dapplion <35266934+dapplion@users.noreply.github.com> Date: Sat, 14 Dec 2024 20:36:51 +0800 Subject: [PATCH 5/6] Use async await --- beacon_node/network/src/sync/tests/range.rs | 42 ++++++++++----------- 1 file changed, 19 insertions(+), 23 deletions(-) diff --git a/beacon_node/network/src/sync/tests/range.rs b/beacon_node/network/src/sync/tests/range.rs index f8dee9c8149..6faa8b72472 100644 --- a/beacon_node/network/src/sync/tests/range.rs +++ b/beacon_node/network/src/sync/tests/range.rs @@ -154,17 +154,17 @@ impl TestRig { } } - fn create_canonical_block(&mut self) -> SignedBeaconBlock { + async fn create_canonical_block(&mut self) -> SignedBeaconBlock { self.harness.advance_slot(); - let block_root = - tokio::runtime::Runtime::new() - .unwrap() - .block_on(self.harness.extend_chain( - 1, - BlockStrategy::OnCanonicalHead, - AttestationStrategy::AllValidators, - )); + let block_root = self + .harness + .extend_chain( + 1, + BlockStrategy::OnCanonicalHead, + AttestationStrategy::AllValidators, + ) + .await; self.harness .chain .store @@ -173,14 +173,10 @@ impl TestRig { .unwrap() } - fn remember_block(&mut self, block: SignedBeaconBlock) { - tokio::runtime::Runtime::new() - .unwrap() - .block_on(self.harness.process_block( - block.slot(), - block.canonical_root(), - (block.into(), None), - )) + async fn remember_block(&mut self, block: SignedBeaconBlock) { + self.harness + .process_block(block.slot(), block.canonical_root(), (block.into(), None)) + .await .unwrap(); } } @@ -210,8 +206,8 @@ fn head_chain_removed_while_finalized_syncing() { rig.assert_state(RangeSyncType::Finalized); } -#[test] -fn state_update_while_purging() { +#[tokio::test] +async fn state_update_while_purging() { // NOTE: this is a regression test. // Added in PR https://github.com/sigp/lighthouse/pull/2827 let mut rig = TestRig::test_setup(); @@ -220,9 +216,9 @@ fn state_update_while_purging() { let mut rig_2 = TestRig::test_setup(); // Need to create blocks that can be inserted into the fork-choice and fit the "known // conditions" below. - let head_peer_block = rig_2.create_canonical_block(); + let head_peer_block = rig_2.create_canonical_block().await; let head_peer_root = head_peer_block.canonical_root(); - let finalized_peer_block = rig_2.create_canonical_block(); + let finalized_peer_block = rig_2.create_canonical_block().await; let finalized_peer_root = finalized_peer_block.canonical_root(); // Get a peer with an advanced head @@ -240,8 +236,8 @@ fn state_update_while_purging() { let _ = rig.find_blocks_by_range_request(&finalized_peer); // Now the chain knows both chains target roots. - rig.remember_block(head_peer_block); - rig.remember_block(finalized_peer_block); + rig.remember_block(head_peer_block).await; + rig.remember_block(finalized_peer_block).await; // Add an additional peer to the second chain to make range update it's status rig.add_finalized_peer(); From ca9aaf4314e6d873da1886a7ce3f6b5bec4648da Mon Sep 17 00:00:00 2001 From: Jimmy Chen Date: Mon, 16 Dec 2024 13:17:12 +1100 Subject: [PATCH 6/6] Fix failing test. Not sure how it was passing before without an EL. --- beacon_node/network/src/sync/tests/lookups.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/beacon_node/network/src/sync/tests/lookups.rs b/beacon_node/network/src/sync/tests/lookups.rs index c7c1a854474..94aacad3e81 100644 --- a/beacon_node/network/src/sync/tests/lookups.rs +++ b/beacon_node/network/src/sync/tests/lookups.rs @@ -83,6 +83,7 @@ impl TestRig { .logger(log.clone()) .deterministic_keypairs(1) .fresh_ephemeral_store() + .mock_execution_layer() .testing_slot_clock(TestingSlotClock::new( Slot::new(0), Duration::from_secs(0),