diff --git a/Cargo.lock b/Cargo.lock index b1e9d96..d8fdbc0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -52,18 +52,6 @@ dependencies = [ "subtle", ] -[[package]] -name = "ahash" -version = "0.8.11" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e89da841a80418a9b391ebaea17f5c112ffaaa96f621d2c285b5174da76b9011" -dependencies = [ - "cfg-if", - "once_cell", - "version_check", - "zerocopy", -] - [[package]] name = "aho-corasick" version = "1.1.3" @@ -1373,6 +1361,12 @@ version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" +[[package]] +name = "foldhash" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f81ec6369c545a7d40e4589b5597581fa1c441fe1cce96dd1de43159910a36a2" + [[package]] name = "form_urlencoded" version = "1.2.1" @@ -1653,9 +1647,16 @@ name = "hashbrown" version = "0.14.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" + +[[package]] +name = "hashbrown" +version = "0.15.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bf151400ff0baff5465007dd2f3e717f3fe502074ca563069ce3a6629d07b289" dependencies = [ - "ahash", "allocator-api2", + "equivalent", + "foldhash", ] [[package]] @@ -2926,11 +2927,11 @@ dependencies = [ [[package]] name = "lru" -version = "0.12.4" +version = "0.12.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "37ee39891760e7d94734f6f63fedc29a2e4a152f836120753a72503f09fcf904" +checksum = "234cf4f4a04dc1f57e24b96cc0cd600cf2af460d4161ac5ecdd0af8e1f3b2a38" dependencies = [ - "hashbrown 0.14.5", + "hashbrown 0.15.2", ] [[package]] @@ -4617,6 +4618,7 @@ dependencies = [ "libp2p", "log", "log4rs", + "lru", "minotari_app_grpc", "minotari_node_grpc_client", "num", diff --git a/Cargo.toml b/Cargo.toml index 6a08a87..7d78623 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -61,6 +61,7 @@ tari_utilities = { version = "0.8", features = ["borsh"] } thiserror = "1.0" tokio = { version = "1.41.0", features = ["full"] } tonic = "0.12.3" +lru = "0.12.5" [package.metadata.cargo-machete] ignored = ["log4rs"] diff --git a/src/server/p2p/network.rs b/src/server/p2p/network.rs index cf071f8..c1bb6db 100644 --- a/src/server/p2p/network.rs +++ b/src/server/p2p/network.rs @@ -8,6 +8,7 @@ use std::{ hash::Hash, io::Write, net::IpAddr, + num::NonZeroUsize, path::PathBuf, str::FromStr, sync::{atomic::AtomicBool, Arc}, @@ -52,6 +53,7 @@ use log::{ trace, warn, }; +use lru::LruCache; use serde::{Deserialize, Serialize}; use tari_common::configuration::Network; use tari_common_types::types::FixedHash; @@ -168,6 +170,7 @@ pub(crate) struct Config { pub sha3x_enabled: bool, pub randomx_enabled: bool, pub num_concurrent_syncs: usize, + pub num_sync_tips_to_keep: usize, } impl Default for Config { @@ -193,6 +196,7 @@ impl Default for Config { sha3x_enabled: true, randomx_enabled: true, num_concurrent_syncs: 1, + num_sync_tips_to_keep: 10, } } } @@ -285,6 +289,9 @@ enum InnerRequest { /// Service is the implementation that holds every peer-to-peer related logic /// that makes sure that all the communications, syncing, broadcasting etc... are done. +// Allow type complexity for now. It is caused by the hashmap of arc of rwlock of lru cache +// it should be removed in future. +#[allow(clippy::type_complexity)] pub struct Service where S: ShareChain { @@ -314,6 +321,7 @@ where S: ShareChain sha3x_last_sync_requested_block: Option<(u64, FixedHash)>, randomx_in_progress_syncs: HashMap, sha3x_in_progress_syncs: HashMap, + recent_synced_tips: HashMap>>>, } impl Service @@ -340,6 +348,19 @@ where S: ShareChain // This should not be unbounded but we need to find out what is using up all the permits let (inner_request_tx, inner_request_rx) = mpsc::unbounded_channel(); + let mut recent_synced_tips = HashMap::new(); + recent_synced_tips.insert( + PowAlgorithm::RandomX, + Arc::new(RwLock::new(LruCache::new( + NonZeroUsize::new(config.p2p_service.num_sync_tips_to_keep).unwrap(), + ))), + ); + recent_synced_tips.insert( + PowAlgorithm::Sha3x, + Arc::new(RwLock::new(LruCache::new( + NonZeroUsize::new(config.p2p_service.num_sync_tips_to_keep).unwrap(), + ))), + ); Ok(Self { swarm, port: config.p2p_port, @@ -363,6 +384,7 @@ where S: ShareChain sha3x_last_sync_requested_block: None, randomx_in_progress_syncs: HashMap::new(), sha3x_in_progress_syncs: HashMap::new(), + recent_synced_tips, }) } @@ -1509,6 +1531,7 @@ where S: ShareChain None } + #[allow(clippy::too_many_lines)] async fn handle_catch_up_sync_response( &mut self, response: CatchUpSyncResponse, @@ -1545,6 +1568,7 @@ where S: ShareChain let squad = self.config.squad.clone(); let network_peer_store = self.network_peer_store.clone(); let synced_bool = self.are_we_synced_with_p2pool.clone(); + let recent_synced_tips = self.recent_synced_tips.get(&algo).cloned().unwrap(); tokio::spawn(async move { blocks.sort_by(|a, b| a.height.cmp(&b.height)); @@ -1567,6 +1591,13 @@ where S: ShareChain }, } } + { + if let Some(ref last_block) = last_block_from_them { + let mut lock = recent_synced_tips.write().await; + lock.put(peer, *last_block); + } + } + info!(target: LOG_TARGET, "[{:?}] Blocks via catchup sync added {:?}", algo, blocks_added); info!(target: LOG_TARGET, "[{:?}] Blocks via catchup sync result {}", algo, new_tip); let missing_parents = new_tip.into_missing_parents_vec(); @@ -1712,7 +1743,7 @@ where S: ShareChain let permit = permit.unwrap(); - let (i_have_blocks, last_block_from_them) = match (last_block_from_them, last_progress) { + let (mut i_have_blocks, last_block_from_them) = match (last_block_from_them, last_progress) { (None, Some(last_progress)) => { // this is most likely a new catchup sync request, while the previous attempt failed so we ask with both // I have blocks and last block @@ -1739,6 +1770,13 @@ where S: ShareChain }, }; + { + let lock = self.recent_synced_tips.get(&algo).unwrap().read().await; + for item in lock.iter() { + i_have_blocks.insert(0, *item.1); + } + } + info!(target: SYNC_REQUEST_LOG_TARGET, "[{:?}] Sending catch up sync to {} for blocks {}, last block received {}. Their height:{}", algo, peer, i_have_blocks.iter().map(|a| a.0.to_string()).join(", "), last_block_from_them.map(|a| a.0.to_string()).unwrap_or_else(|| "None".to_string()), their_height); diff --git a/src/sharechain/in_memory.rs b/src/sharechain/in_memory.rs index 2415d81..3bc0330 100644 --- a/src/sharechain/in_memory.rs +++ b/src/sharechain/in_memory.rs @@ -185,7 +185,10 @@ impl InMemoryShareChain { info!(target: LOG_TARGET, "[{:?}] ✅ Block already added: {}:{}, verified: {}", self.pow_algo, block.height, &block.hash.to_hex()[0..8], block_in_chain.verified); - return Ok(ChainAddResult::default()); + if block_in_chain.verified { + return Ok(ChainAddResult::default()); + } + info!(target: LOG_TARGET, "[{:?}] ❌ Block already added, but not verified: {}:{}, verifying...", self.pow_algo, block.height, &block.hash.to_hex()[0..8]); } } diff --git a/src/sharechain/p2chain.rs b/src/sharechain/p2chain.rs index 10691f2..3ce423e 100644 --- a/src/sharechain/p2chain.rs +++ b/src/sharechain/p2chain.rs @@ -265,6 +265,7 @@ impl P2Chain { new_block_height: u64, hash: FixedHash, ) -> Result<(ChainAddResult, Vec<(u64, FixedHash)>), ShareChainError> { + info!(target: LOG_TARGET, "Trying to verify new block to add: {}:{}", new_block_height, &hash.to_hex()[0..8]); // we should validate what we can if a block is invalid, we should delete it. let mut new_tip = ChainAddResult::default(); let block = self