From c50e8cc0d35b5fc447c99708ea0bea3e5473c746 Mon Sep 17 00:00:00 2001 From: SW van Heerden Date: Fri, 1 Nov 2024 14:21:11 +0200 Subject: [PATCH] chore: sync improvements (#129) Description --- added logging and improved sync block request --- src/server/grpc/p2pool.rs | 6 ++- src/server/p2p/messages.rs | 10 +++++ src/server/p2p/network.rs | 87 ++++++++++++++++++++++++------------ src/server/p2p/peer_store.rs | 2 +- src/server/server.rs | 2 + src/sharechain/in_memory.rs | 60 +++++++++++++------------ src/sharechain/mod.rs | 3 +- src/sharechain/p2chain.rs | 3 +- 8 files changed, 110 insertions(+), 63 deletions(-) diff --git a/src/server/grpc/p2pool.rs b/src/server/grpc/p2pool.rs index 7566da74..c1b8565a 100644 --- a/src/server/grpc/p2pool.rs +++ b/src/server/grpc/p2pool.rs @@ -8,6 +8,7 @@ use std::{ time::Instant, }; +use libp2p::PeerId; use log::{debug, error, info, warn}; use minotari_app_grpc::tari_rpc::{ base_node_client::BaseNodeClient, @@ -54,6 +55,7 @@ const LOG_TARGET: &str = "tari::p2pool::server::grpc::p2pool"; pub(crate) struct ShaP2PoolGrpc where S: ShareChain { + local_peer_id: PeerId, /// Base node client client: Arc>>, /// P2P service client @@ -80,6 +82,7 @@ impl ShaP2PoolGrpc where S: ShareChain { pub async fn new( + local_peer_id: PeerId, base_node_address: String, p2p_client: ServiceClient, share_chain_sha3x: Arc, @@ -92,6 +95,7 @@ where S: ShareChain squad: Squad, ) -> Result { Ok(Self { + local_peer_id, client: Arc::new(RwLock::new( util::connect_base_node(base_node_address, shutdown_signal).await?, )), @@ -131,7 +135,7 @@ where S: ShareChain } if new_tip { let total_pow = share_chain.get_total_chain_pow().await; - let notify = NotifyNewTipBlock::new(pow_algo, new_blocks, total_pow); + let notify = NotifyNewTipBlock::new(self.local_peer_id.clone(), pow_algo, new_blocks, total_pow); let res = self .p2p_client .broadcast_block(notify) diff --git a/src/server/p2p/messages.rs b/src/server/p2p/messages.rs index 27201ff7..3abd277b 100644 --- a/src/server/p2p/messages.rs +++ b/src/server/p2p/messages.rs @@ -220,30 +220,40 @@ pub struct DirectPeerInfoResponse { #[derive(Serialize, Deserialize, Debug, Clone)] pub struct NotifyNewTipBlock { pub version: u64, + peer_id: PeerId, pub algo: u64, pub new_blocks: Vec<(u64, FixedHash)>, pub total_accumulated_difficulty: u128, + pub timestamp: u64, } impl_conversions!(NotifyNewTipBlock); impl NotifyNewTipBlock { pub fn new( + peer_id: PeerId, algo: PowAlgorithm, new_blocks: Vec<(u64, FixedHash)>, total_acculumted_difficulty: AccumulatedDifficulty, ) -> Self { let total_acculumted_difficulty = total_acculumted_difficulty.as_u128(); + let timestamp = EpochTime::now().as_u64(); Self { version: PROTOCOL_VERSION, + peer_id, algo: algo.as_u64(), new_blocks, total_accumulated_difficulty: total_acculumted_difficulty, + timestamp, } } pub fn algo(&self) -> PowAlgorithm { PowAlgorithm::try_from(self.algo).unwrap_or(PowAlgorithm::RandomX) } + + pub fn peer_id(&self) -> &PeerId { + &self.peer_id + } } #[derive(Serialize, Deserialize, Debug, Clone)] diff --git a/src/server/p2p/network.rs b/src/server/p2p/network.rs index 79c450b6..962f3197 100644 --- a/src/server/p2p/network.rs +++ b/src/server/p2p/network.rs @@ -1,5 +1,6 @@ // Copyright 2024 The Tari Project // SPDX-License-Identifier: BSD-3-Clause + use std::{ collections::HashMap, fmt::Display, @@ -24,7 +25,7 @@ use libp2p::{ autonat::{self, NatStatus}, dcutr, futures::StreamExt, - gossipsub::{self, Hasher, IdentTopic, Message, MessageAcceptance, MessageId, PublishError}, + gossipsub::{self, IdentTopic, Message, MessageAcceptance, MessageId, PublishError}, identify::{self, Info}, identity::Keypair, kad::{self, store::MemoryStore, Event}, @@ -59,7 +60,7 @@ use tari_common::configuration::Network; use tari_common_types::types::FixedHash; use tari_core::proof_of_work::{AccumulatedDifficulty, PowAlgorithm}; use tari_shutdown::ShutdownSignal; -use tari_utilities::hex::Hex; +use tari_utilities::{epoch_time::EpochTime, hex::Hex}; use tokio::{ fs::File, io::{self, AsyncReadExt, AsyncWriteExt}, @@ -486,6 +487,10 @@ where S: ShareChain Ok(()) } + pub fn local_peer_id(&self) -> PeerId { + self.swarm.local_peer_id().clone() + } + async fn create_peer_info(&mut self, public_addresses: Vec) -> Result { let share_chain_sha3x = self.share_chain_sha3x.clone(); let share_chain_random_x = self.share_chain_random_x.clone(); @@ -657,17 +662,22 @@ where S: ShareChain debug!(target: LOG_TARGET, squad = &self.config.squad; "Peer {} has an outdated version, skipping", peer); return Ok(MessageAcceptance::Reject); } + // lets check age + if payload.timestamp < EpochTime::now().as_u64().saturating_sub(60) { + debug!(target: LOG_TARGET, squad = &self.config.squad; "Peer {} sent a notify message that is too old, skipping", peer); + return Ok(MessageAcceptance::Ignore); + } let payload = Arc::new(payload); debug!(target: MESSAGE_LOGGING_LOG_TARGET, "[SQUAD_NEW_BLOCK_TOPIC] New block from gossip: {peer:?} -> {payload:?}"); - + let source_peer = payload.peer_id(); // If we don't have this peer, try do peer exchange - if !self.network_peer_store.exists(&peer) { - self.initiate_direct_peer_exchange(peer).await; + if !self.network_peer_store.exists(source_peer) { + self.initiate_direct_peer_exchange(source_peer).await; } // verify payload if payload.new_blocks.is_empty() { - warn!(target: LOG_TARGET, squad = &self.config.squad; "Peer {} sent notify new tip with no blocks.", peer); + warn!(target: LOG_TARGET, squad = &self.config.squad; "Peer {} sent notify new tip with no blocks.", source_peer); return Ok(MessageAcceptance::Reject); } @@ -684,7 +694,7 @@ where S: ShareChain payload.new_blocks.iter().map(|(h, _)| *h).max().unwrap_or(0) <= our_tip.saturating_sub(4) { - debug!(target: LOG_TARGET, squad = &self.config.squad; "Peer {} sent a block that is not better than ours, skipping", peer); + debug!(target: LOG_TARGET, squad = &self.config.squad; "Peer {} sent a block that is not better than ours, skipping", source_peer); return Ok(MessageAcceptance::Ignore); } @@ -696,11 +706,7 @@ where S: ShareChain missing_blocks.push(block.clone()); } if !missing_blocks.is_empty() { - if self.start_time.elapsed() < STARTUP_CATCH_UP_TIME { - warn!(target: LOG_TARGET, squad = &self.config.squad; "Still in startup catch up time, skipping block until we have synced"); - return Ok(MessageAcceptance::Accept); - } - self.sync_share_chain(algo, peer, missing_blocks, true).await; + self.sync_share_chain(algo, source_peer, missing_blocks, true).await; } return Ok(MessageAcceptance::Accept); }, @@ -743,7 +749,7 @@ where S: ShareChain match add_status { AddPeerStatus::NewPeer => { - self.initiate_direct_peer_exchange(peer).await; + self.initiate_direct_peer_exchange(&peer).await; return true; }, AddPeerStatus::Existing => {}, @@ -758,7 +764,7 @@ where S: ShareChain false } - async fn initiate_direct_peer_exchange(&mut self, peer: PeerId) { + async fn initiate_direct_peer_exchange(&mut self, peer: &PeerId) { if let Ok(my_info) = self .create_peer_info(self.swarm.external_addresses().cloned().collect()) .await @@ -774,7 +780,7 @@ where S: ShareChain self.swarm .behaviour_mut() .direct_peer_exchange - .send_request(&peer, DirectPeerInfoRequest { + .send_request(peer, DirectPeerInfoRequest { info: my_info, peer_id: local_peer_id.to_base58(), }); @@ -897,13 +903,28 @@ where S: ShareChain let blocks: Vec<_> = response.into_blocks().into_iter().map(|a| Arc::new(a)).collect(); info!(target: SYNC_REQUEST_LOG_TARGET, "Received sync response for chain {} from {} with blocks {}", algo, peer, blocks.iter().map(|a| a.height.to_string()).join(", ")); match share_chain.add_synced_blocks(&blocks).await { - Ok(result) => { - info!(target: LOG_TARGET, squad = &self.config.squad; "Synced blocks added to share chain: {result:?}"); - // Ok(()) + Ok(new_tip) => { + info!(target: LOG_TARGET, squad = &self.config.squad; "Synced blocks added to share chain, new tip added [{}]",new_tip); + if new_tip { + info!(target: SYNC_REQUEST_LOG_TARGET, "New tip block from sync: {}", blocks.iter().map(|a| a.height.to_string()).join(", ")); + let new_blocks = share_chain.get_tip_and_uncles().await; + + if new_blocks.is_empty() { + error!(target: SYNC_REQUEST_LOG_TARGET, "Could not get added new tip from chain storage"); + return; + }; + let total_pow = share_chain.get_total_chain_pow().await; + let _ = self.client_broadcast_block_tx.send(NotifyNewTipBlock::new( + self.local_peer_id(), + algo, + new_blocks, + total_pow, + )); + } }, Err(error) => match error { crate::sharechain::error::Error::BlockParentDoesNotExist { missing_parents } => { - self.sync_share_chain(algo, peer, missing_parents, false).await; + self.sync_share_chain(algo, &peer, missing_parents, false).await; return; }, _ => { @@ -924,7 +945,7 @@ where S: ShareChain async fn sync_share_chain( &mut self, algo: PowAlgorithm, - peer: PeerId, + peer: &PeerId, mut missing_parents: Vec<(u64, FixedHash)>, is_from_new_block_notify: bool, ) { @@ -940,7 +961,11 @@ where S: ShareChain // return; } - info!(target: SYNC_REQUEST_LOG_TARGET, "Sending sync to {} for blocks {}", peer, missing_parents.iter().map(|a| a.0.to_string()).join(", ")); + if is_from_new_block_notify { + info!(target: SYNC_REQUEST_LOG_TARGET, "[{}] Sending sync to {} for blocks {} from notify", algo, peer, missing_parents.iter().map(|a| a.0.to_string()).join(", ")); + } else { + info!(target: SYNC_REQUEST_LOG_TARGET, "[{}] Sending sync to {} for blocks {} from catchup", algo, peer, missing_parents.iter().map(|a| a.0.to_string()).join(", ")); + } if missing_parents.is_empty() { warn!(target: LOG_TARGET, squad = &self.config.squad; "Sync called but with no missing parents."); @@ -962,7 +987,7 @@ where S: ShareChain .swarm .behaviour_mut() .share_chain_sync - .send_request(&peer, ShareChainSyncRequest::new(algo, missing_parents)); + .send_request(peer, ShareChainSyncRequest::new(algo, missing_parents)); return; } @@ -1281,13 +1306,11 @@ where S: ShareChain .unwrap_or(MAX_CATCH_UP_ATTEMPTS) < MAX_CATCH_UP_ATTEMPTS { - dbg!("Must continue sync"); - dbg!(their_tip_hash); self.network_peer_store.add_catch_up_attempt(&peer); match self.perform_catch_up_sync(algo, peer, last_block_from_them).await { Ok(_) => {}, Err(error) => { - error!(target: LOG_TARGET, squad = &self.config.squad; "Failed to perform catch up sync: {error:?}"); + error!(target: LOG_TARGET, squad = &self.config.squad; "Catchup synced blocks added Failed to perform catch up sync: {error:?}"); }, } } else { @@ -1296,11 +1319,12 @@ where S: ShareChain }, Err(error) => match error { crate::sharechain::error::Error::BlockParentDoesNotExist { missing_parents } => { - self.sync_share_chain(algo, peer, missing_parents, false).await; + info!(target: LOG_TARGET, squad = &self.config.squad; "catchup sync Reporting missing blocks {}", missing_parents.len()); + self.sync_share_chain(algo, &peer, missing_parents, false).await; return; }, _ => { - error!(target: LOG_TARGET, squad = &self.config.squad; "Failed to add synced blocks to share chain: {error:?}"); + error!(target: LOG_TARGET, squad = &self.config.squad; "Failed to add Catchup synced blocks to share chain: {error:?}"); self.network_peer_store .move_to_grey_list(peer, format!("Block failed validation: {}", error)) .await; @@ -1672,6 +1696,11 @@ where S: ShareChain }; // info!(target: LOG_TARGET, squad = &self.config.squad; "Best peers to sync: {best_peers:?}"); + if best_peers.is_empty() { + info!(target: LOG_TARGET, squad = &self.config.squad; "No peers found to try and sync to"); + } else { + info!(target: LOG_TARGET, squad = &self.config.squad; "Found {} peers to try and sync to", best_peers.len()); + } for record in best_peers { let (their_height, their_pow) = match algo { @@ -1685,7 +1714,7 @@ where S: ShareChain ), }; if their_pow > our_pow { - info!(target: LOG_TARGET, squad = &self.config.squad; "[{:?}] Trying to sync from peer: {} with height{}", algo,record.peer_id, their_height); + info!(target: LOG_TARGET, squad = &self.config.squad; "[{:?}] Trying to perform catchup sync from peer: {} with height{}", algo,record.peer_id, their_height); let _ = self.perform_catch_up_sync(*algo, record.peer_id, None).await.inspect_err(|e| warn!(target: LOG_TARGET, squad = &self.config.squad; "Failed to perform catch up sync: {}", e) @@ -1722,7 +1751,7 @@ where S: ShareChain .unwrap(); } let formatter = human_format::Formatter::new(); - let blocks = chain.all_blocks(None, 0, 10000, false).await.expect("errored"); + let blocks = chain.all_blocks(None, 10000, false).await.expect("errored"); for b in blocks { file.write_all( format!( diff --git a/src/server/p2p/peer_store.rs b/src/server/p2p/peer_store.rs index 052d034b..e0ec619a 100644 --- a/src/server/p2p/peer_store.rs +++ b/src/server/p2p/peer_store.rs @@ -137,7 +137,7 @@ impl PeerStore { pub fn best_peers_to_sync(&self, count: usize, algo: PowAlgorithm) -> Vec { let mut peers = self.whitelist_peers.values().collect::>(); - // ignore all peers records that are older than 30 minutes + // ignore all peers records that are older than 1 minutes let timestamp = EpochTime::now().as_u64() - 60 * 1; peers.retain(|peer| peer.peer_info.timestamp > timestamp); match algo { diff --git a/src/server/server.rs b/src/server/server.rs index 43368c7a..aa7ff2a1 100644 --- a/src/server/server.rs +++ b/src/server/server.rs @@ -81,6 +81,7 @@ where S: ShareChain ) .await .map_err(Error::P2PService)?; + let local_peer_id = p2p_service.local_peer_id(); let mut base_node_grpc_server = None; let mut p2pool_server = None; @@ -95,6 +96,7 @@ where S: ShareChain base_node_grpc_server = Some(BaseNodeServer::new(base_node_grpc_service)); let p2pool_grpc_service = ShaP2PoolGrpc::new( + local_peer_id, config.base_node_address.clone(), p2p_service.client(), share_chain_sha3x.clone(), diff --git a/src/sharechain/in_memory.rs b/src/sharechain/in_memory.rs index 4c28fc55..cfed015d 100644 --- a/src/sharechain/in_memory.rs +++ b/src/sharechain/in_memory.rs @@ -424,6 +424,21 @@ impl ShareChain for InMemoryShareChain { } } + async fn get_tip_and_uncles(&self) -> Vec<(u64, FixedHash)> { + let mut res = Vec::new(); + let bl = self.p2_chain.read().await; + let tip_level = bl.get_tip(); + if let Some(tip_level) = tip_level { + res.push((tip_level.height, tip_level.chain_block)); + tip_level.block_in_main_chain().inspect(|block| { + for uncle in block.uncles.iter() { + res.push((uncle.0, uncle.1.clone())); + } + }); + } + res + } + async fn generate_shares(&self, new_tip_block: &P2Block) -> Result, Error> { let mut chain_read_lock = self.p2_chain.read().await; // first check if there is a cached hashmap of shares @@ -595,7 +610,6 @@ impl ShareChain for InMemoryShareChain { // Assume their blocks are in order highest first. let mut split_height = 0; - // let mut split_found = false; if let Some(last_block_received) = last_block_received { if let Some(level) = p2_chain_read.level_at_height(last_block_received.0) { @@ -603,7 +617,6 @@ impl ShareChain for InMemoryShareChain { // Only split if the block is in the main chain if level.chain_block == block.hash { split_height = block.height; - // split_found = true; } } } @@ -628,7 +641,7 @@ impl ShareChain for InMemoryShareChain { } } - self.all_blocks(Some(cmp::max(split_height, split_height2)), 0, limit, true) + self.all_blocks(Some(cmp::max(split_height, split_height2)), limit, true) .await } @@ -731,41 +744,29 @@ impl ShareChain for InMemoryShareChain { async fn all_blocks( &self, start_height: Option, - page: usize, page_size: usize, main_chain_only: bool, ) -> Result>, Error> { let chain_read_lock = self.p2_chain.read().await; let mut res = Vec::with_capacity(page_size); - let _skip = page * page_size; - let _num_skipped = 0; let mut num_main_chain_blocks = 0; - // TODO: This is very inefficient, we should refactor this. - for level in chain_read_lock.levels.iter().rev() { - if level.height < start_height.unwrap_or(0) { - continue; + let mut level = if let Some(level) = chain_read_lock.level_at_height(start_height.unwrap_or(0)) { + level + } else { + // we dont have that block, see if twe have a higher lowest block than they are asking for and start there + if start_height.unwrap_or(0) < chain_read_lock.levels.front().map(|l| l.height).unwrap_or(0) { + chain_read_lock.levels.front().unwrap() + } else { + return Ok(res); } + }; - // // First add the block in the main chain - // if let Some(block) = level.block_in_main_chain() { - // if num_skipped < skip { - // num_skipped += 1; - // continue; - // } - // res.push(block.clone()); - // if res.len() >= page_size { - // return Ok(res); - // } - - // } - + loop { for block in level.blocks.values() { if block.hash == level.chain_block { num_main_chain_blocks += 1; if main_chain_only { for uncle in &block.uncles { - // num_skipped += 1; - // Always include all the uncles, if we have them if let Some(uncle_block) = level.blocks.get(&uncle.1) { // Uncles should never exist in the main chain, so we don't need to worry about @@ -783,11 +784,12 @@ impl ShareChain for InMemoryShareChain { if res.len() >= page_size && (!main_chain_only || num_main_chain_blocks >= 2) { return Ok(res); } - // if num_skipped < skip { - // num_skipped += 1; - // continue; - // } } + level = if let Some(new_level) = chain_read_lock.level_at_height(level.height + 1) { + new_level + } else { + break; + }; } Ok(res) } diff --git a/src/sharechain/mod.rs b/src/sharechain/mod.rs index 09cac9df..08a81f01 100644 --- a/src/sharechain/mod.rs +++ b/src/sharechain/mod.rs @@ -110,6 +110,8 @@ pub(crate) trait ShareChain: Send + Sync + 'static { /// Returns the tip of the chain. async fn get_tip(&self) -> Result, Error>; + async fn get_tip_and_uncles(&self) -> Vec<(u64, FixedHash)>; + /// Generate shares based on the previous blocks. async fn generate_shares(&self, new_tip_block: &P2Block) -> Result, Error>; @@ -146,7 +148,6 @@ pub(crate) trait ShareChain: Send + Sync + 'static { async fn all_blocks( &self, start_height: Option, - page: usize, page_size: usize, main_chain_only: bool, ) -> Result>, Error>; diff --git a/src/sharechain/p2chain.rs b/src/sharechain/p2chain.rs index b9df7242..5e673eab 100644 --- a/src/sharechain/p2chain.rs +++ b/src/sharechain/p2chain.rs @@ -748,7 +748,6 @@ impl P2Chain { #[cfg(test)] mod test { - use digest::crypto_common::rand_core::block; use tari_common_types::types::BlockHash; use tari_core::{ blocks::{Block, BlockHeader}, @@ -1503,7 +1502,7 @@ mod test { // the tip is not set to the new block, because the uncle is missing. let mut chain = P2Chain::new_empty(10, 5); - let mut prev_hash = BlockHash::zero(); + let prev_hash = BlockHash::zero(); let block1 = P2Block::builder() .with_height(0)