Skip to content

Commit

Permalink
fix(swarm): exit with error if unsupported seed multiaddr (#836)
Browse files Browse the repository at this point in the history
Description
---
Exit with a helpful error message if an invalid seed peer address is
used
Prevents kademlia bootstrap OOM if using unsupported addresses 
Remove add peer NetworkHandle call as new _valid_ addresses are
automatically added on successful dials

Motivation and Context
---
bootstrapping with unsupported multiaddr causes kad to misbehave and
dial thousands of times, This PR checks the addresses and exits if they
are unsupported.

How Has This Been Tested?
---
Manually, current default configs contain onioin addresses that are not
supported

What process can a PR reviewer use to test or verify this change?
---


Breaking Changes
---

- [x] None
- [ ] Requires data directory to be deleted
- [ ] Other - Please specify
  • Loading branch information
sdbondi authored Dec 12, 2023
1 parent 9c29995 commit b54bde8
Show file tree
Hide file tree
Showing 9 changed files with 57 additions and 60 deletions.
20 changes: 14 additions & 6 deletions applications/tari_indexer/src/json_rpc/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use log::{error, warn};
use serde_json::{self as json, json, Value};
use tari_base_node_client::{grpc::GrpcBaseNodeClient, types::BaseLayerConsensusConstants, BaseNodeClient};
use tari_dan_app_utilities::{keypair::RistrettoKeypair, substate_file_cache::SubstateFileCache};
use tari_dan_common_types::{optional::Optional, Epoch, PeerAddress};
use tari_dan_common_types::{optional::Optional, public_key_to_peer_id, Epoch, PeerAddress};
use tari_dan_storage::consensus_models::Decision;
use tari_epoch_manager::{base_layer::EpochManagerHandle, EpochManagerReader};
use tari_indexer_client::{
Expand Down Expand Up @@ -64,7 +64,7 @@ use tari_indexer_client::{
SubmitTransactionResponse,
},
};
use tari_networking::{NetworkingHandle, NetworkingService};
use tari_networking::{is_supported_multiaddr, NetworkingHandle, NetworkingService};
use tari_validator_node_rpc::{
client::{SubstateResult, TariValidatorNodeRpcClientFactory, TransactionResultStatus},
proto,
Expand Down Expand Up @@ -177,11 +177,19 @@ impl JsonRpcHandlers {
wait_for_dial,
} = value.parse_params()?;

if let Some(unsupported) = addresses.iter().find(|a| !is_supported_multiaddr(a)) {
return Err(JsonRpcResponse::error(
answer_id,
JsonRpcError::new(
JsonRpcErrorReason::InvalidParams,
format!("Unsupported multiaddr {unsupported}"),
json::Value::Null,
),
));
}

let mut networking = self.networking.clone();
let peer_id = networking
.add_peer(public_key, addresses.clone())
.await
.map_err(internal_error(answer_id))?;
let peer_id = public_key_to_peer_id(public_key);

let dial_wait = networking
.dial_peer(
Expand Down
20 changes: 14 additions & 6 deletions applications/tari_validator_node/src/json_rpc/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,14 @@ use log::*;
use serde_json::{self as json, json};
use tari_base_node_client::{grpc::GrpcBaseNodeClient, BaseNodeClient};
use tari_dan_app_utilities::{keypair::RistrettoKeypair, template_manager::interface::TemplateManagerHandle};
use tari_dan_common_types::{optional::Optional, PeerAddress, ShardId};
use tari_dan_common_types::{optional::Optional, public_key_to_peer_id, PeerAddress, ShardId};
use tari_dan_storage::{
consensus_models::{Block, ExecutedTransaction, LeafBlock, QuorumDecision, SubstateRecord, TransactionRecord},
Ordering,
StateStore,
};
use tari_epoch_manager::{base_layer::EpochManagerHandle, EpochManagerReader};
use tari_networking::{NetworkingHandle, NetworkingService};
use tari_networking::{is_supported_multiaddr, NetworkingHandle, NetworkingService};
use tari_state_store_sqlite::SqliteStateStore;
use tari_validator_node_client::{
types,
Expand Down Expand Up @@ -630,11 +630,19 @@ impl JsonRpcHandlers {
wait_for_dial,
} = value.parse_params()?;

if let Some(unsupported) = addresses.iter().find(|a| !is_supported_multiaddr(a)) {
return Err(JsonRpcResponse::error(
answer_id,
JsonRpcError::new(
JsonRpcErrorReason::InvalidParams,
format!("Unsupported multiaddr {unsupported}"),
json::Value::Null,
),
));
}

let mut networking = self.networking.clone();
let peer_id = networking
.add_peer(public_key, addresses.clone())
.await
.map_err(internal_error(answer_id))?;
let peer_id = public_key_to_peer_id(public_key);

let dial_wait = networking
.dial_peer(
Expand Down
2 changes: 1 addition & 1 deletion dan_layer/common_types/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ mod shard_id;
pub use shard_id::ShardId;

mod peer_address;
pub use peer_address::PeerAddress;
pub use peer_address::*;
pub mod uint;

pub use tari_engine_types::serde_with;
8 changes: 5 additions & 3 deletions dan_layer/common_types/src/peer_address.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,7 @@ impl Display for PeerAddress {

impl From<RistrettoPublicKey> for PeerAddress {
fn from(pk: RistrettoPublicKey) -> Self {
let pk = identity::PublicKey::from(identity::sr25519::PublicKey::from(pk));
let peer_id = pk.to_peer_id();
Self(peer_id)
Self(public_key_to_peer_id(pk))
}
}

Expand All @@ -73,6 +71,10 @@ impl PartialEq<PeerId> for PeerAddress {

impl DerivableFromPublicKey for PeerAddress {}

pub fn public_key_to_peer_id(public_key: RistrettoPublicKey) -> PeerId {
identity::PublicKey::from(identity::sr25519::PublicKey::from(public_key)).to_peer_id()
}

#[cfg(test)]
mod tests {
use tari_crypto::keys::PublicKey;
Expand Down
25 changes: 1 addition & 24 deletions networking/core/src/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,8 @@
use std::collections::HashSet;

use async_trait::async_trait;
use libp2p::{gossipsub::IdentTopic, swarm::dial_opts::DialOpts, Multiaddr, PeerId, StreamProtocol};
use libp2p::{gossipsub::IdentTopic, swarm::dial_opts::DialOpts, PeerId, StreamProtocol};
use log::*;
use tari_crypto::ristretto::RistrettoPublicKey;
use tari_rpc_framework::{
framing,
framing::CanonicalFraming,
Expand All @@ -50,11 +49,6 @@ use crate::{
const LOG_TARGET: &str = "tari::networking::handle";

pub enum NetworkingRequest<TMsg> {
AddPeer {
public_key: RistrettoPublicKey,
addresses: Vec<Multiaddr>,
reply_tx: oneshot::Sender<Result<PeerId, NetworkingError>>,
},
DialPeer {
dial_opts: DialOpts,
reply_tx: oneshot::Sender<Result<Waiter<()>, NetworkingError>>,
Expand Down Expand Up @@ -268,23 +262,6 @@ impl<TMsg> NetworkingHandle<TMsg> {
rx.await?
}

pub async fn add_peer(
&mut self,
public_key: RistrettoPublicKey,
addresses: Vec<Multiaddr>,
) -> Result<PeerId, NetworkingError> {
let (tx, rx) = oneshot::channel();
self.tx_request
.send(NetworkingRequest::AddPeer {
public_key,
addresses,
reply_tx: tx,
})
.await
.map_err(|_| NetworkingHandleError::ServiceHasShutdown)?;
rx.await?
}

pub async fn get_local_peer_info(&self) -> Result<PeerInfo, NetworkingError> {
let (tx, rx) = oneshot::channel();
self.tx_request
Expand Down
2 changes: 1 addition & 1 deletion networking/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ pub use config::*;
pub use connection::*;
pub use handle::*;
pub use spawn::*;
pub use tari_swarm::{Config as SwarmConfig, TariNetwork};
pub use tari_swarm::{is_supported_multiaddr, Config as SwarmConfig, TariNetwork};

#[async_trait]
pub trait NetworkingService<TMsg> {
Expand Down
13 changes: 10 additions & 3 deletions networking/core/src/spawn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,26 +3,33 @@

use std::collections::HashSet;

use anyhow::anyhow;
use libp2p::{identity::Keypair, Multiaddr, PeerId};
use tari_shutdown::ShutdownSignal;
use tari_swarm::{messaging, messaging::prost::ProstCodec};
use tari_swarm::{is_supported_multiaddr, messaging, messaging::prost::ProstCodec};
use tokio::{
sync::{broadcast, mpsc},
task::JoinHandle,
};

use crate::{worker::NetworkingWorker, NetworkingError, NetworkingHandle};
use crate::{worker::NetworkingWorker, NetworkingHandle};

pub fn spawn<TMsg>(
identity: Keypair,
tx_messages: mpsc::Sender<(PeerId, TMsg)>,
mut config: crate::Config,
seed_peers: Vec<(PeerId, Multiaddr)>,
shutdown_signal: ShutdownSignal,
) -> Result<(NetworkingHandle<TMsg>, JoinHandle<anyhow::Result<()>>), NetworkingError>
) -> anyhow::Result<(NetworkingHandle<TMsg>, JoinHandle<anyhow::Result<()>>)>
where
TMsg: messaging::prost::Message + Default + Clone + 'static,
{
for (_, addr) in &seed_peers {
if !is_supported_multiaddr(addr) {
return Err(anyhow!("Unsupported seed peer multi-address: {}", addr));
}
}

config.swarm.enable_relay = config.swarm.enable_relay || !config.reachability_mode.is_private();
let swarm = tari_swarm::create_swarm::<ProstCodec<TMsg>>(identity, HashSet::new(), config.swarm.clone())?;
let local_peer_id = *swarm.local_peer_id();
Expand Down
16 changes: 0 additions & 16 deletions networking/core/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ use libp2p::{
futures::StreamExt,
gossipsub,
identify,
identity,
kad,
kad::{QueryResult, RoutingUpdate},
mdns,
Expand Down Expand Up @@ -190,21 +189,6 @@ where
},
}
},
NetworkingRequest::AddPeer {
public_key,
addresses,
reply_tx,
} => {
let kad_mut = &mut self.swarm.behaviour_mut().kad;
let peer_id = PeerId::from_public_key(&identity::PublicKey::from(identity::sr25519::PublicKey::from(
public_key,
)));
for address in addresses {
kad_mut.add_address(&peer_id, address);
}

let _ignore = reply_tx.send(Ok(peer_id));
},
NetworkingRequest::GetConnectedPeers { reply_tx } => {
let peers = self.swarm.connected_peers().copied().collect();
let _ignore = reply_tx.send(Ok(peers));
Expand Down
11 changes: 11 additions & 0 deletions networking/swarm/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,17 @@ where TCodec: messaging::Codec + Send + Clone + 'static
pub gossipsub: gossipsub::Behaviour,
}

/// Returns true if the given Multiaddr is supported by the Tari swarm, otherwise false.
/// NOTE: this function only currently returns false for onion addresses.
pub fn is_supported_multiaddr(addr: &libp2p::Multiaddr) -> bool {
!addr.iter().any(|p| {
matches!(
p,
libp2p::core::multiaddr::Protocol::Onion(_, _) | libp2p::core::multiaddr::Protocol::Onion3(_)
)
})
}

pub fn create_swarm<TCodec>(
identity: Keypair,
supported_protocols: HashSet<StreamProtocol>,
Expand Down

0 comments on commit b54bde8

Please sign in to comment.