Skip to content

Commit

Permalink
feat: adds transitive sharing of peer information (#618)
Browse files Browse the repository at this point in the history
* feat: adds transitive sharing of peer information

With this change, all the pieces are in place for peers to discover new peers
via their already-connected peers and to connect to them.

Fixes: #605
Fixes: #606

* fix: faster tests
  • Loading branch information
nathanielc authored Dec 8, 2024
1 parent c9c8bef commit becc8b0
Show file tree
Hide file tree
Showing 14 changed files with 521 additions and 665 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion p2p/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ ssh-key = { workspace = true, features = ["ed25519", "std", "rand_core"] }
tempfile.workspace = true
tokio = { workspace = true, features = ["fs", "time", "sync", "macros"] }
tokio-stream.workspace = true
tracing-subscriber = { workspace = true, features = ["env-filter"] }
tracing.workspace = true
void.workspace = true
zeroize.workspace = true
Expand Down Expand Up @@ -71,9 +70,13 @@ features = [

[dev-dependencies]
ceramic-event-svc.workspace = true
ceramic-interest-svc.workspace = true
ceramic-peer-svc.workspace = true
recon.workspace = true
criterion2.workspace = true
rand_chacha.workspace = true
test-log.workspace = true
tracing-subscriber.workspace = true

[[bench]]
name = "lru_cache"
Expand Down
5 changes: 3 additions & 2 deletions p2p/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ use tracing::{info, warn};

use self::ceramic_peer_manager::CeramicPeerManager;
pub use self::event::Event;
use crate::config::Libp2pConfig;
use crate::Metrics;
use crate::{config::Libp2pConfig, peers};

mod ceramic_peer_manager;
mod event;
Expand Down Expand Up @@ -72,6 +72,7 @@ where
relay_client: Option<relay::client::Behaviour>,
recons: Option<(P, I, M)>,
block_store: Arc<S>,
peers_tx: tokio::sync::mpsc::Sender<peers::Message>,
metrics: Metrics,
) -> Result<Self> {
let pub_key = local_key.public();
Expand Down Expand Up @@ -198,7 +199,7 @@ where
relay,
dcutr: dcutr.into(),
relay_client: relay_client.into(),
peer_manager: CeramicPeerManager::new(&config.ceramic_peers, metrics)?,
peer_manager: CeramicPeerManager::new(peers_tx, &config.ceramic_peers, metrics)?,
limits,
recon: recon.into(),
})
Expand Down
99 changes: 82 additions & 17 deletions p2p/src/behaviour/ceramic_peer_manager.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::{
collections::HashSet,
fmt::{self, Debug, Formatter},
future,
task::{Context, Poll},
Expand All @@ -8,23 +9,32 @@ use std::{
use ahash::AHashMap;
use anyhow::{anyhow, Result};
use backoff::{backoff::Backoff, ExponentialBackoff, ExponentialBackoffBuilder};
use ceramic_core::PeerEntry;
#[allow(deprecated)]
use ceramic_metrics::Recorder;
use futures_util::{future::BoxFuture, FutureExt};
use libp2p::swarm::{
dial_opts::{DialOpts, PeerCondition},
ToSwarm,
};
use libp2p::{
identify::Info as IdentifyInfo,
multiaddr::Protocol,
swarm::{dummy, ConnectionId, DialError, NetworkBehaviour},
Multiaddr, PeerId,
};
use tokio::time;
use libp2p::{
multiaddr::Protocol,
swarm::{
dial_opts::{DialOpts, PeerCondition},
ToSwarm,
},
};
use tokio::{
sync::{mpsc::Sender, oneshot},
time,
};
use tracing::{info, warn};

use crate::metrics::{self, Metrics};
use crate::{
metrics::{self, Metrics},
peers,
};

/// Manages state for Ceramic peers.
/// Ceramic peers are peers that participate in the Ceramic network.
Expand All @@ -35,6 +45,10 @@ pub struct CeramicPeerManager {
metrics: Metrics,
info: AHashMap<PeerId, Info>,
ceramic_peers: AHashMap<PeerId, CeramicPeer>,
// Use a message passing technique to get peers so that we do not use the current task to do
// DB/IO work.
peers_tx: Sender<peers::Message>,
peers_fut: Option<BoxFuture<'static, Result<Vec<PeerEntry>>>>,
}

#[derive(Default, Debug, Clone)]
Expand All @@ -60,14 +74,18 @@ const PEERING_DIAL_JITTER: f64 = 0.1;
pub enum PeerManagerEvent {}

impl CeramicPeerManager {
pub fn new(ceramic_peers: &[Multiaddr], metrics: Metrics) -> Result<Self> {
pub fn new(
peers_tx: Sender<peers::Message>,
ceramic_peers: &[Multiaddr],
metrics: Metrics,
) -> Result<Self> {
let ceramic_peers = ceramic_peers
.iter()
// Extract peer id from multiaddr
.map(|multiaddr| {
if let Some(peer) = multiaddr.iter().find_map(|proto| match proto {
Protocol::P2p(peer_id) => {
Some((peer_id, CeramicPeer::new(multiaddr.to_owned())))
Some((peer_id, CeramicPeer::new(vec![multiaddr.to_owned()])))
}
_ => None,
}) {
Expand All @@ -81,6 +99,8 @@ impl CeramicPeerManager {
metrics,
info: Default::default(),
ceramic_peers,
peers_tx,
peers_fut: None,
})
}

Expand All @@ -99,11 +119,30 @@ impl CeramicPeerManager {
/// Returns `true` if `peer_id` is a known Ceramic peer
/// (configured at startup or learned from the peer service).
pub fn is_ceramic_peer(&self, peer_id: &PeerId) -> bool {
    self.ceramic_peers.contains_key(peer_id)
}
/// Request a refresh of the set of known remote peers.
///
/// Sends a `peers::Message::AllRemotePeers` query over the channel and stores
/// the in-flight request as a boxed future in `self.peers_fut`; that future is
/// polled later (from the behaviour's poll loop), so no DB/IO work runs on the
/// current task. If a request is already pending this call is a no-op.
pub fn new_peers(&mut self) {
    if self.peers_fut.is_none() {
        // One-shot channel over which the peer service sends back its answer.
        let (tx, rx) = oneshot::channel();
        // Construct future that will resolve to the set of all known remote peers
        let peers_tx = self.peers_tx.clone();
        self.peers_fut = Some(
            async move {
                // Drive the request send and the response receive together so a
                // single future represents the whole round trip.
                futures::future::join(peers_tx.send(peers::Message::AllRemotePeers(tx)), rx)
                    .map(|(send, peers)| {
                        // Collapse the three failure modes into one Result:
                        // the send failing, the reply channel being dropped,
                        // or the reply itself carrying an error.
                        send.map_err(anyhow::Error::from)
                            .and(peers.map_err(anyhow::Error::from).and_then(|inner| inner))
                    })
                    .await
            }
            .boxed(),
        )
    } // else do nothing because we will get all peers anyways
}

fn handle_connection_established(&mut self, peer_id: &PeerId) {
if let Some(peer) = self.ceramic_peers.get_mut(peer_id) {
info!(
multiaddr = %peer.multiaddr,
%peer_id,
multiaddr = ?peer.addrs,
"connection established, stop dialing ceramic peer",
);
peer.stop_redial();
Expand All @@ -114,7 +153,8 @@ impl CeramicPeerManager {
fn handle_connection_closed(&mut self, peer_id: &PeerId) {
if let Some(peer) = self.ceramic_peers.get_mut(peer_id) {
warn!(
multiaddr = %peer.multiaddr,
%peer_id,
multiaddr = ?peer.addrs,
"connection closed, redial ceramic peer",
);
peer.start_redial();
Expand All @@ -125,7 +165,8 @@ impl CeramicPeerManager {
fn handle_dial_failure(&mut self, peer_id: &PeerId) {
if let Some(peer) = self.ceramic_peers.get_mut(peer_id) {
warn!(
multiaddr = %peer.multiaddr,
%peer_id,
multiaddr = ?peer.addrs,
"dial failed, redial ceramic peer"
);
peer.backoff_redial();
Expand Down Expand Up @@ -195,13 +236,37 @@ impl NetworkBehaviour for CeramicPeerManager {
&mut self,
cx: &mut Context<'_>,
) -> Poll<libp2p::swarm::ToSwarm<Self::ToSwarm, libp2p::swarm::THandlerInEvent<Self>>> {
if let Some(mut peers) = self.peers_fut.take() {
match peers.poll_unpin(cx) {
Poll::Ready(peers) => match peers {
Ok(peers) => {
for peer_entry in peers {
self.ceramic_peers
.entry(peer_entry.id().peer_id())
.and_modify(|peer| {
let count = peer.addrs.len();
peer.addrs.extend(peer_entry.addresses().iter().cloned());
if count != peer.addrs.len() {
peer.start_redial()
}
})
.or_insert(CeramicPeer::new(peer_entry.addresses().to_vec()));
}
}
Err(err) => warn!(%err,"failed to get set of remote peers"),
},
Poll::Pending => {
self.peers_fut.replace(peers);
}
}
}
for (peer_id, peer) in self.ceramic_peers.iter_mut() {
if let Some(mut dial_future) = peer.dial_future.take() {
match dial_future.as_mut().poll_unpin(cx) {
Poll::Ready(()) => {
return Poll::Ready(ToSwarm::Dial {
opts: DialOpts::peer_id(*peer_id)
.addresses(vec![peer.multiaddr.clone()])
.addresses(peer.addrs.iter().cloned().collect())
.condition(PeerCondition::Disconnected)
.build(),
})
Expand Down Expand Up @@ -239,23 +304,23 @@ impl NetworkBehaviour for CeramicPeerManager {

// State of Ceramic peer.
struct CeramicPeer {
multiaddr: Multiaddr,
addrs: HashSet<Multiaddr>,
dial_backoff: ExponentialBackoff,
dial_future: Option<BoxFuture<'static, ()>>,
}

impl Debug for CeramicPeer {
fn fmt(&self, f: &mut Formatter) -> Result<(), fmt::Error> {
f.debug_struct("BootstrapPeer")
.field("multiaddr", &self.multiaddr)
.field("multiaddr", &self.addrs)
.field("dial_backoff", &self.dial_backoff)
.field("dial_future", &self.dial_future.is_some())
.finish()
}
}

impl CeramicPeer {
fn new(multiaddr: Multiaddr) -> Self {
fn new(addrs: Vec<Multiaddr>) -> Self {
let dial_backoff = ExponentialBackoffBuilder::new()
.with_initial_interval(PEERING_MIN_DIAL_SECS)
.with_multiplier(PEERING_DIAL_BACKOFF)
Expand All @@ -266,7 +331,7 @@ impl CeramicPeer {
// Expire initial future so that we dial peers immediately
let dial_future = Some(future::ready(()).boxed());
Self {
multiaddr,
addrs: addrs.into_iter().collect(),
dial_backoff,
dial_future,
}
Expand Down
Loading

0 comments on commit becc8b0

Please sign in to comment.