Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Transition block lookup sync to range sync #6122

Merged
merged 5 commits into from
Oct 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions beacon_node/beacon_chain/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2790,12 +2790,12 @@ pub fn build_log(level: slog::Level, logger_type: LoggerType) -> Logger {
match logger_type {
LoggerType::Test => {
let drain = FullFormat::new(TermDecorator::new().build()).build().fuse();
let drain = Async::new(drain).build().fuse();
let drain = Async::new(drain).chan_size(10_000).build().fuse();
Logger::root(drain.filter_level(level).fuse(), o!())
}
LoggerType::CI => {
let drain = FullFormat::new(ci_decorator()).build().fuse();
let drain = Async::new(drain).build().fuse();
let drain = Async::new(drain).chan_size(10_000).build().fuse();
Logger::root(drain.filter_level(level).fuse(), o!())
}
LoggerType::Null => {
Expand Down
5 changes: 3 additions & 2 deletions beacon_node/network/src/network_beacon_processor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use std::sync::Arc;
use std::time::Duration;
use store::MemoryStore;
use task_executor::TaskExecutor;
use tokio::sync::mpsc::UnboundedSender;
use tokio::sync::mpsc::{self, error::TrySendError};
use types::*;

Expand Down Expand Up @@ -831,7 +832,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
/// Send a message to `sync_tx`.
///
/// Creates a log if there is an internal error.
fn send_sync_message(&self, message: SyncMessage<T::EthSpec>) {
pub(crate) fn send_sync_message(&self, message: SyncMessage<T::EthSpec>) {
self.sync_tx.send(message).unwrap_or_else(|e| {
debug!(self.log, "Could not send message to the sync service";
"error" => %e)
Expand Down Expand Up @@ -859,6 +860,7 @@ impl<E: EthSpec> NetworkBeaconProcessor<TestBeaconChainType<E>> {
// processor (but not much else).
pub fn null_for_testing(
network_globals: Arc<NetworkGlobals<E>>,
sync_tx: UnboundedSender<SyncMessage<E>>,
chain: Arc<BeaconChain<TestBeaconChainType<E>>>,
executor: TaskExecutor,
log: Logger,
Expand All @@ -871,7 +873,6 @@ impl<E: EthSpec> NetworkBeaconProcessor<TestBeaconChainType<E>> {
} = <_>::default();

let (network_tx, _network_rx) = mpsc::unbounded_channel();
let (sync_tx, _sync_rx) = mpsc::unbounded_channel();

let network_beacon_processor = Self {
beacon_processor_send: beacon_processor_tx,
Expand Down
67 changes: 54 additions & 13 deletions beacon_node/network/src/sync/block_lookups/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use super::network_context::{PeerGroup, RpcResponseError, SyncNetworkContext};
use crate::metrics;
use crate::sync::block_lookups::common::ResponseType;
use crate::sync::block_lookups::parent_chain::find_oldest_fork_ancestor;
use crate::sync::SyncMessage;
use beacon_chain::block_verification_types::AsBlock;
use beacon_chain::data_availability_checker::{
AvailabilityCheckError, AvailabilityCheckErrorCategory,
Expand Down Expand Up @@ -55,7 +56,10 @@ mod tests;
/// The maximum depth we will search for a parent block. In principle we should have sync'd any
/// canonical chain to its head once the peer connects. A chain should not appear where its depth
/// is further back than the most recent head slot.
pub(crate) const PARENT_DEPTH_TOLERANCE: usize = SLOT_IMPORT_TOLERANCE * 2;
///
/// Has the same value as the tolerance range sync uses to consider a peer synced. Once lookup
/// sync reaches the maximum depth it will force-trigger range sync.
pub(crate) const PARENT_DEPTH_TOLERANCE: usize = SLOT_IMPORT_TOLERANCE;

const FAILED_CHAINS_CACHE_EXPIRY_SECONDS: u64 = 60;
pub const SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS: u8 = 4;
Expand Down Expand Up @@ -254,22 +258,59 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
// blocks on top of A forming A -> C. The malicious peer forces us to fetch C
// from it, which will result in parent A hitting the chain_too_long error. Then
// the valid chain A -> B is dropped too.
if let Ok(block_to_drop) = find_oldest_fork_ancestor(parent_chains, chain_idx) {
// Drop all lookups descending from the child of the too long parent chain
if let Some((lookup_id, lookup)) = self
//
// `find_oldest_fork_ancestor` should never return Err; fall back to the tip for
// completeness
let parent_chain_tip = parent_chain.tip;
let block_to_drop =
find_oldest_fork_ancestor(parent_chains, chain_idx).unwrap_or(parent_chain_tip);
// Drop all lookups descending from the child of the too long parent chain
if let Some((lookup_id, lookup)) = self
.single_block_lookups
.iter()
.find(|(_, l)| l.block_root() == block_to_drop)
{
// If a lookup chain is too long, we can't distinguish a valid chain from a
// malicious one. We must attempt to sync this chain to not lose liveness. If
// the chain grows too long, we stop lookup sync and transition this head to
// forward range sync. We need to tell range sync which head to sync to, and
// from which peers. The lookup of the very tip of this chain may contain zero
// peers if it's the parent-child lookup. So we do a bit of a trick here:
// - Tell range sync to sync to the tip's root (if available, else its ancestor)
// - But use all peers in the ancestor lookup, which should have at least one
// peer, and its peer set is a strict superset of the tip's lookup.
if let Some((_, tip_lookup)) = self
.single_block_lookups
.iter()
.find(|(_, l)| l.block_root() == block_to_drop)
.find(|(_, l)| l.block_root() == parent_chain_tip)
{
for &peer_id in lookup.all_peers() {
cx.report_peer(
peer_id,
PeerAction::LowToleranceError,
"chain_too_long",
);
}
self.drop_lookup_and_children(*lookup_id);
cx.send_sync_message(SyncMessage::AddPeersForceRangeSync {
peers: lookup.all_peers().copied().collect(),
head_slot: tip_lookup.peek_downloaded_block_slot(),
head_root: parent_chain_tip,
});
} else {
// Should never happen, log error and continue the lookup drop
error!(self.log, "Unable to transition lookup to range sync";
"error" => "Parent chain tip lookup not found",
"block_root" => ?parent_chain_tip
);
}

// Do not downscore peers here. Because we can't distinguish a valid chain from
// a malicious one, we may penalize honest peers for attempting to help us discover a
// valid chain. Until blocks_by_range allows specifying a tip, for example with
// https://github.com/ethereum/consensus-specs/pull/3845, we will have poor
// attributability. A peer can send us garbage blocks over blocks_by_root, and
// then correct blocks via blocks_by_range.

self.drop_lookup_and_children(*lookup_id);
} else {
// Should never happen
error!(self.log, "Unable to transition lookup to range sync";
"error" => "Block to drop lookup not found",
"block_root" => ?block_to_drop
);
}

return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use std::time::{Duration, Instant};
use store::Hash256;
use strum::IntoStaticStr;
use types::blob_sidecar::FixedBlobSidecarList;
use types::{DataColumnSidecarList, EthSpec, SignedBeaconBlock};
use types::{DataColumnSidecarList, EthSpec, SignedBeaconBlock, Slot};

// Dedicated enum for LookupResult to force its usage
#[must_use = "LookupResult must be handled with on_lookup_result"]
Expand Down Expand Up @@ -91,6 +91,14 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
}
}

/// Slot of this lookup's block, if the block is currently cached as `AwaitingProcessing`.
///
/// Returns `None` when the block request state holds no downloaded data to peek at.
pub fn peek_downloaded_block_slot(&self) -> Option<Slot> {
    let downloaded_block = self.block_request_state.state.peek_downloaded_data()?;
    Some(downloaded_block.slot())
}

/// Get the block root that is being requested.
pub fn block_root(&self) -> Hash256 {
self.block_root
Expand Down
48 changes: 43 additions & 5 deletions beacon_node/network/src/sync/block_lookups/tests.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::network_beacon_processor::NetworkBeaconProcessor;
use crate::sync::manager::{BlockProcessType, SyncManager};
use crate::sync::peer_sampling::SamplingConfig;
use crate::sync::range_sync::RangeSyncType;
use crate::sync::{SamplingId, SyncMessage};
use crate::NetworkMessage;
use std::sync::Arc;
Expand Down Expand Up @@ -78,6 +79,8 @@ struct TestRig {
network_rx: mpsc::UnboundedReceiver<NetworkMessage<E>>,
/// Stores all `NetworkMessage`s received from `network_recv`. (e.g. outgoing RPC requests)
network_rx_queue: Vec<NetworkMessage<E>>,
/// Receiver for `SyncMessage`s sent through the `sync_tx` handed to the network beacon processor
sync_rx: mpsc::UnboundedReceiver<SyncMessage<E>>,
/// To send `SyncMessage`. For sending RPC responses or block processing results to sync.
sync_manager: SyncManager<T>,
/// To manipulate sync state and peer connection status
Expand Down Expand Up @@ -137,6 +140,7 @@ impl TestRig {
let chain = harness.chain.clone();

let (network_tx, network_rx) = mpsc::unbounded_channel();
let (sync_tx, sync_rx) = mpsc::unbounded_channel::<SyncMessage<E>>();
// TODO(das): make the generation of the ENR use the deterministic rng to have consistent
// column assignments
let network_config = Arc::new(NetworkConfig::default());
Expand All @@ -148,13 +152,12 @@ impl TestRig {
));
let (beacon_processor, beacon_processor_rx) = NetworkBeaconProcessor::null_for_testing(
globals,
sync_tx,
chain.clone(),
harness.runtime.task_executor.clone(),
log.clone(),
);

let (_sync_send, sync_recv) = mpsc::unbounded_channel::<SyncMessage<E>>();

let fork_name = chain.spec.fork_name_at_slot::<E>(chain.slot().unwrap());

// All current tests expect synced and EL online state
Expand All @@ -168,13 +171,15 @@ impl TestRig {
beacon_processor_rx_queue: vec![],
network_rx,
network_rx_queue: vec![],
sync_rx,
rng,
network_globals: beacon_processor.network_globals.clone(),
sync_manager: SyncManager::new(
chain,
network_tx,
beacon_processor.into(),
sync_recv,
// Pass empty recv not tied to any tx
mpsc::unbounded_channel().1,
SamplingConfig::Custom {
required_successes: vec![SAMPLING_REQUIRED_SUCCESSES],
},
Expand Down Expand Up @@ -237,6 +242,13 @@ impl TestRig {
self.send_sync_message(SyncMessage::SampleBlock(block_root, block_slot))
}

/// Forward every `SyncMessage` currently queued in `sync_rx` (the receiver paired with the
/// beacon processor's sender) through `send_sync_message`, stopping as soon as `try_recv`
/// returns an error.
fn drain_sync_rx(&mut self) {
    loop {
        match self.sync_rx.try_recv() {
            Ok(pending_message) => {
                self.send_sync_message(pending_message);
            }
            Err(_) => break,
        }
    }
}

fn rand_block(&mut self) -> SignedBeaconBlock<E> {
self.rand_block_and_blobs(NumBlobs::None).0
}
Expand Down Expand Up @@ -293,6 +305,10 @@ impl TestRig {
self.sync_manager.active_parent_lookups().len()
}

/// Return the `(RangeSyncType, Slot, Slot)` tuple reported by the sync manager's
/// `get_range_sync_chains`.
///
/// Panics if either layer of the returned value fails to unwrap.
fn active_range_sync_chain(&self) -> (RangeSyncType, Slot, Slot) {
    let chains = self.sync_manager.get_range_sync_chains();
    let active_chain = chains.unwrap();
    active_chain.unwrap()
}

fn assert_single_lookups_count(&self, count: usize) {
assert_eq!(
self.active_single_lookups_count(),
Expand Down Expand Up @@ -1696,7 +1712,18 @@ fn test_parent_lookup_too_deep_grow_ancestor() {
)
}

rig.expect_penalty(peer_id, "chain_too_long");
// Should create a new syncing chain
rig.drain_sync_rx();
assert_eq!(
rig.active_range_sync_chain(),
(
RangeSyncType::Head,
Slot::new(0),
Slot::new(PARENT_DEPTH_TOLERANCE as u64 - 1)
)
);
// Should not penalize peer, but network is not clear because of the blocks_by_range requests
rig.expect_no_penalty_for(peer_id);
rig.assert_failed_chain(chain_hash);
}

Expand All @@ -1723,7 +1750,18 @@ fn test_parent_lookup_too_deep_grow_tip() {
);
}

rig.expect_penalty(peer_id, "chain_too_long");
// Should create a new syncing chain
rig.drain_sync_rx();
assert_eq!(
rig.active_range_sync_chain(),
(
RangeSyncType::Head,
Slot::new(0),
Slot::new(PARENT_DEPTH_TOLERANCE as u64 - 2)
)
);
// Should not penalize peer, but network is not clear because of the blocks_by_range requests
rig.expect_no_penalty_for(peer_id);
rig.assert_failed_chain(tip.canonical_root());
}

Expand Down
Loading
Loading