From 463a682deec6fa6ed2f26220c1bae23c15ac4de4 Mon Sep 17 00:00:00 2001 From: stringhandler Date: Fri, 1 Nov 2024 17:03:18 +0200 Subject: [PATCH] fix: ignore old peer messages (#132) --- log4rs_sample.yml | 42 ++++++++++ src/server/p2p/messages.rs | 3 + src/server/p2p/network.rs | 154 +++++++++++++++++++++++++------------ 3 files changed, 149 insertions(+), 50 deletions(-) diff --git a/log4rs_sample.yml b/log4rs_sample.yml index cec45e8..3c5cc49 100644 --- a/log4rs_sample.yml +++ b/log4rs_sample.yml @@ -119,6 +119,38 @@ appenders: encoder: pattern: "{d(%Y-%m-%d %H:%M:%S.%f)} {l:5} {m} // {f}:{L}{n}" + peer_info: + kind: rolling_file + path: "{{log_dir}}/log/peer_info.log" + policy: + kind: compound + trigger: + kind: size + limit: 10mb + roller: + kind: fixed_window + base: 1 + count: 5 + pattern: "{{log_dir}}/log/peer_info.{}.log" + encoder: + pattern: "{d(%Y-%m-%d %H:%M:%S.%f)} {l:5} {m} // {f}:{L}{n}" + + new_tip_notify: + kind: rolling_file + path: "{{log_dir}}/log/new_tip_notify.log" + policy: + kind: compound + trigger: + kind: size + limit: 10mb + roller: + kind: fixed_window + base: 1 + count: 5 + pattern: "{{log_dir}}/log/new_tip_notify.{}.log" + encoder: + pattern: "{d(%Y-%m-%d %H:%M:%S.%f)} {l:5} {m} // {f}:{L}{n}" + # Set the default logging level to "warn" and attach the "stdout" appender to the root root: level: info @@ -145,6 +177,16 @@ loggers: - stdout - peers additive: false + tari::p2pool::topics::peer_info: + level: debug + appenders: + - peer_info + additive: false + tari::p2pool::topics::new_tip_notify: + level: debug + appenders: + - new_tip_notify + additive: false tari::p2pool::server::p2p: level: info appenders: diff --git a/src/server/p2p/messages.rs b/src/server/p2p/messages.rs index 3abd277..3deda9e 100644 --- a/src/server/p2p/messages.rs +++ b/src/server/p2p/messages.rs @@ -47,6 +47,7 @@ where T: Serialize { #[derive(Serialize, Deserialize, Debug, Clone)] pub struct PeerInfo { pub version: u64, + pub peer_id: Option, pub current_sha3x_height: u64, pub current_random_x_height: u64, pub current_sha3x_pow: u128, @@ -60,6 +61,7 @@ pub struct PeerInfo { impl_conversions!(PeerInfo); impl PeerInfo { pub fn new( + peer_id: PeerId, current_sha3x_height: u64, current_random_x_height: u64, current_sha3x_pow: u128, @@ -71,6 +73,7 @@ impl PeerInfo { let timestamp = EpochTime::now(); Self { version: PROTOCOL_VERSION, + peer_id: Some(peer_id), current_sha3x_height, current_random_x_height, current_sha3x_pow, diff --git a/src/server/p2p/network.rs b/src/server/p2p/network.rs index 962f319..4f939bc 100644 --- a/src/server/p2p/network.rs +++ b/src/server/p2p/network.rs @@ -105,15 +105,16 @@ const CATCH_UP_SYNC_REQUEST_RESPONSE_PROTOCOL: &str = "/catch_up_sync/4"; const LOG_TARGET: &str = "tari::p2pool::server::p2p"; const SYNC_REQUEST_LOG_TARGET: &str = "sync_request"; const MESSAGE_LOGGING_LOG_TARGET: &str = "tari::p2pool::message_logging"; +const PEER_INFO_LOGGING_LOG_TARGET: &str = "tari::p2pool::topics::peer_info"; +const NEW_TIP_NOTIFY_LOGGING_LOG_TARGET: &str = "tari::p2pool::topics::new_tip_notify"; pub const STABLE_PRIVATE_KEY_FILE: &str = "p2pool_private.key"; const MAX_ACCEPTABLE_P2P_MESSAGE_TIMEOUT: Duration = Duration::from_millis(500); -const SYNC_TIMEOUT: Duration = Duration::from_secs(60); const MAX_ACCEPTABLE_NETWORK_EVENT_TIMEOUT: Duration = Duration::from_millis(100); const CATCH_UP_SYNC_BLOCKS_IN_I_HAVE: usize = 100; const MAX_CATCH_UP_ATTEMPTS: usize = 150; // Time to start up and catch up before we start processing new tip messages -const STARTUP_CATCH_UP_TIME: Duration = Duration::from_secs(120); +const STARTUP_CATCH_UP_TIME: Duration = Duration::from_secs(1); #[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize, Default)] pub struct Squad { @@ -415,14 +416,14 @@ where S: ShareChain StreamProtocol::new(SHARE_CHAIN_SYNC_REQ_RESP_PROTOCOL), request_response::ProtocolSupport::Full, )], - request_response::Config::default().with_request_timeout(Duration::from_secs(30)), // 10 is the default + request_response::Config::default().with_request_timeout(Duration::from_secs(10)), // 10 is the default ), direct_peer_exchange: cbor::Behaviour::::new( [( StreamProtocol::new(DIRECT_PEER_EXCHANGE_REQ_RESP_PROTOCOL), request_response::ProtocolSupport::Full, )], - request_response::Config::default().with_request_timeout(Duration::from_secs(30)), // 10 is the default + request_response::Config::default().with_request_timeout(Duration::from_secs(10)), // 10 is the default ), catch_up_sync: cbor::Behaviour::::new( [( @@ -499,6 +500,7 @@ where S: ShareChain let current_pow_sha3x = share_chain_sha3x.chain_pow().await.as_u128(); let current_pow_random_x = share_chain_random_x.chain_pow().await.as_u128(); let peer_info_squad_raw = PeerInfo::new( + self.swarm.local_peer_id().clone(), current_height_sha3x, current_height_random_x, current_pow_sha3x, @@ -596,47 +598,74 @@ where S: ShareChain #[allow(clippy::too_many_lines)] async fn handle_new_gossipsub_message(&mut self, message: Message) -> Result { debug!(target: MESSAGE_LOGGING_LOG_TARGET, "New gossipsub message: {message:?}"); - let peer = message.source; - if let Some(peer) = peer { + let source_peer = message.source; + if let Some(source_peer) = source_peer { let topic = message.topic.to_string(); match topic { - topic if topic == Self::network_topic(PEER_INFO_TOPIC) => match messages::PeerInfo::try_from(message) { - Ok(payload) => { - debug!(target: MESSAGE_LOGGING_LOG_TARGET, "[PEERINFO_TOPIC] New peer info: {peer:?} -> {payload:?}"); - debug!(target: LOG_TARGET, squad = &self.config.squad; "[NETWORK] New peer info: {peer:?} -> {payload:?}"); - if payload.version != PROTOCOL_VERSION { - trace!(target: LOG_TARGET, squad = &self.config.squad; "Peer {} has an outdated version, skipping", peer); - return Ok(MessageAcceptance::Reject); - } - if payload.squad != self.config.squad.as_string() { - debug!(target: LOG_TARGET, squad = &self.config.squad; "Peer {} is not in the same squad, skipping. Our squad: {}, their squad:{}", peer, self.config.squad, payload.squad); - return Ok(MessageAcceptance::Ignore); - } - if !self.config.is_seed_peer { - if self.add_peer(payload, peer.clone()).await { - self.swarm.behaviour_mut().gossipsub.add_explicit_peer(&peer); - } - } - return Ok(MessageAcceptance::Accept); - }, - Err(error) => { - debug!(target: LOG_TARGET, squad = &self.config.squad; "Can't deserialize peer info payload: {:?}", error); - return Ok(MessageAcceptance::Reject); - }, - }, + // topic if topic == Self::network_topic(PEER_INFO_TOPIC) => match messages::PeerInfo::try_from(message) + // { Ok(payload) => { + // debug!(target: PEER_INFO_LOGGING_LOG_TARGET, "[PEERINFO_TOPIC] New peer info: {source_peer:?} + // -> {payload:?}"); debug!(target: LOG_TARGET, squad = &self.config.squad; + // "[NETWORK] New peer info: {source_peer:?} -> {payload:?}"); if + // payload.version != PROTOCOL_VERSION { trace!(target: LOG_TARGET, squad = + // &self.config.squad; "Peer {} has an outdated version, skipping", source_peer); + // return Ok(MessageAcceptance::Reject); + // } + // if payload.squad != self.config.squad.as_string() { + // debug!(target: LOG_TARGET, squad = &self.config.squad; "Peer {} is not in the same squad, + // skipping. Our squad: {}, their squad:{}", source_peer, self.config.squad, payload.squad); + // return Ok(MessageAcceptance::Ignore); + // } + + // if payload.peer_id.as_ref() == Some(self.swarm.local_peer_id()) { + // return Ok(MessageAcceptance::Ignore); + // } + + // if payload.peer_id != Some(source_peer) { + // warn!(target: LOG_TARGET, squad = &self.config.squad; "Peer {} sent a peer info message + // with a different peer id: {}, skipping", source_peer, payload.peer_id.as_ref().map(|p| + // p.to_string()).unwrap_or("None".to_string())); // return + // Ok(MessageAcceptance::Ignore); } + // if !self.config.is_seed_peer { + // if self.add_peer(payload, source_peer.clone()).await { + // self.swarm.behaviour_mut().gossipsub.add_explicit_peer(&source_peer); + // } + // } + // return Ok(MessageAcceptance::Accept); + // }, + // Err(error) => { + // debug!(target: LOG_TARGET, squad = &self.config.squad; "Can't deserialize peer info payload: + // {:?}", error); return Ok(MessageAcceptance::Reject); + // }, + // }, topic if topic == Self::squad_topic(&self.config.squad, PEER_INFO_TOPIC) => { match messages::PeerInfo::try_from(message) { Ok(payload) => { - debug!(target: MESSAGE_LOGGING_LOG_TARGET, "[SQUAD_PEERINFO_TOPIC] New peer info: {peer:?} -> {payload:?}"); - - debug!(target: LOG_TARGET, squad = &self.config.squad; "[squad] New peer info: {peer:?} -> {payload:?}"); + debug!(target: LOG_TARGET, squad = &self.config.squad; "[squad] New peer info: {source_peer:?} -> {payload:?}"); if payload.version != PROTOCOL_VERSION { - debug!(target: LOG_TARGET, squad = &self.config.squad; "Peer {} has an outdated version, skipping", peer); + debug!(target: LOG_TARGET, squad = &self.config.squad; "Peer {} has an outdated version, skipping", source_peer); return Ok(MessageAcceptance::Reject); } + + // 60 seconds. TODO: make config + if payload.timestamp < EpochTime::now().as_u64().saturating_sub(60) { + debug!(target: LOG_TARGET, squad = &self.config.squad; "Peer {} sent a peer info message that is too old, skipping", source_peer); + // TODO: should be punish + return Ok(MessageAcceptance::Ignore); + } + if payload.peer_id.as_ref() == Some(self.swarm.local_peer_id()) { + return Ok(MessageAcceptance::Ignore); + } + + if payload.peer_id != Some(source_peer) { + warn!(target: LOG_TARGET, squad = &self.config.squad; "Peer {} sent a peer info message with a different peer id: {}, skipping", source_peer, payload.peer_id.as_ref().map(|p| p.to_string()).unwrap_or("None".to_string())); + // return Ok(MessageAcceptance::Ignore); + } + debug!(target: PEER_INFO_LOGGING_LOG_TARGET, "[SQUAD_PEERINFO_TOPIC] New peer info: {source_peer:?} -> {payload:?}"); + if !self.config.is_seed_peer { - if self.add_peer(payload, peer.clone()).await { - self.swarm.behaviour_mut().gossipsub.add_explicit_peer(&peer); + if self.add_peer(payload, source_peer.clone()).await { + self.swarm.behaviour_mut().gossipsub.add_explicit_peer(&source_peer); } } return Ok(MessageAcceptance::Accept); @@ -651,33 +680,35 @@ where S: ShareChain // TODO: (sender peer's wallet address should be included always in the conibases with a fixed percent // (like 20%)) topic if topic == Self::squad_topic(&self.config.squad, BLOCK_NOTIFY_TOPIC) => { - debug!(target: MESSAGE_LOGGING_LOG_TARGET, "[SQUAD_NEW_BLOCK_TOPIC] New block from gossip: {peer:?}"); - // if self.sync_in_progress.load(Ordering::SeqCst) { // return; // } match NotifyNewTipBlock::try_from(message) { Ok(payload) => { if payload.version != PROTOCOL_VERSION { - debug!(target: LOG_TARGET, squad = &self.config.squad; "Peer {} has an outdated version, skipping", peer); + debug!(target: LOG_TARGET, squad = &self.config.squad; "Peer {} has an outdated version, skipping", source_peer); return Ok(MessageAcceptance::Reject); } // lets check age - if payload.timestamp < EpochTime::now().as_u64().saturating_sub(60) { - debug!(target: LOG_TARGET, squad = &self.config.squad; "Peer {} sent a notify message that is too old, skipping", peer); + if payload.timestamp < EpochTime::now().as_u64().saturating_sub(10) { + debug!(target: LOG_TARGET, squad = &self.config.squad; "Peer {} sent a notify message that is too old, skipping", source_peer); return Ok(MessageAcceptance::Ignore); } let payload = Arc::new(payload); - debug!(target: MESSAGE_LOGGING_LOG_TARGET, "[SQUAD_NEW_BLOCK_TOPIC] New block from gossip: {peer:?} -> {payload:?}"); - let source_peer = payload.peer_id(); + let message_peer = payload.peer_id(); + if message_peer.to_string() != source_peer.to_string() { + warn!(target: LOG_TARGET, squad = &self.config.squad; "Peer {} sent a block with a different peer id: {}, skipping", source_peer, message_peer); + } + debug!(target: NEW_TIP_NOTIFY_LOGGING_LOG_TARGET, "[SQUAD_NEW_BLOCK_TOPIC] New block from gossip: {source_peer:?} -> {payload:?}"); + // If we don't have this peer, try do peer exchange - if !self.network_peer_store.exists(source_peer) { - self.initiate_direct_peer_exchange(source_peer).await; + if !self.network_peer_store.exists(message_peer) { + self.initiate_direct_peer_exchange(message_peer).await; } // verify payload if payload.new_blocks.is_empty() { - warn!(target: LOG_TARGET, squad = &self.config.squad; "Peer {} sent notify new tip with no blocks.", source_peer); + warn!(target: LOG_TARGET, squad = &self.config.squad; "Peer {} sent notify new tip with no blocks.", message_peer); return Ok(MessageAcceptance::Reject); } @@ -694,7 +725,14 @@ where S: ShareChain payload.new_blocks.iter().map(|(h, _)| *h).max().unwrap_or(0) <= our_tip.saturating_sub(4) { - debug!(target: LOG_TARGET, squad = &self.config.squad; "Peer {} sent a block that is not better than ours, skipping", source_peer); + debug!(target: LOG_TARGET, squad = &self.config.squad; "Peer {} sent a block that is not better than ours, skipping", message_peer); + return Ok(MessageAcceptance::Ignore); + } + + // if payload. + if payload.new_blocks.iter().map(|(h, _)| *h).max().unwrap_or(0) > our_tip.saturating_add(2) + { + debug!(target: LOG_TARGET, squad = &self.config.squad; "Peer {} sent a block that is too far ahead, skipping", message_peer); return Ok(MessageAcceptance::Ignore); } @@ -706,7 +744,7 @@ where S: ShareChain missing_blocks.push(block.clone()); } if !missing_blocks.is_empty() { - self.sync_share_chain(algo, source_peer, missing_blocks, true).await; + self.sync_share_chain(algo, message_peer, missing_blocks, true).await; } return Ok(MessageAcceptance::Accept); }, @@ -715,7 +753,7 @@ where S: ShareChain debug!(target: LOG_TARGET, squad = &self.config.squad; "Can't deserialize broadcast block payload: {:?}", error); self.network_peer_store .move_to_grey_list( - peer, + source_peer.clone(), format!("Node sent a block that could not be deserialized: {:?}", error), ) .await; @@ -773,6 +811,10 @@ where S: ShareChain }) { let local_peer_id = self.swarm.local_peer_id().clone(); + if peer == &local_peer_id { + return; + } + // TODO: Should we send them our details? The problem is that if we send too many of these, libp2p // starts dropping requests with "libp2p_relay::priv_client::handler Dropping in-flight connect // request because we are at capacity" @@ -1687,7 +1729,7 @@ where S: ShareChain // dbg!(&missing_blocks); - let best_peers = self + let mut best_peers = self .network_peer_store .best_peers_to_sync(self.config.num_peers_to_sync, *algo); let our_pow = match algo { @@ -1695,6 +1737,18 @@ where S: ShareChain PowAlgorithm::Sha3x => self.share_chain_sha3x.chain_pow().await, }; + let min_catch_up_difficulty = 1000; + best_peers.retain(|x| match algo { + PowAlgorithm::RandomX => { + let their_pow = x.peer_info.current_random_x_pow; + their_pow > min_catch_up_difficulty + }, + PowAlgorithm::Sha3x => { + let their_pow = x.peer_info.current_sha3x_pow; + their_pow > min_catch_up_difficulty + }, + }); + // info!(target: LOG_TARGET, squad = &self.config.squad; "Best peers to sync: {best_peers:?}"); if best_peers.is_empty() { info!(target: LOG_TARGET, squad = &self.config.squad; "No peers found to try and sync to");