Commit 9855ac1

Merge branch 'development' of github.com:tari-project/sha-p2pool into development

stringhandler committed Dec 17, 2024
2 parents 859caa8 + a95dd7b commit 9855ac1
Showing 3 changed files with 151 additions and 61 deletions.
197 changes: 143 additions & 54 deletions src/server/p2p/network.rs
@@ -97,7 +97,6 @@ use crate::{
},
sharechain::{
p2block::{P2Block, CURRENT_CHAIN_ID},
p2chain::ChainAddResult,
ShareChain,
},
};
@@ -215,6 +214,7 @@ struct PerformCatchUpSync {
pub peer: PeerId,
pub last_block_from_them: Option<(u64, FixedHash)>,
pub their_height: u64,
// pub their_pow: u128,
pub permit: Option<OwnedSemaphorePermit>,
}

@@ -287,6 +287,11 @@ pub(crate) struct ConnectionCounters {
enum InnerRequest {
DoSyncMissingBlocks(SyncMissingBlocks),
PerformCatchUpSync(PerformCatchUpSync),
AddSyncedBlock {
algo: PowAlgorithm,
block: Arc<P2Block>,
source_peer: PeerId,
},
}

/// Service is the implementation that holds every peer-to-peer related logic
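The key structural change in this file starts with the hunk above: catch-up sync no longer writes blocks to the share chain inside its spawned task; each block is wrapped in the new InnerRequest::AddSyncedBlock message and handed back to the service's main loop, which applies blocks one at a time and so serializes all share-chain mutations. A minimal sketch of that pattern, assuming (as the diff suggests) a tokio unbounded mpsc channel for inner requests; P2Block and PeerId are placeholders for the project's real types:

    use std::sync::Arc;
    use tokio::sync::mpsc;

    #[derive(Debug)]
    struct P2Block { height: u64 }   // placeholder for the project's P2Block
    type PeerId = String;            // placeholder for libp2p::PeerId

    enum InnerRequest {
        AddSyncedBlock { block: Arc<P2Block>, source_peer: PeerId },
        // DoSyncMissingBlocks / PerformCatchUpSync variants elided
    }

    // The spawned catch-up task forwards each block instead of adding it
    // itself; the main loop is the only writer to the share chain.
    fn queue_synced_block(
        tx: &mpsc::UnboundedSender<InnerRequest>,
        block: Arc<P2Block>,
        source_peer: PeerId,
    ) {
        let _unused = tx.send(InnerRequest::AddSyncedBlock { block, source_peer });
    }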
@@ -928,6 +933,7 @@ where S: ShareChain
peer: peer_id,
last_block_from_them: None,
their_height: response.info.current_sha3x_height,
// their_pow: response.info.current_sha3x_pow,
permit: None,
};
let _unused = self
@@ -942,6 +948,7 @@ where S: ShareChain
peer: peer_id,
last_block_from_them: None,
their_height: response.info.current_random_x_height,
// their_pow: response.info.current_random_x_pow,
permit: None,
};
let _unused = self
@@ -1066,7 +1073,7 @@ where S: ShareChain
let SyncMissingBlocks {
peer,
algo,
missing_parents,
mut missing_parents,
is_from_new_block_notify,
depth,
} = sync_share_chain;
@@ -1087,6 +1094,22 @@ where S: ShareChain
return;
}

let share_chain = match algo {
PowAlgorithm::RandomX => self.share_chain_random_x.clone(),
PowAlgorithm::Sha3x => self.share_chain_sha3x.clone(),
};
let blocks_already_received = share_chain.get_blocks(&missing_parents).await;
missing_parents.retain(|(height, hash)| {
!blocks_already_received
.iter()
.any(|b| b.height == *height && b.hash == *hash)
});

if missing_parents.is_empty() {
info!(target: SYNC_REQUEST_LOG_TARGET, squad = &self.config.squad; "All missing blocks have already been received");
return;
}

// If it's not from new_block_notify, ask only the peer that sent the blocks
if !is_from_new_block_notify {
info!(target: SYNC_REQUEST_LOG_TARGET, squad = &self.config.squad; "Sending sync request direct to peer {} for blocks {:?} because we did not receive it from sync", peer, missing_parents.iter().map(|(height, hash)|format!("{}({:x}{:x}{:x}{:x})",height, hash[0], hash[1], hash[2], hash[3])).collect::<Vec<String>>());
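Before fanning out any requests, the new code in this hunk first prunes missing_parents of every entry the local chain can already resolve. A condensed sketch of that dedup step, with simplified (height, hash) pairs standing in for the project's FixedHash-keyed blocks:

    // Drop any (height, hash) pair we already hold locally, so we only ask
    // peers for parents that are genuinely missing.
    fn drop_known_parents(
        missing_parents: &mut Vec<(u64, [u8; 32])>,
        already_received: &[(u64, [u8; 32])],
    ) {
        missing_parents.retain(|(height, hash)| {
            !already_received.iter().any(|(h, hs)| h == height && hs == hash)
        });
    }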
@@ -1583,30 +1606,25 @@ where S: ShareChain

let timer = Instant::now();
let algo = response.algo();
let (share_chain, synced_bool) = match algo {
let share_chain = match algo {
PowAlgorithm::RandomX => {
self.randomx_last_sync_requested_block = None;
(
self.share_chain_random_x.clone(),
self.are_we_synced_with_randomx_p2pool.clone(),
)
self.share_chain_random_x.clone()
},
PowAlgorithm::Sha3x => {
self.sha3x_last_sync_requested_block = None;
(
self.share_chain_sha3x.clone(),
self.are_we_synced_with_sha3x_p2pool.clone(),
)
self.share_chain_sha3x.clone()
},
};
let their_tip_hash = *response.tip_hash();
let their_height = response.tip_height();
let their_pow = response.achieved_pow();
let mut blocks: Vec<_> = response.into_blocks().into_iter().map(Arc::new).collect();
info!(target: SYNC_REQUEST_LOG_TARGET, "Received catch up sync response for chain {} from {} with blocks {}. Their tip: {}:{}", algo, peer, blocks.iter().map(|a| a.height.to_string()).join(", "), their_height, &their_tip_hash.to_hex()[0..8]);
if blocks.is_empty() {
warn!(target: SYNC_REQUEST_LOG_TARGET, "Peer {} sent 0 blocks for catch up sync", peer);
return;
}
info!(target: SYNC_REQUEST_LOG_TARGET, "Received catch up sync response for chain {} from {} with blocks {}. Their tip: {}:{}", algo, peer, blocks.iter().map(|a| a.height.to_string()).join(", "), their_height, &their_tip_hash.to_hex()[0..8]);
let tx = self.inner_request_tx.clone();
let squad = self.config.squad.clone();
let network_peer_store = self.network_peer_store.clone();
@@ -1616,23 +1634,29 @@ where S: ShareChain
tokio::spawn(async move {
blocks.sort_by(|a, b| a.height.cmp(&b.height));
let last_block_from_them = blocks.last().map(|b| (b.height, b.hash));
let mut new_tip = ChainAddResult::default();
let mut blocks_added = Vec::new();
// let mut new_tip = ChainAddResult::default();
// let mut blocks_added = Vec::new();
for b in &blocks {
match share_chain.add_synced_blocks(&[b.clone()]).await {
Ok(result) => {
blocks_added.push(format!("{}({})", b.height, &b.hash.to_hex()[0..8]));
new_tip.combine(result);
},
Err(error) => {
error!(target: SYNC_REQUEST_LOG_TARGET, squad; "Failed to add Catchup synced blocks to share chain: {error:?}");
network_peer_store
.write()
.await
.move_to_grey_list(peer, format!("Block failed validation: {error}"));
return;
},
}
let message = InnerRequest::AddSyncedBlock {
algo,
block: b.clone(),
source_peer: peer,
};
let _unused = tx.send(message);
// match share_chain.add_synced_blocks(&[b.clone()]).await {
// Ok(result) => {
// blocks_added.push(format!("{}({})", b.height, &b.hash.to_hex()[0..8]));
// new_tip.combine(result);
// },
// Err(error) => {
// error!(target: SYNC_REQUEST_LOG_TARGET, squad; "Failed to add Catchup synced blocks to share
// chain: {error:?}"); network_peer_store
// .write()
// .await
// .move_to_grey_list(peer, format!("Block failed validation: {error}"));
// return;
// },
// }
}
{
if let Some(ref last_block) = last_block_from_them {
@@ -1641,21 +1665,21 @@ where S: ShareChain
}
}

info!(target: LOG_TARGET, "[{:?}] Blocks via catchup sync added {:?}", algo, blocks_added);
info!(target: LOG_TARGET, "[{:?}] Blocks via catchup sync result {}", algo, new_tip);
let missing_parents = new_tip.into_missing_parents_vec();
if !missing_parents.is_empty() {
let sync_share_chain = SyncMissingBlocks {
algo,
peer,
missing_parents,
is_from_new_block_notify: false,
depth: 0,
};
let _unused = tx.send(InnerRequest::DoSyncMissingBlocks(sync_share_chain));
}

info!(target: SYNC_REQUEST_LOG_TARGET, squad = &squad; "Synced blocks added to share chain");
info!(target: LOG_TARGET, "[{:?}] Blocks via catchup sync added {:?}", algo, blocks.iter().map(|a| format!("{}({:x}{:x}{:x}{:x})",a.height, a.hash[0], a.hash[1], a.hash[2], a.hash[3])).collect::<Vec<String>>());
// info!(target: LOG_TARGET, "[{:?}] Blocks via catchup sync result {}", algo, new_tip);
// let missing_parents = new_tip.into_missing_parents_vec();
// if !missing_parents.is_empty() {
// let sync_share_chain = SyncMissingBlocks {
// algo,
// peer,
// missing_parents,
// is_from_new_block_notify: false,
// depth: 0,
// };
// let _unused = tx.send(InnerRequest::DoSyncMissingBlocks(sync_share_chain));
// }

// info!(target: SYNC_REQUEST_LOG_TARGET, squad = &squad; "Synced blocks added to share chain");
let our_pow = share_chain.get_total_chain_pow().await;
let mut must_continue_sync = their_pow > our_pow.as_u128();
info!(target: SYNC_REQUEST_LOG_TARGET, "[{:?}] must continue: {}", algo, must_continue_sync);
@@ -1675,21 +1699,13 @@ where S: ShareChain
peer,
last_block_from_them,
their_height,
// their_pow,
permit,
};
let _unused = tx.send(InnerRequest::PerformCatchUpSync(perform_catch_up_sync));
} else {
info!(target: SYNC_REQUEST_LOG_TARGET, "Catch up sync completed for chain {} from {} after {} catchups", algo, peer, num_catchups.map(|s| s.to_string()).unwrap_or_else(|| "None".to_string()));
// this only gets called after sync completes, lets set synced status = true
let (max_known_network_height, max_known_network_pow, peer_with_best) =
peer_store_write_lock.max_known_network_height(algo);

if our_pow.as_u128() >= max_known_network_pow || Some(&peer) == peer_with_best.as_ref() {
info!(target: SYNC_REQUEST_LOG_TARGET, "[{}] our pow is greater than max known network pow, we are now synced", algo);
synced_bool.store(true, std::sync::atomic::Ordering::SeqCst);
} else {
info!(target: SYNC_REQUEST_LOG_TARGET, "[{}] our pow is less than max known network pow, we are not synced, will continue to search for better POW. Best peer is {} at height {}", algo, peer_with_best.map(|p| p.to_base58()).unwrap_or_else(|| "None".to_string()), max_known_network_height);
}

peer_store_write_lock.reset_catch_up_attempts(&peer);
}
if timer.elapsed() > MAX_ACCEPTABLE_P2P_MESSAGE_TIMEOUT {
@@ -1756,6 +1772,7 @@ where S: ShareChain
peer,
last_block_from_them,
their_height,
// their_pow: _,
mut permit,
} = perform_catch_up_sync;

@@ -1775,6 +1792,7 @@ where S: ShareChain
self.sha3x_last_sync_requested_block.take(),
),
};
// let our_pow = share_chain.get_total_chain_pow().await;

if in_progress_syncs.contains_key(&peer) {
warn!(target: SYNC_REQUEST_LOG_TARGET, "Already in progress with sync from {}", peer);
@@ -1789,10 +1807,25 @@ where S: ShareChain
return Ok(());
},
};
} else {
// Check to see if there is a better peer and release the permit if that is the case
// {
// let peer_store_lock = self.network_peer_store.read().await;
// // this only gets called after sync completes, lets set synced status = true
// let (_max_known_network_height, max_known_network_pow, peer_with_best) =
// peer_store_lock.max_known_network_height(algo);

// if let Some(peer_with_best) = peer_with_best {
// if their_pow < max_known_network_pow {
// warn!(target: SYNC_REQUEST_LOG_TARGET, "There is a peer with better POW than {}, so not trying to sync with them. Best peer is currently:{}", peer, peer_with_best);
// return Ok(());
// }
// }
// }
}

let permit = permit.unwrap();

let (mut i_have_blocks, last_block_from_them) = match (last_block_from_them, last_progress) {
(None, Some(last_progress)) => {
// this is most likely a new catchup sync request, while the previous attempt failed so we ask with both
@@ -2053,6 +2086,61 @@ where S: ShareChain
error!(target: LOG_TARGET, squad = &self.config.squad; "Failed to perform catch up sync: {e:?}");
}
},
InnerRequest::AddSyncedBlock {
algo,
block,
source_peer,
} => {
let (share_chain, synced_bool) = match algo {
PowAlgorithm::RandomX => (
self.share_chain_random_x.clone(),
self.are_we_synced_with_randomx_p2pool.clone(),
),
PowAlgorithm::Sha3x => (
self.share_chain_sha3x.clone(),
self.are_we_synced_with_sha3x_p2pool.clone(),
),
};
info!(target: SYNC_REQUEST_LOG_TARGET, "Adding block {}({:x}{:x}{:x}{:x}) to share chain from peer {}", block.height, block.hash[0], block.hash[1], block.hash[2], block.hash[3], source_peer);
match share_chain.add_synced_blocks(&[block]).await {
Ok(result) => {
info!(target: LOG_TARGET, "[{:?}] Blocks via catchup sync result {}", algo, result);
let missing_parents = result.into_missing_parents_vec();
if !missing_parents.is_empty() {
let sync_share_chain = SyncMissingBlocks {
algo,
peer: source_peer,
missing_parents,
is_from_new_block_notify: false,
depth: 0,
};
let tx = self.inner_request_tx.clone();
let _unused = tx.send(InnerRequest::DoSyncMissingBlocks(sync_share_chain));
}
let our_pow = share_chain.get_total_chain_pow().await;
let peer_store_lock = self.network_peer_store.read().await;
// this only gets called after sync completes, lets set synced status = true
let (max_known_network_height, max_known_network_pow, peer_with_best) =
peer_store_lock.max_known_network_height(algo);

if our_pow.as_u128() >= max_known_network_pow {
info!(target: SYNC_REQUEST_LOG_TARGET, "[{}] our pow is greater than max known network pow, we are now synced", algo);
synced_bool.store(true, std::sync::atomic::Ordering::SeqCst);
} else {
info!(target: SYNC_REQUEST_LOG_TARGET, "[{}] our pow is less than max known network pow, we are not synced, will continue to search for better POW. Best peer is {} at height {}", algo, peer_with_best.map(|p| p.to_base58()).unwrap_or_else(|| "None".to_string()), max_known_network_height);
}
},
Err(error) => {
error!(target: SYNC_REQUEST_LOG_TARGET, "Failed to add Catchup synced blocks to share chain: {error:?}");
// network_peer_store
// .write()
// .await
// .move_to_grey_list(peer, format!("Block failed validation: {error}"));
},
}
// info!(target: LOG_TARGET, "[{:?}] Blocks via catchup sync added {:?}", algo, blocks_added);
},
}
}
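The AddSyncedBlock handler above also takes over the synced-status bookkeeping that previously lived in the catch-up task: the atomic flag only flips to true once our accumulated proof-of-work reaches the best PoW known from any peer; otherwise the node keeps searching for a better chain. A minimal sketch of that decision (the helper name is hypothetical):

    use std::sync::atomic::{AtomicBool, Ordering};

    // Flip the per-algorithm synced flag only when our total chain PoW has
    // caught up with the strongest PoW reported by the network.
    fn maybe_mark_synced(our_pow: u128, max_known_network_pow: u128, synced: &AtomicBool) {
        if our_pow >= max_known_network_pow {
            synced.store(true, Ordering::SeqCst);
        }
    }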

@@ -2093,6 +2181,7 @@ where S: ShareChain
tokio::pin!(seek_connections_interval);

loop {
// info!(target: LOG_TARGET, "P2P service main loop iter");
select! {
// biased;
_ = &mut shutdown_signal => {
3 changes: 3 additions & 0 deletions src/server/p2p/peer_store.rs
@@ -124,6 +124,9 @@ impl PeerStore {
if record.last_ping.map(|t| t.as_u64() < now - 60).unwrap_or(true) {
continue;
}
if record.peer_info.timestamp < now - 60 {
continue;
}
match algo {
PowAlgorithm::RandomX => {
let achieved_pow = record.peer_info.current_random_x_pow;
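The added check means a peer record is now skipped when stale on either axis: we have not pinged it within the last 60 seconds, or its self-reported peer_info is more than 60 seconds old. A sketch of the combined rule as a predicate; the Record shape is illustrative, and saturating_sub is a defensive tweak where the diff uses plain subtraction:

    struct Record {
        last_ping: Option<u64>,   // epoch seconds of our last ping, if any
        peer_info_timestamp: u64, // epoch seconds from the peer's own info
    }

    // A record only counts as fresh if both its ping and its peer_info are
    // under 60 seconds old.
    fn is_fresh(record: &Record, now: u64) -> bool {
        let cutoff = now.saturating_sub(60);
        record.last_ping.map(|t| t >= cutoff).unwrap_or(false)
            && record.peer_info_timestamp >= cutoff
    }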
12 changes: 5 additions & 7 deletions src/sharechain/in_memory.rs
@@ -321,22 +321,20 @@ impl InMemoryShareChain {
fn all_blocks_with_lock(
&self,
p2_chain: &RwLockReadGuard<'_, P2Chain<LmdbBlockStorage>>,
start_height: Option<u64>,
mut start_height: Option<u64>,
page_size: usize,
main_chain_only: bool,
) -> Result<Vec<Arc<P2Block>>, ShareChainError> {
let mut res = Vec::with_capacity(page_size);
let mut num_actual_blocks = 0;
let lowest_height = p2_chain.lowest_chain_level_height().unwrap_or(0);
if start_height.unwrap_or(0) < lowest_height {
start_height = Some(lowest_height);
}
let mut level = if let Some(level) = p2_chain.level_at_height(start_height.unwrap_or(0)) {
level
} else {
// we dont have that block, see if we have a higher lowest block than they are asking for and start there
if start_height.unwrap_or(0) < lowest_height {
p2_chain.level_at_height(lowest_height).unwrap()
} else {
return Ok(res);
}
return Ok(res);
};

loop {
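The rewrite in this hunk replaces the nested fallback with an up-front clamp: the requested start height is raised to the lowest height the chain still stores, leaving a single level_at_height lookup. The clamp reduces to the following condensed equivalent (names illustrative):

    // Never start paging below the lowest level the chain still retains.
    fn effective_start_height(start_height: Option<u64>, lowest_height: u64) -> u64 {
        start_height.unwrap_or(0).max(lowest_height)
    }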
