Skip to content

Commit

Permalink
chore: sync improvements (#129)
Browse files Browse the repository at this point in the history
Description
---
Added logging and improved the sync block request.
  • Loading branch information
SWvheerden authored Nov 1, 2024
1 parent c6e0424 commit c50e8cc
Show file tree
Hide file tree
Showing 8 changed files with 110 additions and 63 deletions.
6 changes: 5 additions & 1 deletion src/server/grpc/p2pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use std::{
time::Instant,
};

use libp2p::PeerId;
use log::{debug, error, info, warn};
use minotari_app_grpc::tari_rpc::{
base_node_client::BaseNodeClient,
Expand Down Expand Up @@ -54,6 +55,7 @@ const LOG_TARGET: &str = "tari::p2pool::server::grpc::p2pool";
pub(crate) struct ShaP2PoolGrpc<S>
where S: ShareChain
{
local_peer_id: PeerId,
/// Base node client
client: Arc<RwLock<BaseNodeClient<tonic::transport::Channel>>>,
/// P2P service client
Expand All @@ -80,6 +82,7 @@ impl<S> ShaP2PoolGrpc<S>
where S: ShareChain
{
pub async fn new(
local_peer_id: PeerId,
base_node_address: String,
p2p_client: ServiceClient,
share_chain_sha3x: Arc<S>,
Expand All @@ -92,6 +95,7 @@ where S: ShareChain
squad: Squad,
) -> Result<Self, Error> {
Ok(Self {
local_peer_id,
client: Arc::new(RwLock::new(
util::connect_base_node(base_node_address, shutdown_signal).await?,
)),
Expand Down Expand Up @@ -131,7 +135,7 @@ where S: ShareChain
}
if new_tip {
let total_pow = share_chain.get_total_chain_pow().await;
let notify = NotifyNewTipBlock::new(pow_algo, new_blocks, total_pow);
let notify = NotifyNewTipBlock::new(self.local_peer_id.clone(), pow_algo, new_blocks, total_pow);
let res = self
.p2p_client
.broadcast_block(notify)
Expand Down
10 changes: 10 additions & 0 deletions src/server/p2p/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -220,30 +220,40 @@ pub struct DirectPeerInfoResponse {
/// Gossip message announcing that the sending node has a new share-chain tip.
///
/// Build it with `NotifyNewTipBlock::new`, which fills in `version` and `timestamp`.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct NotifyNewTipBlock {
/// Protocol version; `new` sets this to `PROTOCOL_VERSION`.
pub version: u64,
/// Peer id supplied to `new`; kept private and read through `peer_id()`.
peer_id: PeerId,
/// Proof-of-work algorithm stored in its `u64` form (`PowAlgorithm::as_u64`); decode with `algo()`.
pub algo: u64,
/// Blocks being announced, as `(u64, FixedHash)` pairs.
/// NOTE(review): presumably (height, block hash) -- confirm against the producer.
pub new_blocks: Vec<(u64, FixedHash)>,
/// Accumulated difficulty passed to `new`, stored as a raw `u128`.
pub total_accumulated_difficulty: u128,
/// `EpochTime::now().as_u64()` captured when the message was constructed.
pub timestamp: u64,
}
impl_conversions!(NotifyNewTipBlock);

impl NotifyNewTipBlock {
    /// Creates a notification for `new_blocks`, stamped with `PROTOCOL_VERSION` and the
    /// current epoch time.
    ///
    /// `algo` and the accumulated difficulty are flattened to plain integers
    /// (`as_u64` / `as_u128`) before being stored.
    pub fn new(
        peer_id: PeerId,
        algo: PowAlgorithm,
        new_blocks: Vec<(u64, FixedHash)>,
        total_acculumted_difficulty: AccumulatedDifficulty,
    ) -> Self {
        Self {
            version: PROTOCOL_VERSION,
            peer_id,
            algo: algo.as_u64(),
            new_blocks,
            total_accumulated_difficulty: total_acculumted_difficulty.as_u128(),
            timestamp: EpochTime::now().as_u64(),
        }
    }

    /// Decodes the stored `algo` value back into a `PowAlgorithm`.
    ///
    /// A value that `PowAlgorithm::try_from` rejects falls back to `PowAlgorithm::RandomX`.
    pub fn algo(&self) -> PowAlgorithm {
        match PowAlgorithm::try_from(self.algo) {
            Ok(known) => known,
            Err(_) => PowAlgorithm::RandomX,
        }
    }

    /// Borrows the peer id carried in this message.
    pub fn peer_id(&self) -> &PeerId {
        &self.peer_id
    }
}

#[derive(Serialize, Deserialize, Debug, Clone)]
Expand Down
87 changes: 58 additions & 29 deletions src/server/p2p/network.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
// Copyright 2024 The Tari Project
// SPDX-License-Identifier: BSD-3-Clause

use std::{
collections::HashMap,
fmt::Display,
Expand All @@ -24,7 +25,7 @@ use libp2p::{
autonat::{self, NatStatus},
dcutr,
futures::StreamExt,
gossipsub::{self, Hasher, IdentTopic, Message, MessageAcceptance, MessageId, PublishError},
gossipsub::{self, IdentTopic, Message, MessageAcceptance, MessageId, PublishError},
identify::{self, Info},
identity::Keypair,
kad::{self, store::MemoryStore, Event},
Expand Down Expand Up @@ -59,7 +60,7 @@ use tari_common::configuration::Network;
use tari_common_types::types::FixedHash;
use tari_core::proof_of_work::{AccumulatedDifficulty, PowAlgorithm};
use tari_shutdown::ShutdownSignal;
use tari_utilities::hex::Hex;
use tari_utilities::{epoch_time::EpochTime, hex::Hex};
use tokio::{
fs::File,
io::{self, AsyncReadExt, AsyncWriteExt},
Expand Down Expand Up @@ -486,6 +487,10 @@ where S: ShareChain
Ok(())
}

/// Returns this node's own libp2p peer id, as reported by the swarm.
///
/// `PeerId` is `Copy`, so the value is copied out with a dereference instead of `.clone()`.
pub fn local_peer_id(&self) -> PeerId {
    *self.swarm.local_peer_id()
}

async fn create_peer_info(&mut self, public_addresses: Vec<Multiaddr>) -> Result<PeerInfo, Error> {
let share_chain_sha3x = self.share_chain_sha3x.clone();
let share_chain_random_x = self.share_chain_random_x.clone();
Expand Down Expand Up @@ -657,17 +662,22 @@ where S: ShareChain
debug!(target: LOG_TARGET, squad = &self.config.squad; "Peer {} has an outdated version, skipping", peer);
return Ok(MessageAcceptance::Reject);
}
// lets check age
if payload.timestamp < EpochTime::now().as_u64().saturating_sub(60) {
debug!(target: LOG_TARGET, squad = &self.config.squad; "Peer {} sent a notify message that is too old, skipping", peer);
return Ok(MessageAcceptance::Ignore);
}
let payload = Arc::new(payload);
debug!(target: MESSAGE_LOGGING_LOG_TARGET, "[SQUAD_NEW_BLOCK_TOPIC] New block from gossip: {peer:?} -> {payload:?}");

let source_peer = payload.peer_id();
// If we don't have this peer, try to do a peer exchange
if !self.network_peer_store.exists(&peer) {
self.initiate_direct_peer_exchange(peer).await;
if !self.network_peer_store.exists(source_peer) {
self.initiate_direct_peer_exchange(source_peer).await;
}

// verify payload
if payload.new_blocks.is_empty() {
warn!(target: LOG_TARGET, squad = &self.config.squad; "Peer {} sent notify new tip with no blocks.", peer);
warn!(target: LOG_TARGET, squad = &self.config.squad; "Peer {} sent notify new tip with no blocks.", source_peer);
return Ok(MessageAcceptance::Reject);
}

Expand All @@ -684,7 +694,7 @@ where S: ShareChain
payload.new_blocks.iter().map(|(h, _)| *h).max().unwrap_or(0) <=
our_tip.saturating_sub(4)
{
debug!(target: LOG_TARGET, squad = &self.config.squad; "Peer {} sent a block that is not better than ours, skipping", peer);
debug!(target: LOG_TARGET, squad = &self.config.squad; "Peer {} sent a block that is not better than ours, skipping", source_peer);
return Ok(MessageAcceptance::Ignore);
}

Expand All @@ -696,11 +706,7 @@ where S: ShareChain
missing_blocks.push(block.clone());
}
if !missing_blocks.is_empty() {
if self.start_time.elapsed() < STARTUP_CATCH_UP_TIME {
warn!(target: LOG_TARGET, squad = &self.config.squad; "Still in startup catch up time, skipping block until we have synced");
return Ok(MessageAcceptance::Accept);
}
self.sync_share_chain(algo, peer, missing_blocks, true).await;
self.sync_share_chain(algo, source_peer, missing_blocks, true).await;
}
return Ok(MessageAcceptance::Accept);
},
Expand Down Expand Up @@ -743,7 +749,7 @@ where S: ShareChain

match add_status {
AddPeerStatus::NewPeer => {
self.initiate_direct_peer_exchange(peer).await;
self.initiate_direct_peer_exchange(&peer).await;
return true;
},
AddPeerStatus::Existing => {},
Expand All @@ -758,7 +764,7 @@ where S: ShareChain
false
}

async fn initiate_direct_peer_exchange(&mut self, peer: PeerId) {
async fn initiate_direct_peer_exchange(&mut self, peer: &PeerId) {
if let Ok(my_info) = self
.create_peer_info(self.swarm.external_addresses().cloned().collect())
.await
Expand All @@ -774,7 +780,7 @@ where S: ShareChain
self.swarm
.behaviour_mut()
.direct_peer_exchange
.send_request(&peer, DirectPeerInfoRequest {
.send_request(peer, DirectPeerInfoRequest {
info: my_info,
peer_id: local_peer_id.to_base58(),
});
Expand Down Expand Up @@ -897,13 +903,28 @@ where S: ShareChain
let blocks: Vec<_> = response.into_blocks().into_iter().map(|a| Arc::new(a)).collect();
info!(target: SYNC_REQUEST_LOG_TARGET, "Received sync response for chain {} from {} with blocks {}", algo, peer, blocks.iter().map(|a| a.height.to_string()).join(", "));
match share_chain.add_synced_blocks(&blocks).await {
Ok(result) => {
info!(target: LOG_TARGET, squad = &self.config.squad; "Synced blocks added to share chain: {result:?}");
// Ok(())
Ok(new_tip) => {
info!(target: LOG_TARGET, squad = &self.config.squad; "Synced blocks added to share chain, new tip added [{}]",new_tip);
if new_tip {
info!(target: SYNC_REQUEST_LOG_TARGET, "New tip block from sync: {}", blocks.iter().map(|a| a.height.to_string()).join(", "));
let new_blocks = share_chain.get_tip_and_uncles().await;

if new_blocks.is_empty() {
error!(target: SYNC_REQUEST_LOG_TARGET, "Could not get added new tip from chain storage");
return;
};
let total_pow = share_chain.get_total_chain_pow().await;
let _ = self.client_broadcast_block_tx.send(NotifyNewTipBlock::new(
self.local_peer_id(),
algo,
new_blocks,
total_pow,
));
}
},
Err(error) => match error {
crate::sharechain::error::Error::BlockParentDoesNotExist { missing_parents } => {
self.sync_share_chain(algo, peer, missing_parents, false).await;
self.sync_share_chain(algo, &peer, missing_parents, false).await;
return;
},
_ => {
Expand All @@ -924,7 +945,7 @@ where S: ShareChain
async fn sync_share_chain(
&mut self,
algo: PowAlgorithm,
peer: PeerId,
peer: &PeerId,
mut missing_parents: Vec<(u64, FixedHash)>,
is_from_new_block_notify: bool,
) {
Expand All @@ -940,7 +961,11 @@ where S: ShareChain
// return;
}

info!(target: SYNC_REQUEST_LOG_TARGET, "Sending sync to {} for blocks {}", peer, missing_parents.iter().map(|a| a.0.to_string()).join(", "));
if is_from_new_block_notify {
info!(target: SYNC_REQUEST_LOG_TARGET, "[{}] Sending sync to {} for blocks {} from notify", algo, peer, missing_parents.iter().map(|a| a.0.to_string()).join(", "));
} else {
info!(target: SYNC_REQUEST_LOG_TARGET, "[{}] Sending sync to {} for blocks {} from catchup", algo, peer, missing_parents.iter().map(|a| a.0.to_string()).join(", "));
}

if missing_parents.is_empty() {
warn!(target: LOG_TARGET, squad = &self.config.squad; "Sync called but with no missing parents.");
Expand All @@ -962,7 +987,7 @@ where S: ShareChain
.swarm
.behaviour_mut()
.share_chain_sync
.send_request(&peer, ShareChainSyncRequest::new(algo, missing_parents));
.send_request(peer, ShareChainSyncRequest::new(algo, missing_parents));
return;
}

Expand Down Expand Up @@ -1281,13 +1306,11 @@ where S: ShareChain
.unwrap_or(MAX_CATCH_UP_ATTEMPTS) <
MAX_CATCH_UP_ATTEMPTS
{
dbg!("Must continue sync");
dbg!(their_tip_hash);
self.network_peer_store.add_catch_up_attempt(&peer);
match self.perform_catch_up_sync(algo, peer, last_block_from_them).await {
Ok(_) => {},
Err(error) => {
error!(target: LOG_TARGET, squad = &self.config.squad; "Failed to perform catch up sync: {error:?}");
error!(target: LOG_TARGET, squad = &self.config.squad; "Catchup synced blocks added Failed to perform catch up sync: {error:?}");
},
}
} else {
Expand All @@ -1296,11 +1319,12 @@ where S: ShareChain
},
Err(error) => match error {
crate::sharechain::error::Error::BlockParentDoesNotExist { missing_parents } => {
self.sync_share_chain(algo, peer, missing_parents, false).await;
info!(target: LOG_TARGET, squad = &self.config.squad; "catchup sync Reporting missing blocks {}", missing_parents.len());
self.sync_share_chain(algo, &peer, missing_parents, false).await;
return;
},
_ => {
error!(target: LOG_TARGET, squad = &self.config.squad; "Failed to add synced blocks to share chain: {error:?}");
error!(target: LOG_TARGET, squad = &self.config.squad; "Failed to add Catchup synced blocks to share chain: {error:?}");
self.network_peer_store
.move_to_grey_list(peer, format!("Block failed validation: {}", error))
.await;
Expand Down Expand Up @@ -1672,6 +1696,11 @@ where S: ShareChain
};

// info!(target: LOG_TARGET, squad = &self.config.squad; "Best peers to sync: {best_peers:?}");
if best_peers.is_empty() {
info!(target: LOG_TARGET, squad = &self.config.squad; "No peers found to try and sync to");
} else {
info!(target: LOG_TARGET, squad = &self.config.squad; "Found {} peers to try and sync to", best_peers.len());
}

for record in best_peers {
let (their_height, their_pow) = match algo {
Expand All @@ -1685,7 +1714,7 @@ where S: ShareChain
),
};
if their_pow > our_pow {
info!(target: LOG_TARGET, squad = &self.config.squad; "[{:?}] Trying to sync from peer: {} with height{}", algo,record.peer_id, their_height);
info!(target: LOG_TARGET, squad = &self.config.squad; "[{:?}] Trying to perform catchup sync from peer: {} with height{}", algo,record.peer_id, their_height);

let _ = self.perform_catch_up_sync(*algo, record.peer_id, None).await.inspect_err(|e|
warn!(target: LOG_TARGET, squad = &self.config.squad; "Failed to perform catch up sync: {}", e)
Expand Down Expand Up @@ -1722,7 +1751,7 @@ where S: ShareChain
.unwrap();
}
let formatter = human_format::Formatter::new();
let blocks = chain.all_blocks(None, 0, 10000, false).await.expect("errored");
let blocks = chain.all_blocks(None, 10000, false).await.expect("errored");
for b in blocks {
file.write_all(
format!(
Expand Down
2 changes: 1 addition & 1 deletion src/server/p2p/peer_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ impl PeerStore {

pub fn best_peers_to_sync(&self, count: usize, algo: PowAlgorithm) -> Vec<PeerStoreRecord> {
let mut peers = self.whitelist_peers.values().collect::<Vec<_>>();
// ignore all peers records that are older than 30 minutes
// ignore all peer records that are older than 1 minute
let timestamp = EpochTime::now().as_u64() - 60 * 1;
peers.retain(|peer| peer.peer_info.timestamp > timestamp);
match algo {
Expand Down
2 changes: 2 additions & 0 deletions src/server/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ where S: ShareChain
)
.await
.map_err(Error::P2PService)?;
let local_peer_id = p2p_service.local_peer_id();

let mut base_node_grpc_server = None;
let mut p2pool_server = None;
Expand All @@ -95,6 +96,7 @@ where S: ShareChain
base_node_grpc_server = Some(BaseNodeServer::new(base_node_grpc_service));

let p2pool_grpc_service = ShaP2PoolGrpc::new(
local_peer_id,
config.base_node_address.clone(),
p2p_service.client(),
share_chain_sha3x.clone(),
Expand Down
Loading

0 comments on commit c50e8cc

Please sign in to comment.