Skip to content

Commit

Permalink
Revert " litep2p: Update network backend to v0.7.0 (paritytech#5609)"
Browse files Browse the repository at this point in the history
This reverts commit 12eeb5d.
  • Loading branch information
Grigoriy Simonov committed Oct 30, 2024
1 parent ccb801e commit aae80c7
Show file tree
Hide file tree
Showing 9 changed files with 296 additions and 223 deletions.
353 changes: 257 additions & 96 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -838,7 +838,7 @@ linked-hash-map = { version = "0.5.4" }
linked_hash_set = { version = "0.1.4" }
linregress = { version = "0.5.1" }
lite-json = { version = "0.2.0", default-features = false }
litep2p = { version = "0.7.0", features = ["websocket"] }
litep2p = { version = "0.6.2" }
log = { version = "0.4.22", default-features = false }
macro_magic = { version = "0.5.1" }
maplit = { version = "1.0.2" }
Expand Down
21 changes: 0 additions & 21 deletions prdoc/pr_5609.prdoc

This file was deleted.

8 changes: 5 additions & 3 deletions substrate/client/network/src/litep2p/discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -243,9 +243,11 @@ impl Discovery {
) -> (Self, PingConfig, IdentifyConfig, KademliaConfig, Option<MdnsConfig>) {
let (ping_config, ping_event_stream) = PingConfig::default();
let user_agent = format!("{} ({})", config.client_version, config.node_name);

let (identify_config, identify_event_stream) =
IdentifyConfig::new("/substrate/1.0".to_string(), Some(user_agent));
let (identify_config, identify_event_stream) = IdentifyConfig::new(
"/substrate/1.0".to_string(),
Some(user_agent),
config.public_addresses.clone().into_iter().map(Into::into).collect(),
);

let (mdns_config, mdns_event_stream) = match config.transport {
crate::config::TransportConfig::Normal { enable_mdns, .. } => match enable_mdns {
Expand Down
78 changes: 23 additions & 55 deletions substrate/client/network/src/litep2p/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ use libp2p::kad::{PeerRecord, Record as P2PRecord, RecordKey};
use litep2p::{
config::ConfigBuilder,
crypto::ed25519::Keypair,
error::{DialError, NegotiationError},
executor::Executor,
protocol::{
libp2p::{
Expand All @@ -65,14 +64,15 @@ use litep2p::{
},
transport::{
tcp::config::Config as TcpTransportConfig,
websocket::config::Config as WebSocketTransportConfig, ConnectionLimitsConfig, Endpoint,
websocket::config::Config as WebSocketTransportConfig, Endpoint,
},
types::{
multiaddr::{Multiaddr, Protocol},
ConnectionId,
},
Litep2p, Litep2pEvent, ProtocolName as Litep2pProtocolName,
Error as Litep2pError, Litep2p, Litep2pEvent, ProtocolName as Litep2pProtocolName,
};
use parking_lot::RwLock;
use prometheus_endpoint::Registry;

use sc_client_api::BlockBackend;
Expand Down Expand Up @@ -183,6 +183,9 @@ pub struct Litep2pNetworkBackend {

/// Prometheus metrics.
metrics: Option<Metrics>,

/// External addresses.
external_addresses: Arc<RwLock<HashSet<Multiaddr>>>,
}

impl Litep2pNetworkBackend {
Expand Down Expand Up @@ -554,9 +557,6 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkBackend<B, H> for Litep2pNetworkBac
.with_libp2p_ping(ping_config)
.with_libp2p_identify(identify_config)
.with_libp2p_kademlia(kademlia_config)
.with_connection_limits(ConnectionLimitsConfig::default().max_incoming_connections(
Some(crate::MAX_CONNECTIONS_ESTABLISHED_INCOMING as usize),
))
.with_executor(executor);

if let Some(config) = maybe_mdns_config {
Expand All @@ -570,22 +570,15 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkBackend<B, H> for Litep2pNetworkBac
let litep2p =
Litep2p::new(config_builder.build()).map_err(|error| Error::Litep2p(error))?;

let external_addresses: Arc<RwLock<HashSet<Multiaddr>>> = Arc::new(RwLock::new(
HashSet::from_iter(network_config.public_addresses.iter().cloned().map(Into::into)),
));
litep2p.listen_addresses().for_each(|address| {
log::debug!(target: LOG_TARGET, "listening on: {address}");

listen_addresses.write().insert(address.clone());
});

let public_addresses = litep2p.public_addresses();
for address in network_config.public_addresses.iter() {
if let Err(err) = public_addresses.add_address(address.clone().into()) {
log::warn!(
target: LOG_TARGET,
"failed to add public address {address:?}: {err:?}",
);
}
}

let network_service = Arc::new(Litep2pNetworkService::new(
local_peer_id,
keypair.clone(),
Expand All @@ -595,7 +588,7 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkBackend<B, H> for Litep2pNetworkBac
block_announce_protocol.clone(),
request_response_senders,
Arc::clone(&listen_addresses),
public_addresses,
Arc::clone(&external_addresses),
));

// register rest of the metrics now that `Litep2p` has been created
Expand All @@ -621,6 +614,7 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkBackend<B, H> for Litep2pNetworkBac
event_streams: out_events::OutChannels::new(None)?,
peers: HashMap::new(),
litep2p,
external_addresses,
})
}

Expand Down Expand Up @@ -923,16 +917,10 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkBackend<B, H> for Litep2pNetworkBac
self.discovery.add_self_reported_address(peer, supported_protocols, listen_addresses).await;
}
Some(DiscoveryEvent::ExternalAddressDiscovered { address }) => {
match self.litep2p.public_addresses().add_address(address.clone().into()) {
Ok(inserted) => if inserted {
log::info!(target: LOG_TARGET, "🔍 Discovered new external address for our node: {address}");
},
Err(err) => {
log::warn!(
target: LOG_TARGET,
"🔍 Failed to add discovered external address {address:?}: {err:?}",
);
},
let mut addresses = self.external_addresses.write();

if addresses.insert(address.clone()) {
log::info!(target: LOG_TARGET, "🔍 Discovered new external address for our node: {address}");
}
}
Some(DiscoveryEvent::ExternalAddressExpired{ address }) => {
Expand Down Expand Up @@ -1037,40 +1025,20 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkBackend<B, H> for Litep2pNetworkBac
}
}
Some(Litep2pEvent::DialFailure { address, error }) => {
log::debug!(
log::trace!(
target: LOG_TARGET,
"failed to dial peer at {address:?}: {error:?}",
);

if let Some(metrics) = &self.metrics {
let reason = match error {
DialError::Timeout => "timeout",
DialError::AddressError(_) => "invalid-address",
DialError::DnsError(_) => "cannot-resolve-dns",
DialError::NegotiationError(error) => match error {
NegotiationError::Timeout => "timeout",
NegotiationError::PeerIdMissing => "missing-peer-id",
NegotiationError::StateMismatch => "state-mismatch",
NegotiationError::PeerIdMismatch(_,_) => "peer-id-missmatch",
NegotiationError::MultistreamSelectError(_) => "multistream-select-error",
NegotiationError::SnowError(_) => "noise-error",
NegotiationError::ParseError(_) => "parse-error",
NegotiationError::IoError(_) => "io-error",
NegotiationError::WebSocket(_) => "webscoket-error",
}
};

metrics.pending_connections_errors_total.with_label_values(&[&reason]).inc();
}
}
Some(Litep2pEvent::ListDialFailures { errors }) => {
log::debug!(
target: LOG_TARGET,
"failed to dial peer on multiple addresses {errors:?}",
);
let reason = match error {
Litep2pError::PeerIdMismatch(_, _) => "invalid-peer-id",
Litep2pError::Timeout | Litep2pError::TransportError(_) |
Litep2pError::IoError(_) | Litep2pError::WebSocket(_) => "transport-error",
_ => "other",
};

if let Some(metrics) = &self.metrics {
metrics.pending_connections_errors_total.with_label_values(&["transport-errors"]).inc();
metrics.pending_connections_errors_total.with_label_values(&[reason]).inc();
}
}
_ => {}
Expand Down
16 changes: 7 additions & 9 deletions substrate/client/network/src/litep2p/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,7 @@ use crate::litep2p::Record;
use codec::DecodeAll;
use futures::{channel::oneshot, stream::BoxStream};
use libp2p::{identity::SigningError, kad::record::Key as KademliaKey};
use litep2p::{
addresses::PublicAddresses, crypto::ed25519::Keypair,
types::multiaddr::Multiaddr as LiteP2pMultiaddr,
};
use litep2p::{crypto::ed25519::Keypair, types::multiaddr::Multiaddr as LiteP2pMultiaddr};
use parking_lot::RwLock;

use sc_network_common::{
Expand Down Expand Up @@ -199,7 +196,7 @@ pub struct Litep2pNetworkService {
listen_addresses: Arc<RwLock<HashSet<LiteP2pMultiaddr>>>,

/// External addresses.
external_addresses: PublicAddresses,
external_addresses: Arc<RwLock<HashSet<LiteP2pMultiaddr>>>,
}

impl Litep2pNetworkService {
Expand All @@ -213,7 +210,7 @@ impl Litep2pNetworkService {
block_announce_protocol: ProtocolName,
request_response_protocols: HashMap<ProtocolName, TracingUnboundedSender<OutboundRequest>>,
listen_addresses: Arc<RwLock<HashSet<LiteP2pMultiaddr>>>,
external_addresses: PublicAddresses,
external_addresses: Arc<RwLock<HashSet<LiteP2pMultiaddr>>>,
) -> Self {
Self {
local_peer_id,
Expand Down Expand Up @@ -326,8 +323,9 @@ impl NetworkStatusProvider for Litep2pNetworkService {
.collect(),
external_addresses: self
.external_addresses
.get_addresses()
.into_iter()
.read()
.iter()
.cloned()
.map(|a| Multiaddr::from(a).into())
.collect(),
connected_peers: HashMap::new(),
Expand Down Expand Up @@ -493,7 +491,7 @@ impl NetworkEventStream for Litep2pNetworkService {

impl NetworkStateInfo for Litep2pNetworkService {
fn external_addresses(&self) -> Vec<Multiaddr> {
self.external_addresses.get_addresses().into_iter().map(Into::into).collect()
self.external_addresses.read().iter().cloned().map(Into::into).collect()
}

fn listen_addresses(&self) -> Vec<Multiaddr> {
Expand Down
31 changes: 2 additions & 29 deletions substrate/client/network/src/litep2p/shim/request_response/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,8 @@ use crate::{

use futures::{channel::oneshot, future::BoxFuture, stream::FuturesUnordered, StreamExt};
use litep2p::{
error::{ImmediateDialError, NegotiationError, SubstreamError},
protocol::request_response::{
DialOptions, RejectReason, RequestResponseError, RequestResponseEvent,
RequestResponseHandle,
DialOptions, RequestResponseError, RequestResponseEvent, RequestResponseHandle,
},
types::RequestId,
};
Expand Down Expand Up @@ -374,32 +372,7 @@ impl RequestResponseProtocol {
let status = match error {
RequestResponseError::NotConnected =>
Some((RequestFailure::NotConnected, "not-connected")),
RequestResponseError::Rejected(reason) => {
let reason = match reason {
RejectReason::ConnectionClosed => "connection-closed",
RejectReason::SubstreamClosed => "substream-closed",
RejectReason::SubstreamOpenError(substream_error) => match substream_error {
SubstreamError::NegotiationError(NegotiationError::Timeout) =>
"substream-timeout",
_ => "substream-open-error",
},
RejectReason::DialFailed(None) => "dial-failed",
RejectReason::DialFailed(Some(ImmediateDialError::AlreadyConnected)) =>
"dial-already-connected",
RejectReason::DialFailed(Some(ImmediateDialError::PeerIdMissing)) =>
"dial-peerid-missing",
RejectReason::DialFailed(Some(ImmediateDialError::TriedToDialSelf)) =>
"dial-tried-to-dial-self",
RejectReason::DialFailed(Some(ImmediateDialError::NoAddressAvailable)) =>
"dial-no-address-available",
RejectReason::DialFailed(Some(ImmediateDialError::TaskClosed)) =>
"dial-task-closed",
RejectReason::DialFailed(Some(ImmediateDialError::ChannelClogged)) =>
"dial-channel-clogged",
};

Some((RequestFailure::Refused, reason))
},
RequestResponseError::Rejected => Some((RequestFailure::Refused, "rejected")),
RequestResponseError::Timeout =>
Some((RequestFailure::Network(OutboundFailure::Timeout), "timeout")),
RequestResponseError::Canceled => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -271,12 +271,7 @@ async fn too_many_inbound_requests() {
match handle2.next().await {
Some(RequestResponseEvent::RequestFailed { peer, error, .. }) => {
assert_eq!(peer, peer1);
assert_eq!(
error,
RequestResponseError::Rejected(
litep2p::protocol::request_response::RejectReason::SubstreamClosed
)
);
assert_eq!(error, RequestResponseError::Rejected);
},
event => panic!("inavlid event: {event:?}"),
}
Expand Down
3 changes: 0 additions & 3 deletions substrate/client/tracing/src/logging/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,9 +138,6 @@ where
.add_directive(
parse_default_directive("trust_dns_proto=off").expect("provided directive is valid"),
)
.add_directive(
parse_default_directive("hickory_proto=off").expect("provided directive is valid"),
)
.add_directive(
parse_default_directive("libp2p_mdns::behaviour::iface=off")
.expect("provided directive is valid"),
Expand Down

0 comments on commit aae80c7

Please sign in to comment.