Skip to content

Commit

Permalink
feat: keep track of previous sync tips (#214)
Browse files Browse the repository at this point in the history
  • Loading branch information
stringhandler authored Dec 6, 2024
1 parent 17bbedf commit a597dfc
Show file tree
Hide file tree
Showing 5 changed files with 63 additions and 18 deletions.
34 changes: 18 additions & 16 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ tari_utilities = { version = "0.8", features = ["borsh"] }
thiserror = "1.0"
tokio = { version = "1.41.0", features = ["full"] }
tonic = "0.12.3"
lru = "0.12.5"

[package.metadata.cargo-machete]
ignored = ["log4rs"]
40 changes: 39 additions & 1 deletion src/server/p2p/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use std::{
hash::Hash,
io::Write,
net::IpAddr,
num::NonZeroUsize,
path::PathBuf,
str::FromStr,
sync::{atomic::AtomicBool, Arc},
Expand Down Expand Up @@ -52,6 +53,7 @@ use log::{
trace,
warn,
};
use lru::LruCache;
use serde::{Deserialize, Serialize};
use tari_common::configuration::Network;
use tari_common_types::types::FixedHash;
Expand Down Expand Up @@ -168,6 +170,7 @@ pub(crate) struct Config {
pub sha3x_enabled: bool,
pub randomx_enabled: bool,
pub num_concurrent_syncs: usize,
pub num_sync_tips_to_keep: usize,
}

impl Default for Config {
Expand All @@ -193,6 +196,7 @@ impl Default for Config {
sha3x_enabled: true,
randomx_enabled: true,
num_concurrent_syncs: 1,
num_sync_tips_to_keep: 10,
}
}
}
Expand Down Expand Up @@ -285,6 +289,9 @@ enum InnerRequest {

/// Service is the implementation that holds every peer-to-peer related logic
/// that makes sure that all the communications, syncing, broadcasting etc... are done.
// Allow type complexity for now. It is caused by the hashmap of arc of rwlock of lru cache
// it should be removed in future.
#[allow(clippy::type_complexity)]
pub struct Service<S>
where S: ShareChain
{
Expand Down Expand Up @@ -314,6 +321,7 @@ where S: ShareChain
sha3x_last_sync_requested_block: Option<(u64, FixedHash)>,
randomx_in_progress_syncs: HashMap<PeerId, (OutboundRequestId, OwnedSemaphorePermit)>,
sha3x_in_progress_syncs: HashMap<PeerId, (OutboundRequestId, OwnedSemaphorePermit)>,
recent_synced_tips: HashMap<PowAlgorithm, Arc<RwLock<LruCache<PeerId, (u64, FixedHash)>>>>,
}

impl<S> Service<S>
Expand All @@ -340,6 +348,19 @@ where S: ShareChain
// This should not be unbounded but we need to find out what is using up all the permits
let (inner_request_tx, inner_request_rx) = mpsc::unbounded_channel();

let mut recent_synced_tips = HashMap::new();
recent_synced_tips.insert(
PowAlgorithm::RandomX,
Arc::new(RwLock::new(LruCache::new(
NonZeroUsize::new(config.p2p_service.num_sync_tips_to_keep).unwrap(),
))),
);
recent_synced_tips.insert(
PowAlgorithm::Sha3x,
Arc::new(RwLock::new(LruCache::new(
NonZeroUsize::new(config.p2p_service.num_sync_tips_to_keep).unwrap(),
))),
);
Ok(Self {
swarm,
port: config.p2p_port,
Expand All @@ -363,6 +384,7 @@ where S: ShareChain
sha3x_last_sync_requested_block: None,
randomx_in_progress_syncs: HashMap::new(),
sha3x_in_progress_syncs: HashMap::new(),
recent_synced_tips,
})
}

Expand Down Expand Up @@ -1509,6 +1531,7 @@ where S: ShareChain
None
}

#[allow(clippy::too_many_lines)]
async fn handle_catch_up_sync_response(
&mut self,
response: CatchUpSyncResponse,
Expand Down Expand Up @@ -1545,6 +1568,7 @@ where S: ShareChain
let squad = self.config.squad.clone();
let network_peer_store = self.network_peer_store.clone();
let synced_bool = self.are_we_synced_with_p2pool.clone();
let recent_synced_tips = self.recent_synced_tips.get(&algo).cloned().unwrap();

tokio::spawn(async move {
blocks.sort_by(|a, b| a.height.cmp(&b.height));
Expand All @@ -1567,6 +1591,13 @@ where S: ShareChain
},
}
}
{
if let Some(ref last_block) = last_block_from_them {
let mut lock = recent_synced_tips.write().await;
lock.put(peer, *last_block);
}
}

info!(target: LOG_TARGET, "[{:?}] Blocks via catchup sync added {:?}", algo, blocks_added);
info!(target: LOG_TARGET, "[{:?}] Blocks via catchup sync result {}", algo, new_tip);
let missing_parents = new_tip.into_missing_parents_vec();
Expand Down Expand Up @@ -1712,7 +1743,7 @@ where S: ShareChain

let permit = permit.unwrap();

let (i_have_blocks, last_block_from_them) = match (last_block_from_them, last_progress) {
let (mut i_have_blocks, last_block_from_them) = match (last_block_from_them, last_progress) {
(None, Some(last_progress)) => {
// this is most likely a new catchup sync request, while the previous attempt failed so we ask with both
// I have blocks and last block
Expand All @@ -1739,6 +1770,13 @@ where S: ShareChain
},
};

{
let lock = self.recent_synced_tips.get(&algo).unwrap().read().await;
for item in lock.iter() {
i_have_blocks.insert(0, *item.1);
}
}

info!(target: SYNC_REQUEST_LOG_TARGET, "[{:?}] Sending catch up sync to {} for blocks {}, last block received {}. Their height:{}",
algo,
peer, i_have_blocks.iter().map(|a| a.0.to_string()).join(", "), last_block_from_them.map(|a| a.0.to_string()).unwrap_or_else(|| "None".to_string()), their_height);
Expand Down
5 changes: 4 additions & 1 deletion src/sharechain/in_memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,10 @@ impl InMemoryShareChain {

info!(target: LOG_TARGET, "[{:?}] ✅ Block already added: {}:{}, verified: {}", self.pow_algo, block.height, &block.hash.to_hex()[0..8], block_in_chain.verified);

return Ok(ChainAddResult::default());
if block_in_chain.verified {
return Ok(ChainAddResult::default());
}
info!(target: LOG_TARGET, "[{:?}] ❌ Block already added, but not verified: {}:{}, verifying...", self.pow_algo, block.height, &block.hash.to_hex()[0..8]);
}
}

Expand Down
1 change: 1 addition & 0 deletions src/sharechain/p2chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,7 @@ impl P2Chain {
new_block_height: u64,
hash: FixedHash,
) -> Result<(ChainAddResult, Vec<(u64, FixedHash)>), ShareChainError> {
info!(target: LOG_TARGET, "Trying to verify new block to add: {}:{}", new_block_height, &hash.to_hex()[0..8]);
// we should validate what we can if a block is invalid, we should delete it.
let mut new_tip = ChainAddResult::default();
let block = self
Expand Down

0 comments on commit a597dfc

Please sign in to comment.