diff --git a/sync/src/synchronizer/block_fetcher.rs b/sync/src/synchronizer/block_fetcher.rs index b4d87cb3f6..2fdebbeed4 100644 --- a/sync/src/synchronizer/block_fetcher.rs +++ b/sync/src/synchronizer/block_fetcher.rs @@ -9,6 +9,7 @@ use ckb_network::PeerIndex; use ckb_shared::block_status::BlockStatus; use ckb_shared::types::{HeaderIndex, HeaderIndexView}; use ckb_systemtime::unix_time_as_millis; +use ckb_types::core::BlockNumber; use ckb_types::packed; use ckb_types::BlockNumberAndHash; use std::cmp::min; @@ -91,7 +92,7 @@ impl BlockFetcher { Some(last_common) } - pub fn fetch(self) -> Option>> { + pub fn fetch(self, fetch_end: BlockNumber) -> Option>> { let _trace_timecost: Option = { ckb_metrics::handle().map(|handle| handle.ckb_sync_block_fetch_duration.start_timer()) }; @@ -186,7 +187,10 @@ impl BlockFetcher { IBDState::Out => last_common.number() + 1, } }; - let mut end = min(best_known.number(), start + BLOCK_DOWNLOAD_WINDOW); + let mut end = min( + fetch_end, + min(best_known.number(), start + BLOCK_DOWNLOAD_WINDOW), + ); let n_fetch = min( end.saturating_sub(start) as usize + 1, state.read_inflight_blocks().peer_can_fetch_count(self.peer), diff --git a/sync/src/synchronizer/mod.rs b/sync/src/synchronizer/mod.rs index 83aa1213c3..b156478fee 100644 --- a/sync/src/synchronizer/mod.rs +++ b/sync/src/synchronizer/mod.rs @@ -67,6 +67,7 @@ const NOT_IBD_BLOCK_FETCH_INTERVAL: Duration = Duration::from_millis(200); #[derive(Copy, Clone)] enum CanStart { + FetchToTarget(BlockNumber), Ready, MinWorkNotReach, AssumeValidNotFound, @@ -90,25 +91,34 @@ impl BlockFetchCMD { fn process_fetch_cmd(&mut self, cmd: FetchCMD) { let FetchCMD { peers, ibd_state }: FetchCMD = cmd; - match self.can_start() { - CanStart::Ready => { - for peer in peers { - if ckb_stop_handler::has_received_stop_signal() { - return; - } + let fetch_blocks_fn = |cmd: &mut BlockFetchCMD, assume_target: BlockNumber| { + for peer in peers { + if ckb_stop_handler::has_received_stop_signal() { + return; + } - if let Some(fetch) = - BlockFetcher::new(Arc::clone(&self.sync_shared), peer, ibd_state).fetch() - { - for item in fetch { - if ckb_stop_handler::has_received_stop_signal() { - return; - } - BlockFetchCMD::send_getblocks(item, &self.p2p_control, peer); + let mut fetch_end: BlockNumber = u64::MAX; + if assume_target != 0 { + fetch_end = assume_target + } + + if let Some(fetch) = + BlockFetcher::new(Arc::clone(&cmd.sync_shared), peer, ibd_state) + .fetch(fetch_end) + { + for item in fetch { + if ckb_stop_handler::has_received_stop_signal() { + return; } + BlockFetchCMD::send_getblocks(item, &cmd.p2p_control, peer); } } } + }; + + match self.can_start() { + CanStart::FetchToTarget(assume_target) => fetch_blocks_fn(self, assume_target), + CanStart::Ready => fetch_blocks_fn(self, BlockNumber::MAX), CanStart::MinWorkNotReach => { let best_known = self.sync_shared.state().shared_best_header_ref(); let number = best_known.number(); @@ -129,8 +139,9 @@ impl BlockFetchCMD { let best_known = state.shared_best_header_ref(); let number = best_known.number(); let assume_valid_target: Byte32 = shared - .assume_valid_target() + .assume_valid_targets() .as_ref() + .and_then(|targets| targets.first()) .map(Pack::pack) .expect("assume valid target must exist"); @@ -234,15 +245,28 @@ impl BlockFetchCMD { }; let assume_valid_target_find = |flag: &mut CanStart| { - let mut assume_valid_target = shared.assume_valid_target(); - if let Some(ref target) = *assume_valid_target { - match shared.header_map().get(&target.pack()) { + let mut assume_valid_targets = shared.assume_valid_targets(); + if let Some(ref targets) = *assume_valid_targets { + if targets.is_empty() { + assume_valid_targets.take(); + *flag = CanStart::Ready; + return; + } + let first_target = targets + .first() + .expect("has checked targets is not empty, assume valid target must exist"); + match shared.header_map().get(&first_target.pack()) { Some(header) => { - *flag = CanStart::Ready; - info!("assume valid target found in header_map; CKB will start fetch blocks now"); + if matches!(*flag, CanStart::FetchToTarget(fetch_target) if fetch_target == header.number()) + { + // BlockFetchCMD has set the fetch target, no need to set it again + } else { + *flag = CanStart::FetchToTarget(header.number()); + info!("assume valid target found in header_map; CKB will start fetch blocks to {:?} now", header.number_and_hash()); + } // Blocks that are no longer in the scope of ibd must be forced to verify if unix_time_as_millis().saturating_sub(header.timestamp()) < MAX_TIP_AGE { - assume_valid_target.take(); + assume_valid_targets.take(); warn!("the duration gap between 'assume valid target' and 'now' is less than 24h; CKB will ignore the specified assume valid target and do full verification from now on"); } } @@ -254,7 +278,7 @@ impl BlockFetchCMD { { warn!("the duration gap between 'shared_best_header' and 'now' is less than 24h, but CKB haven't found the assume valid target in header_map; CKB will ignore the specified assume valid target and do full verification from now on"); *flag = CanStart::Ready; - assume_valid_target.take(); + assume_valid_targets.take(); } } } @@ -264,6 +288,10 @@ impl BlockFetchCMD { }; match self.can_start { + CanStart::FetchToTarget(_) => { + assume_valid_target_find(&mut self.can_start); + self.can_start + } CanStart::Ready => self.can_start, CanStart::MinWorkNotReach => { min_work_reach(&mut self.can_start); @@ -453,7 +481,7 @@ impl Synchronizer { peer: PeerIndex, ibd: IBDState, ) -> Option>> { - BlockFetcher::new(Arc::clone(&self.shared), peer, ibd).fetch() + BlockFetcher::new(Arc::clone(&self.shared), peer, ibd).fetch(BlockNumber::MAX) } pub(crate) fn on_connected(&self, nc: &dyn CKBProtocolContext, peer: PeerIndex) {