Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

backport: Bring litep2p fixes and latest version to stable2409 #6497

Closed
wants to merge 13 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
384 changes: 117 additions & 267 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -830,7 +830,7 @@ linked-hash-map = { version = "0.5.4" }
linked_hash_set = { version = "0.1.4" }
linregress = { version = "0.5.1" }
lite-json = { version = "0.2.0", default-features = false }
litep2p = { version = "0.6.2" }
litep2p = { version = "0.8.1", features = ["websocket"] }
log = { version = "0.4.22", default-features = false }
macro_magic = { version = "0.5.1" }
maplit = { version = "1.0.2" }
Expand Down
13 changes: 13 additions & 0 deletions prdoc/pr_6497.prdoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Schema: Polkadot SDK PRDoc Schema (prdoc) v1.0.0
# See doc at https://raw.githubusercontent.com/paritytech/polkadot-sdk/master/prdoc/schema_user.json

title: Bring litep2p fixes and latest version to stable2409

doc:
- audience: [ Node Dev, Node Operator ]
description: |
This PR affects only litep2p (experimental) backend and contains critical fixes for connection stability and memory leaks.

crates:
- name: sc-network
bump: patch
115 changes: 86 additions & 29 deletions substrate/client/network/src/litep2p/discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,15 +95,6 @@ pub enum DiscoveryEvent {
/// Peer ID.
peer: PeerId,

/// Identify protocol version.
protocol_version: Option<String>,

/// Identify user agent version.
user_agent: Option<String>,

/// Observed address.
observed_address: Multiaddr,

/// Listen addresses.
listen_addresses: Vec<Multiaddr>,

Expand All @@ -125,7 +116,16 @@ pub enum DiscoveryEvent {

/// New external address discovered.
ExternalAddressDiscovered {
/// Discovered addresses.
/// Discovered address.
address: Multiaddr,
},

/// The external address has expired.
///
/// This happens when the internal buffers exceed the maximum number of external addresses,
/// and this address is the oldest one.
ExternalAddressExpired {
/// Expired address.
address: Multiaddr,
},

Expand Down Expand Up @@ -162,6 +162,9 @@ pub enum DiscoveryEvent {

/// Discovery.
pub struct Discovery {
/// Local peer ID.
local_peer_id: litep2p::PeerId,

/// Ping event stream.
ping_event_stream: Box<dyn Stream<Item = PingEvent> + Send + Unpin>,

Expand Down Expand Up @@ -233,6 +236,7 @@ impl Discovery {
/// Enables `/ipfs/ping/1.0.0` and `/ipfs/identify/1.0.0` by default and starts
/// the mDNS peer discovery if it was enabled.
pub fn new<Hash: AsRef<[u8]> + Clone>(
local_peer_id: litep2p::PeerId,
config: &NetworkConfiguration,
genesis_hash: Hash,
fork_id: Option<&str>,
Expand All @@ -243,11 +247,9 @@ impl Discovery {
) -> (Self, PingConfig, IdentifyConfig, KademliaConfig, Option<MdnsConfig>) {
let (ping_config, ping_event_stream) = PingConfig::default();
let user_agent = format!("{} ({})", config.client_version, config.node_name);
let (identify_config, identify_event_stream) = IdentifyConfig::new(
"/substrate/1.0".to_string(),
Some(user_agent),
config.public_addresses.clone().into_iter().map(Into::into).collect(),
);

let (identify_config, identify_event_stream) =
IdentifyConfig::new("/substrate/1.0".to_string(), Some(user_agent));

let (mdns_config, mdns_event_stream) = match config.transport {
crate::config::TransportConfig::Normal { enable_mdns, .. } => match enable_mdns {
Expand Down Expand Up @@ -275,6 +277,7 @@ impl Discovery {

(
Self {
local_peer_id,
ping_event_stream,
identify_event_stream,
mdns_event_stream,
Expand Down Expand Up @@ -434,7 +437,13 @@ impl Discovery {
}

/// Check if `address` can be considered a new external address.
fn is_new_external_address(&mut self, address: &Multiaddr, peer: PeerId) -> bool {
///
/// If this address replaces an older address, the expired address is returned.
fn is_new_external_address(
&mut self,
address: &Multiaddr,
peer: PeerId,
) -> (bool, Option<Multiaddr>) {
log::trace!(target: LOG_TARGET, "verify new external address: {address}");

// is the address one of our known addresses
Expand All @@ -445,23 +454,39 @@ impl Discovery {
.chain(self.public_addresses.iter())
.any(|known_address| Discovery::is_known_address(&known_address, &address))
{
return true
return (true, None)
}

match self.address_confirmations.get(address) {
Some(confirmations) => {
confirmations.insert(peer);

if confirmations.len() >= MIN_ADDRESS_CONFIRMATIONS {
return true
return (true, None)
}
},
None => {
let oldest = (self.address_confirmations.len() >=
self.address_confirmations.limiter().max_length() as usize)
.then(|| {
self.address_confirmations.pop_oldest().map(|(address, peers)| {
if peers.len() >= MIN_ADDRESS_CONFIRMATIONS {
return Some(address)
} else {
None
}
})
})
.flatten()
.flatten();

self.address_confirmations.insert(address.clone(), Default::default());

return (false, oldest)
},
}

false
(false, None)
}
}

Expand Down Expand Up @@ -533,7 +558,7 @@ impl Stream for Discovery {

return Poll::Ready(Some(DiscoveryEvent::GetRecordSuccess { query_id, records }));
},
Poll::Ready(Some(KademliaEvent::PutRecordSucess { query_id, key: _ })) =>
Poll::Ready(Some(KademliaEvent::PutRecordSuccess { query_id, key: _ })) =>
return Poll::Ready(Some(DiscoveryEvent::PutRecordSuccess { query_id })),
Poll::Ready(Some(KademliaEvent::QueryFailed { query_id })) => {
match this.find_node_query_id == Some(query_id) {
Expand All @@ -556,31 +581,63 @@ impl Stream for Discovery {

return Poll::Ready(Some(DiscoveryEvent::IncomingRecord { record }))
},
// Content provider events are ignored for now.
Poll::Ready(Some(KademliaEvent::GetProvidersSuccess { .. })) |
Poll::Ready(Some(KademliaEvent::IncomingProvider { .. })) => {},
}

match Pin::new(&mut this.identify_event_stream).poll_next(cx) {
Poll::Pending => {},
Poll::Ready(None) => return Poll::Ready(None),
Poll::Ready(Some(IdentifyEvent::PeerIdentified {
peer,
protocol_version,
user_agent,
listen_addresses,
supported_protocols,
observed_address,
..
})) => {
if this.is_new_external_address(&observed_address, peer) {
this.pending_events.push_back(DiscoveryEvent::ExternalAddressDiscovered {
address: observed_address.clone(),
});
let observed_address =
if let Some(Protocol::P2p(peer_id)) = observed_address.iter().last() {
if peer_id != *this.local_peer_id.as_ref() {
log::warn!(
target: LOG_TARGET,
"Discovered external address for a peer that is not us: {observed_address}",
);
None
} else {
Some(observed_address)
}
} else {
Some(observed_address.with(Protocol::P2p(this.local_peer_id.into())))
};

// Ensure that an external address with a different peer ID does not have
// side effects of evicting other external addresses via `ExternalAddressExpired`.
if let Some(observed_address) = observed_address {
let (is_new, expired_address) =
this.is_new_external_address(&observed_address, peer);

if let Some(expired_address) = expired_address {
log::trace!(
target: LOG_TARGET,
"Removing expired external address expired={expired_address} is_new={is_new} observed={observed_address}",
);

this.pending_events.push_back(DiscoveryEvent::ExternalAddressExpired {
address: expired_address,
});
}

if is_new {
this.pending_events.push_back(DiscoveryEvent::ExternalAddressDiscovered {
address: observed_address.clone(),
});
}
}

return Poll::Ready(Some(DiscoveryEvent::Identified {
peer,
protocol_version,
user_agent,
listen_addresses,
observed_address,
supported_protocols,
}));
},
Expand Down
Loading
Loading