feat: resume sync attempt (#212)
Description
---
Attempt to resume sync with another peer after a sync attempt with one peer has failed.
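The mechanism behind this is a small piece of per-algorithm resume state: the height and hash of the last block we asked a peer for. A minimal sketch of the intended lifecycle (illustrative names only, with `[u8; 32]` standing in for `FixedHash`; none of this is the actual code in network.rs):

```rust
/// Illustrative sketch of the `*_last_sync_requested_block` fields added below.
struct ResumeState {
    /// (height, block hash) of the last block requested from a peer, if any.
    last_sync_requested_block: Option<(u64, [u8; 32])>,
}

impl ResumeState {
    /// When a catch-up sync request goes out, remember what we asked for.
    fn on_request_sent(&mut self, last_block: Option<(u64, [u8; 32])>) {
        self.last_sync_requested_block = last_block;
    }

    /// When a sync response for this algorithm arrives, the marker has served
    /// its purpose and is cleared.
    fn on_response_received(&mut self) {
        self.last_sync_requested_block = None;
    }

    /// When starting a new catch-up attempt, consume the marker so a failed
    /// previous attempt can be resumed instead of starting from scratch.
    fn take_last_progress(&mut self) -> Option<(u64, [u8; 32])> {
        self.last_sync_requested_block.take()
    }
}
```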
SWvheerden authored Dec 6, 2024
1 parent 66b251a commit 17bbedf
Showing 1 changed file with 51 additions and 27 deletions.
78 changes: 51 additions & 27 deletions src/server/p2p/network.rs
@@ -310,6 +310,8 @@ where S: ShareChain
stats_broadcast_client: StatsBroadcastClient,
randomx_sync_semaphore: Arc<Semaphore>,
sha3x_sync_semaphore: Arc<Semaphore>,
randomx_last_sync_requested_block: Option<(u64, FixedHash)>,
sha3x_last_sync_requested_block: Option<(u64, FixedHash)>,
randomx_in_progress_syncs: HashMap<PeerId, (OutboundRequestId, OwnedSemaphorePermit)>,
sha3x_in_progress_syncs: HashMap<PeerId, (OutboundRequestId, OwnedSemaphorePermit)>,
}
@@ -357,6 +359,8 @@ where S: ShareChain
stats_broadcast_client,
randomx_sync_semaphore: Arc::new(Semaphore::new(config.p2p_service.num_concurrent_syncs)),
sha3x_sync_semaphore: Arc::new(Semaphore::new(config.p2p_service.num_concurrent_syncs)),
randomx_last_sync_requested_block: None,
sha3x_last_sync_requested_block: None,
randomx_in_progress_syncs: HashMap::new(),
sha3x_in_progress_syncs: HashMap::new(),
})
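For context, the constructor above also shows how concurrent catch-up syncs are bounded: a `tokio::sync::Semaphore` sized by `num_concurrent_syncs`, whose `OwnedSemaphorePermit` is stored next to each in-progress sync so the slot is released when the entry is dropped. A rough standalone sketch of that pattern, with an illustrative `SyncLimiter` type and a `u64` standing in for `PeerId`:

```rust
use std::{collections::HashMap, sync::Arc};
use tokio::sync::{OwnedSemaphorePermit, Semaphore};

/// Illustrative only: cap the number of concurrent catch-up syncs.
struct SyncLimiter {
    semaphore: Arc<Semaphore>,
    /// Keeping the permit in the map holds the slot for as long as the sync runs.
    in_progress: HashMap<u64, OwnedSemaphorePermit>,
}

impl SyncLimiter {
    fn new(num_concurrent_syncs: usize) -> Self {
        Self {
            semaphore: Arc::new(Semaphore::new(num_concurrent_syncs)),
            in_progress: HashMap::new(),
        }
    }

    /// Returns false when all sync slots are taken.
    fn try_start(&mut self, peer: u64) -> bool {
        match self.semaphore.clone().try_acquire_owned() {
            Ok(permit) => {
                self.in_progress.insert(peer, permit);
                true
            },
            Err(_) => false,
        }
    }

    /// Removing the entry drops the permit and releases the slot.
    fn finish(&mut self, peer: u64) {
        self.in_progress.remove(&peer);
    }
}
```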
@@ -1042,7 +1046,7 @@ where S: ShareChain

// If it's not from new_block_notify, ask only the peer that sent the blocks
if !is_from_new_block_notify {
info!(target: SYNC_REQUEST_LOG_TARGET, squad = &self.config.squad; "Sending sync request direct to peer {} for blocks {:?} because we did not receive it from new tip notify", peer, missing_parents.iter().map(|(height, hash)|format!("{}({:x}{:x}{:x}{:x})",height, hash[0], hash[1], hash[2], hash[3])).collect::<Vec<String>>());
info!(target: SYNC_REQUEST_LOG_TARGET, squad = &self.config.squad; "Sending sync request direct to peer {} for blocks {:?} because we did not receive it from sync", peer, missing_parents.iter().map(|(height, hash)|format!("{}({:x}{:x}{:x}{:x})",height, hash[0], hash[1], hash[2], hash[3])).collect::<Vec<String>>());

let _outbound_id = self
.swarm
@@ -1520,8 +1524,14 @@ where S: ShareChain
let timer = Instant::now();
let algo = response.algo();
let share_chain = match algo {
PowAlgorithm::RandomX => self.share_chain_random_x.clone(),
PowAlgorithm::Sha3x => self.share_chain_sha3x.clone(),
PowAlgorithm::RandomX => {
self.randomx_last_sync_requested_block = None;
self.share_chain_random_x.clone()
},
PowAlgorithm::Sha3x => {
self.sha3x_last_sync_requested_block = None;
self.share_chain_sha3x.clone()
},
};
let their_tip_hash = *response.tip_hash();
let their_height = response.tip_height();
@@ -1670,16 +1680,18 @@ where S: ShareChain

// First check if we have a sync in progress for this peer.

let (share_chain, semaphore, in_progress_syncs) = match algo {
let (share_chain, semaphore, in_progress_syncs, last_progress) = match algo {
PowAlgorithm::RandomX => (
self.share_chain_random_x.clone(),
self.randomx_sync_semaphore.clone(),
&mut self.randomx_in_progress_syncs,
self.randomx_last_sync_requested_block.take(),
),
PowAlgorithm::Sha3x => (
self.share_chain_sha3x.clone(),
self.sha3x_sync_semaphore.clone(),
&mut self.sha3x_in_progress_syncs,
self.sha3x_last_sync_requested_block.take(),
),
};
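Note that `last_progress` is read with `Option::take()`, which returns the stored value and leaves `None` in its place, so a resume marker is consumed only once; it is re-stored further down only when a new request is actually sent. A tiny illustration of that behaviour (plain `u8` as a hash stand-in):

```rust
fn main() {
    // Pretend this is the per-algorithm resume marker: (height, hash stand-in).
    let mut last_sync_requested_block: Option<(u64, u8)> = Some((42, 0xab));

    // `take()` hands the value to the new sync attempt...
    let last_progress = last_sync_requested_block.take();
    assert_eq!(last_progress, Some((42, 0xab)));

    // ...and clears the field, so a later attempt does not reuse a stale marker
    // unless the request path explicitly stores one again.
    assert_eq!(last_sync_requested_block, None);
}
```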

@@ -1688,11 +1700,6 @@ where S: ShareChain
return Ok(());
}

// if !self.network_peer_store.is_whitelisted(&peer) {
// warn!(target: SYNC_REQUEST_LOG_TARGET, "Peer is not whitelisted, will not try to catch up sync");
// return Ok(());
// }

if permit.is_none() {
permit = match semaphore.try_acquire_owned() {
Ok(permit) => Some(permit),
@@ -1705,30 +1712,47 @@ where S: ShareChain

let permit = permit.unwrap();

// Only allow follow on catch up syncs if we've tried to sync from them recently
// if last_block_from_them.is_none() &&
// self.network_peer_store
// .time_since_last_sync_attempt(&peer, algo)
// .map(|d| d < CATCHUP_SYNC_TIMEOUT)
// .unwrap_or(false)
// {
// warn!(target: SYNC_REQUEST_LOG_TARGET, "Already in progress with sync from {}", peer);
// return Ok(());
// }

// let tx = self.inner_request_tx.clone();
let i_have_blocks = if last_block_from_them.is_none() {
share_chain
.create_catchup_sync_blocks(CATCH_UP_SYNC_BLOCKS_IN_I_HAVE)
.await
} else {
vec![]
let (i_have_blocks, last_block_from_them) = match (last_block_from_them, last_progress) {
(None, Some(last_progress)) => {
// This is most likely a new catch-up sync request after a previous attempt failed, so we ask with both
// the I-have blocks and the last requested block
let i_have = share_chain
.create_catchup_sync_blocks(CATCH_UP_SYNC_BLOCKS_IN_I_HAVE)
.await;
(i_have, Some(last_progress))
},
(None, None) => {
// new catchup sync request, no previous attempt
let i_have = share_chain
.create_catchup_sync_blocks(CATCH_UP_SYNC_BLOCKS_IN_I_HAVE)
.await;
(i_have, None)
},
(Some(last_block_from_them), None) => {
// This is a continuation of catch-up sync; we don't need to return any I-have blocks
(vec![], Some(last_block_from_them))
},
(Some(last_block_from_them), Some(_last_progress)) => {
// This should not happen, but as a fail-safe we return the last block we requested, not
// last_progress
(vec![], Some(last_block_from_them))
},
};

info!(target: SYNC_REQUEST_LOG_TARGET, "[{:?}] Sending catch up sync to {} for blocks {}, last block received {}. Their height:{}",
algo,
peer, i_have_blocks.iter().map(|a| a.0.to_string()).join(", "), last_block_from_them.map(|a| a.0.to_string()).unwrap_or_else(|| "None".to_string()), their_height);

// Save the last block requested in this sync attempt
match algo {
PowAlgorithm::RandomX => {
self.randomx_last_sync_requested_block = last_block_from_them;
},
PowAlgorithm::Sha3x => {
self.sha3x_last_sync_requested_block = last_block_from_them;
},
}

let outbound_request_id = self.swarm.behaviour_mut().catch_up_sync.send_request(
&peer,
CatchUpSyncRequest::new(algo, i_have_blocks, last_block_from_them),
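The four-way match above boils down to a simple rule: advertise I-have blocks whenever the request is fresh (no `last_block_from_them`), and include a resume point if either the caller supplied one or a previous failed attempt left a marker behind. A self-contained restatement of that decision, assuming a hypothetical `decide_catchup_payload` helper that does not exist in network.rs, with I-have blocks reduced to a `Vec<u64>` of heights and a `u8` hash stand-in (the real code spells out the should-not-happen fourth arm separately):

```rust
// (height, hash stand-in); the real code uses (u64, FixedHash).
type Block = (u64, u8);

/// Hypothetical helper: which "I have" blocks and which resume block to put
/// into the catch-up sync request.
fn decide_catchup_payload(
    last_block_from_them: Option<Block>,
    last_progress: Option<Block>,
    i_have_candidates: Vec<u64>,
) -> (Vec<u64>, Option<Block>) {
    match (last_block_from_them, last_progress) {
        // Fresh attempt after a failed one: advertise what we have AND resume.
        (None, Some(progress)) => (i_have_candidates, Some(progress)),
        // Brand-new attempt: advertise what we have, nothing to resume from.
        (None, None) => (i_have_candidates, None),
        // Continuation of an in-progress sync: the peer already knows what we have.
        (Some(from_them), _) => (vec![], Some(from_them)),
    }
}

fn main() {
    let i_have = vec![100, 99, 98];

    // A previous attempt against another peer failed after requesting height 95.
    assert_eq!(
        decide_catchup_payload(None, Some((95, 0x01)), i_have.clone()),
        (vec![100, 99, 98], Some((95, 0x01)))
    );

    // No history at all: a plain catch-up request.
    assert_eq!(
        decide_catchup_payload(None, None, i_have.clone()),
        (vec![100, 99, 98], None)
    );

    // Continuation of an in-progress sync: no I-have list needed.
    assert_eq!(
        decide_catchup_payload(Some((97, 0x02)), None, i_have),
        (vec![], Some((97, 0x02)))
    );
}
```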
