fix: limit in progress catch up syncs to 2 (#158)
stringhandler authored Nov 14, 2024
1 parent d7c2196 commit 6bfdb5d
Showing 5 changed files with 167 additions and 163 deletions.
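The heart of the change: catch-up syncs are now gated per algorithm by a tokio Semaphore sized from the new num_concurrent_syncs setting (default 2). A sync may only start after acquiring an owned permit, and that permit is parked in a per-peer map alongside the OutboundRequestId until the matching response or outbound failure arrives; removing the entry drops the permit and frees the slot. Below is a minimal sketch of that pattern, assuming tokio's sync primitives and hypothetical PeerId/RequestId type aliases standing in for the libp2p types; it is an illustration, not the committed code.

use std::{collections::HashMap, sync::Arc};
use tokio::sync::{OwnedSemaphorePermit, Semaphore};

// Hypothetical stand-ins for libp2p's PeerId and OutboundRequestId.
type PeerId = u64;
type RequestId = u64;

struct SyncLimiter {
    semaphore: Arc<Semaphore>,
    // Holding the OwnedSemaphorePermit in the map keeps its slot occupied
    // until the entry is removed and the permit dropped.
    in_progress: HashMap<PeerId, (RequestId, OwnedSemaphorePermit)>,
}

impl SyncLimiter {
    fn new(max_concurrent: usize) -> Self {
        Self {
            semaphore: Arc::new(Semaphore::new(max_concurrent)),
            in_progress: HashMap::new(),
        }
    }

    // Returns false if this peer already has a sync in flight or all
    // permits are taken; mirrors the try_acquire_owned gate in the diff.
    fn try_start(&mut self, peer: PeerId, request_id: RequestId) -> bool {
        if self.in_progress.contains_key(&peer) {
            return false;
        }
        match self.semaphore.clone().try_acquire_owned() {
            Ok(permit) => {
                self.in_progress.insert(peer, (request_id, permit));
                true
            },
            Err(_) => false, // no free slot right now
        }
    }

    // Called when the response, or an outbound failure, for this peer's
    // request arrives; dropping the permit frees a slot.
    fn finish(&mut self, peer: &PeerId) {
        self.in_progress.remove(peer);
    }
}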
src/server/p2p/messages.rs (2 changes: 1 addition & 1 deletion)

@@ -149,7 +149,7 @@ impl CatchUpSyncRequest {

#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct CatchUpSyncResponse {
- version: u64,
+ pub version: u64,
peer_id: PeerId,
algo: u64,
blocks: Vec<P2Block>,
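Note: version was previously private, so a requester could not inspect it; making it pub enables the protocol-version check added to handle_catch_up_sync_response in network.rs below.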
src/server/p2p/network.rs (215 changes: 150 additions & 65 deletions)

@@ -37,7 +37,7 @@ use libp2p::{
noise,
ping,
relay,
- request_response::{self, cbor, OutboundFailure, ResponseChannel},
+ request_response::{self, cbor, OutboundFailure, OutboundRequestId, ResponseChannel},
swarm::{
behaviour::toggle::Toggle,
dial_opts::{DialOpts, PeerCondition},
@@ -74,7 +74,9 @@ use tokio::{
broadcast::{self, error::RecvError},
mpsc::{self, Sender},
oneshot,
+ OwnedSemaphorePermit,
RwLock,
+ Semaphore,
},
time::MissedTickBehavior,
};
@@ -185,6 +187,7 @@ pub(crate) struct Config {
pub peer_list_folder: PathBuf,
pub sha3x_enabled: bool,
pub randomx_enabled: bool,
+ pub num_concurrent_syncs: usize,
}

impl Default for Config {
@@ -210,6 +213,7 @@ impl Default for Config {
sync_job_enabled: true,
sha3x_enabled: true,
randomx_enabled: true,
+ num_concurrent_syncs: 2,
}
}
}
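A sketch of overriding the new knob, assuming Rust's struct-update syntax against the Default impl above (the field name is from the diff; the value 4 is illustrative):

let config = Config {
    num_concurrent_syncs: 4,
    ..Config::default()
};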
@@ -318,7 +322,6 @@ enum InnerRequest {
ResetCatchUpAttempts(PeerId),
CatchUpSyncRequest((ResponseChannel<CatchUpSyncResponse>, CatchUpSync)),
PerformCatchUpSync(PerformCatchUpSync),
- SendCatchUpSyncRequest(SendCatchUpSyncRequest),
}

/// Service is the implementation that holds every peer-to-peer related logic
@@ -343,10 +346,13 @@ where S: ShareChain
inner_request_tx: mpsc::Sender<InnerRequest>,
inner_request_rx: mpsc::Receiver<InnerRequest>,

start_time: Instant,
relay_store: Arc<RwLock<RelayStore>>,
are_we_synced_with_p2pool: Arc<AtomicBool>,
stats_broadcast_client: StatsBroadcastClient,
+ randomx_sync_semaphore: Arc<Semaphore>,
+ sha3x_sync_semaphore: Arc<Semaphore>,
+ randomx_in_progress_syncs: HashMap<PeerId, (OutboundRequestId, OwnedSemaphorePermit)>,
+ sha3x_in_progress_syncs: HashMap<PeerId, (OutboundRequestId, OwnedSemaphorePermit)>,
}

impl<S> Service<S>
@@ -387,9 +393,12 @@ where S: ShareChain
query_tx,
query_rx,
relay_store: Arc::new(RwLock::new(RelayStore::default())),
start_time: Instant::now(),
are_we_synced_with_p2pool,
stats_broadcast_client,
+ randomx_sync_semaphore: Arc::new(Semaphore::new(config.p2p_service.num_concurrent_syncs)),
+ sha3x_sync_semaphore: Arc::new(Semaphore::new(config.p2p_service.num_concurrent_syncs)),
+ randomx_in_progress_syncs: HashMap::new(),
+ sha3x_in_progress_syncs: HashMap::new(),
})
}

@@ -1233,16 +1242,69 @@ where S: ShareChain
} => {
self.handle_catch_up_sync_request(channel, request).await;
},
- request_response::Message::Response {
- request_id: _request_id,
- response,
- } => {
+ request_response::Message::Response { request_id, response } => {
+ let should_remove: Vec<PeerId> = self
+ .randomx_in_progress_syncs
+ .iter()
+ .filter_map(|(peer, r)| if r.0 == request_id { Some(peer.clone()) } else { None })
+ .collect();
+ for peer in should_remove {
+ if let Some((r, permit)) = self.randomx_in_progress_syncs.remove(&peer) {
+ // Probably don't need to do this
+ drop(permit);
+ }
+ }
+
+ let should_remove: Vec<PeerId> = self
+ .sha3x_in_progress_syncs
+ .iter()
+ .filter_map(|(peer, r)| if r.0 == request_id { Some(peer.clone()) } else { None })
+ .collect();
+ for peer in should_remove {
+ if let Some((r, permit)) = self.sha3x_in_progress_syncs.remove(&peer) {
+ // Probably don't need to do this
+ drop(permit);
+ }
+ }
+
self.handle_catch_up_sync_response(response).await;
},
},
- request_response::Event::OutboundFailure { peer, error, .. } => {
+ request_response::Event::OutboundFailure {
+ peer,
+ error,
+ request_id,
+ } => {
// Peers can be offline
debug!(target: LOG_TARGET, squad = &self.config.squad; "REQ-RES outbound failure: {peer:?} -> {error:?}");

+ let should_remove = self
+ .randomx_in_progress_syncs
+ .get(&peer)
+ .map(|r| r.0 == request_id)
+ .unwrap_or(false);
+ if should_remove {
+ if let Some((r, permit)) = self.randomx_in_progress_syncs.remove(&peer) {
+ // Probably don't need to do this
+ info!(target: SYNC_REQUEST_LOG_TARGET, "Removing randomx_in_progress_syncs: {peer:?} -> {error:?}");
+ drop(permit);
+ }
+ }
+
+ let should_remove = self
+ .sha3x_in_progress_syncs
+ .get(&peer)
+ .map(|r| r.0 == request_id)
+ .unwrap_or(false);
+ if should_remove {
+ if let Some((r, permit)) = self.sha3x_in_progress_syncs.remove(&peer) {
+ info!(target: SYNC_REQUEST_LOG_TARGET, "Removing sha3x_in_progress_syncs: {peer:?} -> {error:?}");
+
+ // Probably don't need to do this
+ drop(permit);
+ }
+ }

let mut should_grey_list = true;
match error {
OutboundFailure::DialFailure => {
@@ -1354,6 +1416,10 @@

async fn handle_catch_up_sync_response(&mut self, response: CatchUpSyncResponse) {
debug!(target: MESSAGE_LOGGING_LOG_TARGET, "Catch up sync response: {response:?}");
+ if response.version != PROTOCOL_VERSION {
+ trace!(target: LOG_TARGET, squad = &self.config.squad; "Peer {} has an outdated version, skipping", response.peer_id());
+ return;
+ }
let peer = *response.peer_id();

let timer = Instant::now();
@@ -1493,64 +1559,84 @@
last_block_from_them,
their_height,
} = perform_catch_up_sync;
- let share_chain = match algo {
- PowAlgorithm::RandomX => self.share_chain_random_x.clone(),
- PowAlgorithm::Sha3x => self.share_chain_sha3x.clone(),

+ // First check if we have a sync in progress for this peer.

+ let (share_chain, semaphore, in_progress_syncs) = match algo {
+ PowAlgorithm::RandomX => (
+ self.share_chain_random_x.clone(),
+ self.randomx_sync_semaphore.clone(),
+ &mut self.randomx_in_progress_syncs,
+ ),
+ PowAlgorithm::Sha3x => (
+ self.share_chain_sha3x.clone(),
+ self.sha3x_sync_semaphore.clone(),
+ &mut self.sha3x_in_progress_syncs,
+ ),
};

- if !self.network_peer_store.is_whitelisted(&peer) {
- warn!(target: SYNC_REQUEST_LOG_TARGET, "Peer is not whitelisted, will not try to catch up sync");
+ if in_progress_syncs.contains_key(&peer) {
+ warn!(target: SYNC_REQUEST_LOG_TARGET, "Already in progress with sync from {}", peer);
return Ok(());
}

+ // if !self.network_peer_store.is_whitelisted(&peer) {
+ // warn!(target: SYNC_REQUEST_LOG_TARGET, "Peer is not whitelisted, will not try to catch up sync");
+ // return Ok(());
+ // }

+ let permit = match semaphore.try_acquire_owned() {
+ Ok(permit) => permit,
+ Err(_) => {
+ warn!(target: SYNC_REQUEST_LOG_TARGET, "Could not acquire semaphore for catch up sync");
+ return Ok(());
+ },
+ };

// Only allow follow on catch up syncs if we've tried to sync from them recently
- if last_block_from_them.is_none() &&
- self.network_peer_store
- .time_since_last_sync_attempt(&peer, algo)
- .map(|d| d < CATCHUP_SYNC_TIMEOUT)
- .unwrap_or(false)
- {
- warn!(target: SYNC_REQUEST_LOG_TARGET, "Already in progress with sync from {}", peer);
- return Ok(());
- }
+ // if last_block_from_them.is_none() &&
+ // self.network_peer_store
+ // .time_since_last_sync_attempt(&peer, algo)
+ // .map(|d| d < CATCHUP_SYNC_TIMEOUT)
+ // .unwrap_or(false)
+ // {
+ // warn!(target: SYNC_REQUEST_LOG_TARGET, "Already in progress with sync from {}", peer);
+ // return Ok(());
+ // }

- let tx = self.inner_request_tx.clone();
- tokio::spawn(async move {
- let mut i_have_blocks = Vec::with_capacity(CATCH_UP_SYNC_BLOCKS_IN_I_HAVE);
- if let Ok(Some(tip)) = share_chain.get_tip().await {
- let tip_height = tip.0;
- let tip_hash = tip.1;
- let mut height = tip_height;
- let mut hash = tip_hash;
- for _ in 0..CATCH_UP_SYNC_BLOCKS_IN_I_HAVE {
- if let Ok(blocks) = share_chain.get_blocks(&[(height, hash)]).await {
- if let Some(block) = blocks.first() {
- i_have_blocks.push((height, block.hash));
- if height == 0 {
- break;
- }
- height = block.height - 1;
- hash = block.hash;
- } else {
+ let mut i_have_blocks = Vec::with_capacity(CATCH_UP_SYNC_BLOCKS_IN_I_HAVE);
+ if let Ok(Some(tip)) = share_chain.get_tip().await {
+ let tip_height = tip.0;
+ let tip_hash = tip.1;
+ let mut height = tip_height;
+ let mut hash = tip_hash;
+ for _ in 0..CATCH_UP_SYNC_BLOCKS_IN_I_HAVE {
+ if let Ok(blocks) = share_chain.get_blocks(&[(height, hash)]).await {
+ if let Some(block) = blocks.first() {
+ i_have_blocks.push((height, block.hash));
+ if height == 0 {
+ break;
+ }
+ height = block.height - 1;
+ hash = block.hash;
+ } else {
+ break;
+ }
+ }
+ }
+ }

- info!(target: SYNC_REQUEST_LOG_TARGET, "[{:?}] Sending catch up sync to {} for blocks {}, last block received {}. Their height:{}",
+ info!(target: SYNC_REQUEST_LOG_TARGET, "[{:?}] Sending catch up sync to {} for blocks {}, last block received {}. Their height:{}",
algo,
peer, i_have_blocks.iter().map(|a| a.0.to_string()).join(", "), last_block_from_them.map(|a| a.0.to_string()).unwrap_or_else(|| "None".to_string()), their_height);
- let send_catch_up_sync_request = SendCatchUpSyncRequest {
- peer,
- algo,
- i_have_blocks,
- last_block_received: last_block_from_them,
- };
-
- let _ = tx
- .send(InnerRequest::SendCatchUpSyncRequest(send_catch_up_sync_request))
- .await;
- });
+ let outbound_request_id = self.swarm.behaviour_mut().catch_up_sync.send_request(
+ &peer,
+ CatchUpSyncRequest::new(algo, i_have_blocks, last_block_from_them),
+ );
+ in_progress_syncs.insert(peer, (outbound_request_id, permit));
+ self.network_peer_store.update_last_sync_attempt(peer, algo);
Ok(())
}

@@ -1767,19 +1853,6 @@
error!(target: LOG_TARGET, squad = &self.config.squad; "Failed to perform catch up sync: {e:?}");
}
},
- InnerRequest::SendCatchUpSyncRequest(send_catch_up_sync_request) => {
- let SendCatchUpSyncRequest {
- peer,
- algo,
- i_have_blocks,
- last_block_received,
- } = send_catch_up_sync_request;
- self.swarm
- .behaviour_mut()
- .catch_up_sync
- .send_request(&peer, CatchUpSyncRequest::new(algo, i_have_blocks, last_block_received));
- self.network_peer_store.update_last_sync_attempt(peer, algo);
- },
}
}

@@ -1990,16 +2063,27 @@
}

for record in best_peers {
- let (their_height, their_pow) = match algo {
+ let (their_height, their_pow, last_sync_attempt) = match algo {
PowAlgorithm::RandomX => (
record.peer_info.current_random_x_height,
AccumulatedDifficulty::from_u128(record.peer_info.current_random_x_pow).unwrap_or_default(),
+ record.last_rx_sync_attempt,
),
PowAlgorithm::Sha3x => (
record.peer_info.current_sha3x_height,
AccumulatedDifficulty::from_u128(record.peer_info.current_sha3x_pow).unwrap_or_default(),
+ record.last_sha3x_sync_attempt,
),
};

+ if last_sync_attempt
+ .map(|d| d.elapsed() < CATCHUP_SYNC_TIMEOUT)
+ .unwrap_or(false)
+ {
+ info!(target: LOG_TARGET, squad = &self.config.squad; "Already in progress with sync from {}", record.peer_id);
+ continue;
+ }

if their_pow > our_pow {
peer_with_better_pow = true;
info!(target: LOG_TARGET, squad = &self.config.squad; "[{:?}] Trying to perform catchup sync from peer: {} with height {}", algo,record.peer_id, their_height);
Expand All @@ -2010,6 +2094,7 @@ where S: ShareChain
last_block_from_them: None,
their_height,
};

let _ = self
.inner_request_tx
.send(InnerRequest::PerformCatchUpSync(perform_catch_up_sync))
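Both cleanup sites above (the Response arm and the OutboundFailure arm) do the same thing: find the map entry whose stored request id matches, remove it, and drop the permit so the semaphore slot is released. As a sketch only, not the committed code, the same cleanup can be written with HashMap::retain, since retain drops evicted values and dropping an OwnedSemaphorePermit releases its slot; PeerId and RequestId are the same hypothetical stand-ins as in the earlier sketch:

use std::collections::HashMap;
use tokio::sync::OwnedSemaphorePermit;

type PeerId = u64; // hypothetical stand-in
type RequestId = u64; // hypothetical stand-in

// Remove every in-progress entry recorded under `request_id`; the permits
// held in the evicted values are dropped, releasing their semaphore slots.
fn release_for_request(
    in_progress: &mut HashMap<PeerId, (RequestId, OwnedSemaphorePermit)>,
    request_id: RequestId,
) {
    in_progress.retain(|_, (id, _)| *id != request_id);
}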
src/server/p2p/peer_store.rs (31 changes: 14 additions & 17 deletions)

@@ -249,8 +249,8 @@ impl PeerStore {
match algo {
PowAlgorithm::RandomX => peers.sort_by(|a, b| {
b.peer_info
- .current_random_x_height
- .cmp(&a.peer_info.current_random_x_height)
+ .current_random_x_pow
+ .cmp(&a.peer_info.current_random_x_pow)
.then(
b.last_new_tip_notify
.as_ref()
Expand All @@ -266,21 +266,18 @@ impl PeerStore {
}),
PowAlgorithm::Sha3x => {
peers.sort_by(|a, b| {
- b.peer_info
- .current_sha3x_height
- .cmp(&a.peer_info.current_sha3x_height)
- .then(
- b.last_new_tip_notify
- .as_ref()
- .map(|n| n.timestamp)
- .unwrap_or(b.peer_info.timestamp)
- .cmp(
- &a.last_new_tip_notify
- .as_ref()
- .map(|n| n.timestamp)
- .unwrap_or(a.peer_info.timestamp),
- ),
- )
+ b.peer_info.current_sha3x_pow.cmp(&a.peer_info.current_sha3x_pow).then(
+ b.last_new_tip_notify
+ .as_ref()
+ .map(|n| n.timestamp)
+ .unwrap_or(b.peer_info.timestamp)
+ .cmp(
+ &a.last_new_tip_notify
+ .as_ref()
+ .map(|n| n.timestamp)
+ .unwrap_or(a.peer_info.timestamp),
+ ),
+ )
});
},
}
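The sort change in peer selection: peers are now ranked by accumulated proof-of-work (current_*_pow) rather than by chain height, so a shorter chain carrying more total work outranks a longer but lighter one; ties still fall back to the most recent new-tip notification. A minimal sketch of that ordering over a hypothetical flattened record (the real code reads nested peer_info fields):

// Hypothetical flattened peer record for illustration.
struct PeerRecord {
    pow: u128,            // accumulated proof-of-work
    last_notify_ts: u64,  // timestamp of the last new-tip notification
}

// Highest accumulated work first; break ties by the most recent notification.
fn sort_best_first(peers: &mut [PeerRecord]) {
    peers.sort_by(|a, b| b.pow.cmp(&a.pow).then(b.last_notify_ts.cmp(&a.last_notify_ts)));
}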
(Diffs for the remaining 2 of the 5 changed files are not shown.)
