Skip to content

Commit

Permalink
fix: ignore old peer messages (#132)
Browse files Browse the repository at this point in the history
  • Loading branch information
stringhandler authored Nov 1, 2024
1 parent 4b3cbd7 commit 463a682
Show file tree
Hide file tree
Showing 3 changed files with 149 additions and 50 deletions.
42 changes: 42 additions & 0 deletions log4rs_sample.yml
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,38 @@ appenders:
encoder:
pattern: "{d(%Y-%m-%d %H:%M:%S.%f)} {l:5} {m} // {f}:{L}{n}"

peer_info:
kind: rolling_file
path: "{{log_dir}}/log/peer_info.log"
policy:
kind: compound
trigger:
kind: size
limit: 10mb
roller:
kind: fixed_window
base: 1
count: 5
pattern: "{{log_dir}}/log/peer_info.{}.log"
encoder:
pattern: "{d(%Y-%m-%d %H:%M:%S.%f)} {l:5} {m} // {f}:{L}{n}"

new_tip_notify:
kind: rolling_file
path: "{{log_dir}}/log/new_tip_notify.log"
policy:
kind: compound
trigger:
kind: size
limit: 10mb
roller:
kind: fixed_window
base: 1
count: 5
pattern: "{{log_dir}}/log/new_tip_notify.{}.log"
encoder:
pattern: "{d(%Y-%m-%d %H:%M:%S.%f)} {l:5} {m} // {f}:{L}{n}"

# Set the default logging level to "warn" and attach the "stdout" appender to the root
root:
level: info
Expand All @@ -145,6 +177,16 @@ loggers:
- stdout
- peers
additive: false
tari::p2pool::topics::peer_info:
level: debug
appenders:
- peer_info
additive: false
tari::p2pool::topics::new_tip_notify:
level: debug
appenders:
- new_tip_notify
additive: false
tari::p2pool::server::p2p:
level: info
appenders:
Expand Down
3 changes: 3 additions & 0 deletions src/server/p2p/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ where T: Serialize {
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct PeerInfo {
pub version: u64,
pub peer_id: Option<PeerId>,
pub current_sha3x_height: u64,
pub current_random_x_height: u64,
pub current_sha3x_pow: u128,
Expand All @@ -60,6 +61,7 @@ pub struct PeerInfo {
impl_conversions!(PeerInfo);
impl PeerInfo {
pub fn new(
peer_id: PeerId,
current_sha3x_height: u64,
current_random_x_height: u64,
current_sha3x_pow: u128,
Expand All @@ -71,6 +73,7 @@ impl PeerInfo {
let timestamp = EpochTime::now();
Self {
version: PROTOCOL_VERSION,
peer_id: Some(peer_id),
current_sha3x_height,
current_random_x_height,
current_sha3x_pow,
Expand Down
154 changes: 104 additions & 50 deletions src/server/p2p/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,15 +105,16 @@ const CATCH_UP_SYNC_REQUEST_RESPONSE_PROTOCOL: &str = "/catch_up_sync/4";
const LOG_TARGET: &str = "tari::p2pool::server::p2p";
const SYNC_REQUEST_LOG_TARGET: &str = "sync_request";
const MESSAGE_LOGGING_LOG_TARGET: &str = "tari::p2pool::message_logging";
const PEER_INFO_LOGGING_LOG_TARGET: &str = "tari::p2pool::topics::peer_info";
const NEW_TIP_NOTIFY_LOGGING_LOG_TARGET: &str = "tari::p2pool::topics::new_tip_notify";
pub const STABLE_PRIVATE_KEY_FILE: &str = "p2pool_private.key";

const MAX_ACCEPTABLE_P2P_MESSAGE_TIMEOUT: Duration = Duration::from_millis(500);
const SYNC_TIMEOUT: Duration = Duration::from_secs(60);
const MAX_ACCEPTABLE_NETWORK_EVENT_TIMEOUT: Duration = Duration::from_millis(100);
const CATCH_UP_SYNC_BLOCKS_IN_I_HAVE: usize = 100;
const MAX_CATCH_UP_ATTEMPTS: usize = 150;
// Time to start up and catch up before we start processing new tip messages
const STARTUP_CATCH_UP_TIME: Duration = Duration::from_secs(120);
const STARTUP_CATCH_UP_TIME: Duration = Duration::from_secs(1);

#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize, Default)]
pub struct Squad {
Expand Down Expand Up @@ -415,14 +416,14 @@ where S: ShareChain
StreamProtocol::new(SHARE_CHAIN_SYNC_REQ_RESP_PROTOCOL),
request_response::ProtocolSupport::Full,
)],
request_response::Config::default().with_request_timeout(Duration::from_secs(30)), // 10 is the default
request_response::Config::default().with_request_timeout(Duration::from_secs(10)), // 10 is the default
),
direct_peer_exchange: cbor::Behaviour::<DirectPeerInfoRequest, DirectPeerInfoResponse>::new(
[(
StreamProtocol::new(DIRECT_PEER_EXCHANGE_REQ_RESP_PROTOCOL),
request_response::ProtocolSupport::Full,
)],
request_response::Config::default().with_request_timeout(Duration::from_secs(30)), // 10 is the default
request_response::Config::default().with_request_timeout(Duration::from_secs(10)), // 10 is the default
),
catch_up_sync: cbor::Behaviour::<CatchUpSyncRequest, CatchUpSyncResponse>::new(
[(
Expand Down Expand Up @@ -499,6 +500,7 @@ where S: ShareChain
let current_pow_sha3x = share_chain_sha3x.chain_pow().await.as_u128();
let current_pow_random_x = share_chain_random_x.chain_pow().await.as_u128();
let peer_info_squad_raw = PeerInfo::new(
self.swarm.local_peer_id().clone(),
current_height_sha3x,
current_height_random_x,
current_pow_sha3x,
Expand Down Expand Up @@ -596,47 +598,74 @@ where S: ShareChain
#[allow(clippy::too_many_lines)]
async fn handle_new_gossipsub_message(&mut self, message: Message) -> Result<MessageAcceptance, Error> {
debug!(target: MESSAGE_LOGGING_LOG_TARGET, "New gossipsub message: {message:?}");
let peer = message.source;
if let Some(peer) = peer {
let source_peer = message.source;
if let Some(source_peer) = source_peer {
let topic = message.topic.to_string();
match topic {
topic if topic == Self::network_topic(PEER_INFO_TOPIC) => match messages::PeerInfo::try_from(message) {
Ok(payload) => {
debug!(target: MESSAGE_LOGGING_LOG_TARGET, "[PEERINFO_TOPIC] New peer info: {peer:?} -> {payload:?}");
debug!(target: LOG_TARGET, squad = &self.config.squad; "[NETWORK] New peer info: {peer:?} -> {payload:?}");
if payload.version != PROTOCOL_VERSION {
trace!(target: LOG_TARGET, squad = &self.config.squad; "Peer {} has an outdated version, skipping", peer);
return Ok(MessageAcceptance::Reject);
}
if payload.squad != self.config.squad.as_string() {
debug!(target: LOG_TARGET, squad = &self.config.squad; "Peer {} is not in the same squad, skipping. Our squad: {}, their squad:{}", peer, self.config.squad, payload.squad);
return Ok(MessageAcceptance::Ignore);
}
if !self.config.is_seed_peer {
if self.add_peer(payload, peer.clone()).await {
self.swarm.behaviour_mut().gossipsub.add_explicit_peer(&peer);
}
}
return Ok(MessageAcceptance::Accept);
},
Err(error) => {
debug!(target: LOG_TARGET, squad = &self.config.squad; "Can't deserialize peer info payload: {:?}", error);
return Ok(MessageAcceptance::Reject);
},
},
// topic if topic == Self::network_topic(PEER_INFO_TOPIC) => match messages::PeerInfo::try_from(message)
// { Ok(payload) => {
// debug!(target: PEER_INFO_LOGGING_LOG_TARGET, "[PEERINFO_TOPIC] New peer info: {source_peer:?}
// -> {payload:?}"); debug!(target: LOG_TARGET, squad = &self.config.squad;
// "[NETWORK] New peer info: {source_peer:?} -> {payload:?}"); if
// payload.version != PROTOCOL_VERSION { trace!(target: LOG_TARGET, squad =
// &self.config.squad; "Peer {} has an outdated version, skipping", source_peer);
// return Ok(MessageAcceptance::Reject);
// }
// if payload.squad != self.config.squad.as_string() {
// debug!(target: LOG_TARGET, squad = &self.config.squad; "Peer {} is not in the same squad,
// skipping. Our squad: {}, their squad:{}", source_peer, self.config.squad, payload.squad);
// return Ok(MessageAcceptance::Ignore);
// }

// if payload.peer_id.as_ref() == Some(self.swarm.local_peer_id()) {
// return Ok(MessageAcceptance::Ignore);
// }

// if payload.peer_id != Some(source_peer) {
// warn!(target: LOG_TARGET, squad = &self.config.squad; "Peer {} sent a peer info message
// with a different peer id: {}, skipping", source_peer, payload.peer_id.as_ref().map(|p|
// p.to_string()).unwrap_or("None".to_string())); // return
// Ok(MessageAcceptance::Ignore); }
// if !self.config.is_seed_peer {
// if self.add_peer(payload, source_peer.clone()).await {
// self.swarm.behaviour_mut().gossipsub.add_explicit_peer(&source_peer);
// }
// }
// return Ok(MessageAcceptance::Accept);
// },
// Err(error) => {
// debug!(target: LOG_TARGET, squad = &self.config.squad; "Can't deserialize peer info payload:
// {:?}", error); return Ok(MessageAcceptance::Reject);
// },
// },
topic if topic == Self::squad_topic(&self.config.squad, PEER_INFO_TOPIC) => {
match messages::PeerInfo::try_from(message) {
Ok(payload) => {
debug!(target: MESSAGE_LOGGING_LOG_TARGET, "[SQUAD_PEERINFO_TOPIC] New peer info: {peer:?} -> {payload:?}");

debug!(target: LOG_TARGET, squad = &self.config.squad; "[squad] New peer info: {peer:?} -> {payload:?}");
debug!(target: LOG_TARGET, squad = &self.config.squad; "[squad] New peer info: {source_peer:?} -> {payload:?}");
if payload.version != PROTOCOL_VERSION {
debug!(target: LOG_TARGET, squad = &self.config.squad; "Peer {} has an outdated version, skipping", peer);
debug!(target: LOG_TARGET, squad = &self.config.squad; "Peer {} has an outdated version, skipping", source_peer);
return Ok(MessageAcceptance::Reject);
}

// 60 seconds. TODO: make config
if payload.timestamp < EpochTime::now().as_u64().saturating_sub(60) {
debug!(target: LOG_TARGET, squad = &self.config.squad; "Peer {} sent a peer info message that is too old, skipping", source_peer);
// TODO: should be punish
return Ok(MessageAcceptance::Ignore);
}
if payload.peer_id.as_ref() == Some(self.swarm.local_peer_id()) {
return Ok(MessageAcceptance::Ignore);
}

if payload.peer_id != Some(source_peer) {
warn!(target: LOG_TARGET, squad = &self.config.squad; "Peer {} sent a peer info message with a different peer id: {}, skipping", source_peer, payload.peer_id.as_ref().map(|p| p.to_string()).unwrap_or("None".to_string()));
// return Ok(MessageAcceptance::Ignore);
}
debug!(target: PEER_INFO_LOGGING_LOG_TARGET, "[SQUAD_PEERINFO_TOPIC] New peer info: {source_peer:?} -> {payload:?}");

if !self.config.is_seed_peer {
if self.add_peer(payload, peer.clone()).await {
self.swarm.behaviour_mut().gossipsub.add_explicit_peer(&peer);
if self.add_peer(payload, source_peer.clone()).await {
self.swarm.behaviour_mut().gossipsub.add_explicit_peer(&source_peer);
}
}
return Ok(MessageAcceptance::Accept);
Expand All @@ -651,33 +680,35 @@ where S: ShareChain
// TODO: (sender peer's wallet address should be included always in the conibases with a fixed percent
// (like 20%))
topic if topic == Self::squad_topic(&self.config.squad, BLOCK_NOTIFY_TOPIC) => {
debug!(target: MESSAGE_LOGGING_LOG_TARGET, "[SQUAD_NEW_BLOCK_TOPIC] New block from gossip: {peer:?}");

// if self.sync_in_progress.load(Ordering::SeqCst) {
// return;
// }
match NotifyNewTipBlock::try_from(message) {
Ok(payload) => {
if payload.version != PROTOCOL_VERSION {
debug!(target: LOG_TARGET, squad = &self.config.squad; "Peer {} has an outdated version, skipping", peer);
debug!(target: LOG_TARGET, squad = &self.config.squad; "Peer {} has an outdated version, skipping", source_peer);
return Ok(MessageAcceptance::Reject);
}
// lets check age
if payload.timestamp < EpochTime::now().as_u64().saturating_sub(60) {
debug!(target: LOG_TARGET, squad = &self.config.squad; "Peer {} sent a notify message that is too old, skipping", peer);
if payload.timestamp < EpochTime::now().as_u64().saturating_sub(10) {
debug!(target: LOG_TARGET, squad = &self.config.squad; "Peer {} sent a notify message that is too old, skipping", source_peer);
return Ok(MessageAcceptance::Ignore);
}
let payload = Arc::new(payload);
debug!(target: MESSAGE_LOGGING_LOG_TARGET, "[SQUAD_NEW_BLOCK_TOPIC] New block from gossip: {peer:?} -> {payload:?}");
let source_peer = payload.peer_id();
let message_peer = payload.peer_id();
if message_peer.to_string() != source_peer.to_string() {
warn!(target: LOG_TARGET, squad = &self.config.squad; "Peer {} sent a block with a different peer id: {}, skipping", source_peer, message_peer);
}
debug!(target: NEW_TIP_NOTIFY_LOGGING_LOG_TARGET, "[SQUAD_NEW_BLOCK_TOPIC] New block from gossip: {source_peer:?} -> {payload:?}");

// If we don't have this peer, try do peer exchange
if !self.network_peer_store.exists(source_peer) {
self.initiate_direct_peer_exchange(source_peer).await;
if !self.network_peer_store.exists(message_peer) {
self.initiate_direct_peer_exchange(message_peer).await;
}

// verify payload
if payload.new_blocks.is_empty() {
warn!(target: LOG_TARGET, squad = &self.config.squad; "Peer {} sent notify new tip with no blocks.", source_peer);
warn!(target: LOG_TARGET, squad = &self.config.squad; "Peer {} sent notify new tip with no blocks.", message_peer);
return Ok(MessageAcceptance::Reject);
}

Expand All @@ -694,7 +725,14 @@ where S: ShareChain
payload.new_blocks.iter().map(|(h, _)| *h).max().unwrap_or(0) <=
our_tip.saturating_sub(4)
{
debug!(target: LOG_TARGET, squad = &self.config.squad; "Peer {} sent a block that is not better than ours, skipping", source_peer);
debug!(target: LOG_TARGET, squad = &self.config.squad; "Peer {} sent a block that is not better than ours, skipping", message_peer);
return Ok(MessageAcceptance::Ignore);
}

// if payload.
if payload.new_blocks.iter().map(|(h, _)| *h).max().unwrap_or(0) > our_tip.saturating_add(2)
{
debug!(target: LOG_TARGET, squad = &self.config.squad; "Peer {} sent a block that is too far ahead, skipping", message_peer);
return Ok(MessageAcceptance::Ignore);
}

Expand All @@ -706,7 +744,7 @@ where S: ShareChain
missing_blocks.push(block.clone());
}
if !missing_blocks.is_empty() {
self.sync_share_chain(algo, source_peer, missing_blocks, true).await;
self.sync_share_chain(algo, message_peer, missing_blocks, true).await;
}
return Ok(MessageAcceptance::Accept);
},
Expand All @@ -715,7 +753,7 @@ where S: ShareChain
debug!(target: LOG_TARGET, squad = &self.config.squad; "Can't deserialize broadcast block payload: {:?}", error);
self.network_peer_store
.move_to_grey_list(
peer,
source_peer.clone(),
format!("Node sent a block that could not be deserialized: {:?}", error),
)
.await;
Expand Down Expand Up @@ -773,6 +811,10 @@ where S: ShareChain
})
{
let local_peer_id = self.swarm.local_peer_id().clone();
if peer == &local_peer_id {
return;
}

// TODO: Should we send them our details? The problem is that if we send too many of these, libp2p
// starts dropping requests with "libp2p_relay::priv_client::handler Dropping in-flight connect
// request because we are at capacity"
Expand Down Expand Up @@ -1687,14 +1729,26 @@ where S: ShareChain

// dbg!(&missing_blocks);

let best_peers = self
let mut best_peers = self
.network_peer_store
.best_peers_to_sync(self.config.num_peers_to_sync, *algo);
let our_pow = match algo {
PowAlgorithm::RandomX => self.share_chain_random_x.chain_pow().await,
PowAlgorithm::Sha3x => self.share_chain_sha3x.chain_pow().await,
};

let min_catch_up_difficulty = 1000;
best_peers.retain(|x| match algo {
PowAlgorithm::RandomX => {
let their_pow = x.peer_info.current_random_x_pow;
their_pow > min_catch_up_difficulty
},
PowAlgorithm::Sha3x => {
let their_pow = x.peer_info.current_sha3x_pow;
their_pow > min_catch_up_difficulty
},
});

// info!(target: LOG_TARGET, squad = &self.config.squad; "Best peers to sync: {best_peers:?}");
if best_peers.is_empty() {
info!(target: LOG_TARGET, squad = &self.config.squad; "No peers found to try and sync to");
Expand Down

0 comments on commit 463a682

Please sign in to comment.