From 2ef3751060e52b5f06084b041cfcf22471dafec0 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile <60601340+lexnv@users.noreply.github.com> Date: Fri, 25 Oct 2024 17:03:40 +0300 Subject: [PATCH] address_store: Improve address tracking and add eviction algorithm (#250) This PR improves the transport manager's address tracking to keep a healthier view of addresses. General changes: - Fixes a bug where `listener` addresses were tracked instead of `dialing` addresses (incoming connections may be established with ephemeral ports, and there's no guarantee the remote is listening on them) - A `PeerIdMismatch` error coming from the noise handshake redirects the address to the appropriate peer for healthier addresses - Addresses are tracked first, regardless of the peer state, to ensure we update our view of the addresses (reachable or not) Address Store changes: - The store is bounded to a maximum of 64 tracked addresses - Removing and reinserting addresses in the store is error-prone; instead, the address store now updates addresses in place - Introduces an eviction algorithm that keeps the addresses with the highest scores ### Testing Done - added extra tests to the address store - tested with subp2p-explorer for discovering kusama This PR is part of a bigger refactor to keep track of healthier addresses. It is essentially a breakdown of https://github.com/paritytech/litep2p/pull/248 for an easier review process. 
--------- Signed-off-by: Alexandru Vasile Co-authored-by: Dmitry Markin --- src/transport/manager/address.rs | 204 ++++++++++++++++++++++--------- src/transport/manager/handle.rs | 83 +++++++------ src/transport/manager/mod.rs | 201 ++++++++++++++++++++---------- src/transport/manager/types.rs | 10 ++ 4 files changed, 338 insertions(+), 160 deletions(-) diff --git a/src/transport/manager/address.rs b/src/transport/manager/address.rs index e18d7c05..e712cfe1 100644 --- a/src/transport/manager/address.rs +++ b/src/transport/manager/address.rs @@ -18,12 +18,29 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use crate::{types::ConnectionId, PeerId}; +use crate::{error::DialError, types::ConnectionId, PeerId}; use multiaddr::{Multiaddr, Protocol}; use multihash::Multihash; -use std::collections::{BinaryHeap, HashSet}; +use std::collections::{hash_map::Entry, HashMap}; + +/// Maximum number of addresses tracked for a peer. +const MAX_ADDRESSES: usize = 64; + +/// Scores for address records. +pub mod scores { + /// Score indicating that the connection was successfully established. + pub const CONNECTION_ESTABLISHED: i32 = 100i32; + + /// Score for failing to connect due to an invalid or unreachable address. + pub const CONNECTION_FAILURE: i32 = -100i32; + + /// Score for providing an invalid address. + /// + /// This address can never be reached. + pub const ADDRESS_FAILURE: i32 = i32::MIN; +} #[allow(clippy::derived_hash_with_manual_eq)] #[derive(Debug, Clone, Hash)] @@ -134,11 +151,10 @@ impl Ord for AddressRecord { /// Store for peer addresses. #[derive(Debug)] pub struct AddressStore { - //// Addresses sorted by score. - pub by_score: BinaryHeap, - - /// Addresses queryable by hashing them for faster lookup. - pub by_address: HashSet, + /// Addresses available. + pub addresses: HashMap, + /// Maximum capacity of the address store. 
+ max_capacity: usize, } impl FromIterator for AddressStore { @@ -158,8 +174,7 @@ impl FromIterator for AddressStore { fn from_iter>(iter: T) -> Self { let mut store = AddressStore::new(); for record in iter { - store.by_address.insert(record.address.clone()); - store.by_score.push(record); + store.insert(record); } store @@ -186,52 +201,65 @@ impl AddressStore { /// Create new [`AddressStore`]. pub fn new() -> Self { Self { - by_score: BinaryHeap::new(), - by_address: HashSet::new(), + addresses: HashMap::with_capacity(MAX_ADDRESSES), + max_capacity: MAX_ADDRESSES, } } - /// Check if [`AddressStore`] is empty. - pub fn is_empty(&self) -> bool { - self.by_score.is_empty() + /// Get the score for a given error. + pub fn error_score(error: &DialError) -> i32 { + match error { + DialError::AddressError(_) => scores::ADDRESS_FAILURE, + _ => scores::CONNECTION_FAILURE, + } } - /// Check if address is already in the a - pub fn contains(&self, address: &Multiaddr) -> bool { - self.by_address.contains(address) + /// Check if [`AddressStore`] is empty. + pub fn is_empty(&self) -> bool { + self.addresses.is_empty() } - /// Insert new address record into [`AddressStore`] with default address score. - pub fn insert(&mut self, mut record: AddressRecord) { - if self.by_address.contains(record.address()) { + /// Insert the address record into [`AddressStore`] with the provided score. + /// + /// If the address is not in the store, it will be inserted. + /// Otherwise, the score and connection ID will be updated. + pub fn insert(&mut self, record: AddressRecord) { + if let Entry::Occupied(mut occupied) = self.addresses.entry(record.address.clone()) { + occupied.get_mut().update_score(record.score); return; } - record.connection_id = None; - self.by_address.insert(record.address.clone()); - self.by_score.push(record); - } - - /// Pop address with the highest score from [`AddressStore`]. 
- pub fn pop(&mut self) -> Option { - self.by_score.pop().map(|record| { - self.by_address.remove(&record.address); - record - }) - } - - /// Take at most `limit` `AddressRecord`s from [`AddressStore`]. - pub fn take(&mut self, limit: usize) -> Vec { - let mut records = Vec::new(); - - for _ in 0..limit { - match self.pop() { - Some(record) => records.push(record), - None => break, + // The eviction algorithm favours addresses with higher scores. + // + // This algorithm has the following implications: + // - it keeps the best addresses in the store. + // - if the store is at capacity, the worst address will be evicted. + // - an address that is not dialed yet (with score zero) will be preferred over an address + // that already failed (with negative score). + if self.addresses.len() >= self.max_capacity { + let min_record = self + .addresses + .values() + .min() + .cloned() + .expect("There is at least one element checked above; qed"); + + // The lowest score is better than the new record. + if record.score < min_record.score { + return; } + self.addresses.remove(min_record.address()); } - records + // Insert the record. + self.addresses.insert(record.address.clone(), record); + } + + /// Return the available addresses sorted by score. 
+ pub fn addresses(&self, limit: usize) -> Vec { + let mut records = self.addresses.values().cloned().collect::>(); + records.sort_by(|lhs, rhs| rhs.score.cmp(&lhs.score)); + records.into_iter().take(limit).map(|record| record.address).collect() } } @@ -256,7 +284,7 @@ mod tests { ), rng.gen_range(1..=65535), )); - let score: i32 = rng.gen(); + let score: i32 = rng.gen_range(10..=200); AddressRecord::new( &peer, @@ -279,7 +307,7 @@ mod tests { ), rng.gen_range(1..=65535), )); - let score: i32 = rng.gen(); + let score: i32 = rng.gen_range(10..=200); AddressRecord::new( &peer, @@ -303,7 +331,7 @@ mod tests { ), rng.gen_range(1..=65535), )); - let score: i32 = rng.gen(); + let score: i32 = rng.gen_range(10..=200); AddressRecord::new( &peer, @@ -331,19 +359,22 @@ mod tests { store.insert(quic_address_record(&mut rng)); } - let known_addresses = store.by_address.len(); + let known_addresses = store.addresses.len(); assert!(known_addresses >= 3); - let taken = store.take(known_addresses - 2); + let taken = store.addresses(known_addresses - 2); assert_eq!(known_addresses - 2, taken.len()); assert!(!store.is_empty()); let mut prev: Option = None; - for record in taken { - assert!(!store.contains(record.address())); + for address in taken { + // Addresses are still in the store. 
+ assert!(store.addresses.contains_key(&address)); + + let record = store.addresses.get(&address).unwrap().clone(); if let Some(previous) = prev { - assert!(previous.score > record.score); + assert!(previous.score >= record.score); } prev = Some(record); @@ -359,18 +390,19 @@ mod tests { store.insert(ws_address_record(&mut rng)); store.insert(quic_address_record(&mut rng)); - assert_eq!(store.by_address.len(), 3); + assert_eq!(store.addresses.len(), 3); - let taken = store.take(8usize); + let taken = store.addresses(8usize); assert_eq!(taken.len(), 3); - assert!(store.is_empty()); let mut prev: Option = None; for record in taken { + let record = store.addresses.get(&record).unwrap().clone(); + if prev.is_none() { prev = Some(record); } else { - assert!(prev.unwrap().score > record.score); + assert!(prev.unwrap().score >= record.score); prev = Some(record); } } @@ -401,10 +433,9 @@ mod tests { .collect::>(); store.extend(records); - for record in store.by_score { + for record in store.addresses.values().cloned() { let stored = cloned.get(record.address()).unwrap(); assert_eq!(stored.score(), record.score()); - assert_eq!(stored.connection_id(), record.connection_id()); assert_eq!(stored.address(), record.address()); } } @@ -433,11 +464,68 @@ mod tests { let cloned = records.iter().cloned().collect::>(); store.extend(records.iter().map(|(_, record)| record)); - for record in store.by_score { + for record in store.addresses.values().cloned() { let stored = cloned.get(record.address()).unwrap(); assert_eq!(stored.score(), record.score()); - assert_eq!(stored.connection_id(), record.connection_id()); assert_eq!(stored.address(), record.address()); } } + + #[test] + fn insert_record() { + let mut store = AddressStore::new(); + let mut rng = rand::thread_rng(); + + let mut record = tcp_address_record(&mut rng); + record.score = 10; + + store.insert(record.clone()); + + assert_eq!(store.addresses.len(), 1); + assert_eq!(store.addresses.get(record.address()).unwrap(), 
&record); + + // This time the record is updated. + store.insert(record.clone()); + + assert_eq!(store.addresses.len(), 1); + let store_record = store.addresses.get(record.address()).unwrap(); + assert_eq!(store_record.score, record.score * 2); + } + + #[test] + fn evict_on_capacity() { + let mut store = AddressStore { + addresses: HashMap::new(), + max_capacity: 2, + }; + + let mut rng = rand::thread_rng(); + let mut first_record = tcp_address_record(&mut rng); + first_record.score = scores::CONNECTION_ESTABLISHED; + let mut second_record = ws_address_record(&mut rng); + second_record.score = 0; + + store.insert(first_record.clone()); + store.insert(second_record.clone()); + + assert_eq!(store.addresses.len(), 2); + + // We have better addresses, ignore this one. + let mut third_record = quic_address_record(&mut rng); + third_record.score = scores::CONNECTION_FAILURE; + store.insert(third_record.clone()); + assert_eq!(store.addresses.len(), 2); + assert!(store.addresses.contains_key(first_record.address())); + assert!(store.addresses.contains_key(second_record.address())); + + // Evict the address with the lowest score. + // Store contains scores: [100, 0]. + let mut fourth_record = quic_address_record(&mut rng); + fourth_record.score = 1; + store.insert(fourth_record.clone()); + + assert_eq!(store.addresses.len(), 2); + assert!(store.addresses.contains_key(first_record.address())); + assert!(store.addresses.contains_key(fourth_record.address())); + } } diff --git a/src/transport/manager/handle.rs b/src/transport/manager/handle.rs index 0ded6406..937ebc3e 100644 --- a/src/transport/manager/handle.rs +++ b/src/transport/manager/handle.rs @@ -180,51 +180,62 @@ impl TransportManagerHandle { peer: &PeerId, addresses: impl Iterator, ) -> usize { - let mut peers = self.peers.write(); - let addresses = addresses - .filter_map(|address| { - (self.supported_transport(&address) && !self.is_local_address(&address)) - .then_some(AddressRecord::from_multiaddr(address)?) 
- }) - .collect::>(); - - // if all of the added addresses belonged to unsupported transports, exit early - let num_added = addresses.len(); - if num_added == 0 { - tracing::debug!( - target: LOG_TARGET, - ?peer, - "didn't add any addresses for peer because transport is not supported", - ); + let mut peer_addresses = HashSet::new(); - return 0usize; + for address in addresses { + // There is not supported transport configured that can dial this address. + if !self.supported_transport(&address) { + continue; + } + if self.is_local_address(&address) { + continue; + } + + // Check the peer ID if present. + if let Some(Protocol::P2p(multihash)) = address.iter().last() { + // This can correspond to the provided peerID or to a different one. + if multihash != *peer.as_ref() { + tracing::warn!( + target: LOG_TARGET, + ?peer, + ?address, + "Refusing to add known address that corresponds to a different peer ID", + ); + + continue; + } + + peer_addresses.insert(address); + } else { + // Add the provided peer ID to the address. 
+ let address = address.with(Protocol::P2p(multihash::Multihash::from(peer.clone()))); + peer_addresses.insert(address); + } } + let num_added = peer_addresses.len(); + tracing::trace!( target: LOG_TARGET, ?peer, - ?addresses, + ?peer_addresses, "add known addresses", ); - match peers.get_mut(peer) { - Some(context) => - for record in addresses { - if !context.addresses.contains(record.address()) { - context.addresses.insert(record); - } - }, - None => { - peers.insert( - *peer, - PeerContext { - state: PeerState::Disconnected { dial_record: None }, - addresses: AddressStore::from_iter(addresses), - secondary_connection: None, - }, - ); - } - } + let mut peers = self.peers.write(); + let entry = peers.entry(*peer).or_insert_with(|| PeerContext { + state: PeerState::Disconnected { dial_record: None }, + addresses: AddressStore::new(), + secondary_connection: None, + }); + + // All addresses should be valid at this point, since the peer ID was either added or + // double checked. + entry.addresses.extend( + peer_addresses + .into_iter() + .filter_map(|addr| AddressRecord::from_multiaddr(addr)), + ); num_added } diff --git a/src/transport/manager/mod.rs b/src/transport/manager/mod.rs index 6816bbf2..ce17b1c8 100644 --- a/src/transport/manager/mod.rs +++ b/src/transport/manager/mod.rs @@ -37,6 +37,7 @@ use crate::{ BandwidthSink, PeerId, }; +use address::scores; use futures::{Stream, StreamExt}; use indexmap::IndexMap; use multiaddr::{Multiaddr, Protocol}; @@ -435,7 +436,7 @@ impl TransportManager { let PeerContext { state, secondary_connection, - mut addresses, + addresses, } = match peers.remove(&peer) { None => return Err(Error::PeerDoesntExist(peer)), Some( @@ -482,9 +483,9 @@ impl TransportManager { } let mut records: HashMap<_, _> = addresses - .take(limit) + .addresses(limit) .into_iter() - .map(|record| (record.address().clone(), record)) + .map(|address| (address.clone(), AddressRecord::new(&peer, address, 0, None))) .collect(); if records.is_empty() { @@ 
-666,10 +667,7 @@ impl TransportManager { Entry::Occupied(occupied) => { let context = occupied.into_mut(); - // For a better address tacking, see: - // https://github.com/paritytech/litep2p/issues/180 - // - // TODO: context.addresses.insert(record.clone()); + context.addresses.insert(record.clone()); tracing::debug!( target: LOG_TARGET, @@ -704,11 +702,13 @@ impl TransportManager { } } Entry::Vacant(vacant) => { + let mut addresses = AddressStore::new(); + addresses.insert(record.clone()); vacant.insert(PeerContext { state: PeerState::Dialing { record: record.clone(), }, - addresses: AddressStore::new(), + addresses, secondary_connection: None, }); } @@ -724,6 +724,29 @@ impl TransportManager { Ok(()) } + // Update the address on a dial failure. + fn update_address_on_dial_failure(&mut self, address: Multiaddr, error: &DialError) { + let mut peers = self.peers.write(); + + let score = AddressStore::error_score(error); + + // Extract the peer ID at this point to give `NegotiationError::PeerIdMismatch` a chance to + // propagate. + let peer_id = match address.iter().last() { + Some(Protocol::P2p(hash)) => PeerId::from_multihash(hash).ok(), + _ => None, + }; + let Some(peer_id) = peer_id else { + return; + }; + + // We need a valid context for this peer to keep track of failed addresses. + let context = peers.entry(peer_id).or_insert_with(|| PeerContext::default()); + context + .addresses + .insert(AddressRecord::new(&peer_id, address.clone(), score, None)); + } + /// Handle dial failure. fn on_dial_failure(&mut self, connection_id: ConnectionId) -> crate::Result<()> { let peer = self.pending_connections.remove(&connection_id).ok_or_else(|| { @@ -1002,11 +1025,36 @@ impl TransportManager { } } + /// Update the address on a connection established. + fn update_address_on_connection_established(&mut self, peer: PeerId, endpoint: &Endpoint) { + // The connection can be inbound or outbound. 
+ // For the inbound connection type, in most cases, the remote peer dialed + // with an ephemeral port which it might not be listening on. + // Therefore, we only insert the address into the store if we're the dialer. + if endpoint.is_listener() { + return; + } + + let mut peers = self.peers.write(); + + let record = AddressRecord::new( + &peer, + endpoint.address().clone(), + scores::CONNECTION_ESTABLISHED, + None, + ); + + let context = peers.entry(peer).or_insert_with(|| PeerContext::default()); + context.addresses.insert(record); + } + fn on_connection_established( &mut self, peer: PeerId, endpoint: &Endpoint, ) -> crate::Result { + self.update_address_on_connection_established(peer, &endpoint); + if let Some(dialed_peer) = self.pending_connections.remove(&endpoint.connection_id()) { if dialed_peer != peer { tracing::warn!( @@ -1052,19 +1100,6 @@ impl TransportManager { "secondary connection already exists, ignoring connection", ); - // insert address into the store only if we're the dialer - // - // if we're the listener, remote might have dialed with an ephemeral port - // which it might not be listening, making this address useless - if endpoint.is_listener() { - context.addresses.insert(AddressRecord::new( - &peer, - endpoint.address().clone(), - SCORE_CONNECT_SUCCESS, - None, - )) - } - return Ok(ConnectionEstablishedResult::Reject); } None => match dial_record.take() { @@ -1541,6 +1576,11 @@ impl TransportManager { "failed to dial peer", ); + // Update the addresses on dial failure regardless of the + // internal peer context state. This ensures a robust address tracking + // while taking into account the error type. 
+ self.update_address_on_dial_failure(address.clone(), &error); + if let Ok(()) = self.on_dial_failure(connection_id) { match address.iter().last() { Some(Protocol::P2p(hash)) => match PeerId::from_multihash(hash) { @@ -1686,6 +1726,10 @@ impl TransportManager { } } TransportEvent::OpenFailure { connection_id, errors } => { + for (address, error) in &errors { + self.update_address_on_dial_failure(address.clone(), error); + } + match self.on_open_failure(transport, connection_id) { Err(error) => tracing::debug!( target: LOG_TARGET, @@ -2270,7 +2314,7 @@ mod tests { match &peer.state { PeerState::Connected { dial_record, .. } => { assert!(dial_record.is_none()); - assert!(peer.addresses.contains(&dial_address)); + assert!(peer.addresses.addresses.contains_key(&dial_address)); } state => panic!("invalid state: {state:?}"), } @@ -2357,7 +2401,7 @@ mod tests { PeerState::Disconnected { dial_record: None, .. } => { - assert!(peer.addresses.contains(&dial_address)); + assert!(peer.addresses.addresses.contains_key(&dial_address)); } state => panic!("invalid state: {state:?}"), } @@ -2492,7 +2536,7 @@ mod tests { let established_result = manager .on_connection_established( peer, - &Endpoint::listener(address1, ConnectionId::from(0usize)), + &Endpoint::dialer(address1.clone(), ConnectionId::from(0usize)), ) .unwrap(); assert_eq!(established_result, ConnectionEstablishedResult::Accept); @@ -2552,15 +2596,21 @@ mod tests { PeerState::Connected { dial_record: None, .. } => { - let seconary_connection = peer.secondary_connection.as_ref().unwrap(); - assert_eq!(seconary_connection.address(), &address2); - assert_eq!(seconary_connection.score(), SCORE_CONNECT_SUCCESS); - assert!(peer.addresses.contains(&address3)); + assert_eq!( + peer.secondary_connection.as_ref().unwrap().address(), + &address2 + ); + // Endpoint::listener addresses are not tracked. 
+ assert!(!peer.addresses.addresses.contains_key(&address2)); + assert!(!peer.addresses.addresses.contains_key(&address3)); + assert_eq!( + peer.addresses.addresses.get(&address1).unwrap().score(), + scores::CONNECTION_ESTABLISHED + ); } state => panic!("invalid state: {state:?}"), } } - #[tokio::test] async fn secondary_connection_with_different_dial_endpoint_is_rejected() { let _ = tracing_subscriber::fmt() @@ -2761,7 +2811,7 @@ mod tests { record, } => { assert!(context.secondary_connection.is_none()); - assert!(context.addresses.contains(&address2)); + assert!(context.addresses.addresses.contains_key(&address2)); assert_eq!(record.connection_id(), &Some(ConnectionId::from(0usize))); } state => panic!("invalid state: {state:?}"), @@ -2865,7 +2915,7 @@ mod tests { record, } => { assert!(context.secondary_connection.is_none()); - assert!(context.addresses.contains(&address1)); + assert!(context.addresses.addresses.contains_key(&address1)); assert_eq!(record.connection_id(), &Some(ConnectionId::from(1usize))); } state => panic!("invalid state: {state:?}"), @@ -2914,7 +2964,7 @@ mod tests { let emit_event = manager .on_connection_established( peer, - &Endpoint::listener(address1, ConnectionId::from(0usize)), + &Endpoint::listener(address1.clone(), ConnectionId::from(0usize)), ) .unwrap(); assert!(std::matches!( @@ -2922,19 +2972,23 @@ mod tests { ConnectionEstablishedResult::Accept )); + // The address1 should be ignored because it is an inbound connection + // initiated from an ephemeral port. + let peers = manager.peers.read(); + let context = peers.get(&peer).unwrap(); + assert!(!context.addresses.addresses.contains_key(&address1)); + drop(peers); + // verify that the peer state is `Connected` with no seconary connection { let peers = manager.peers.read(); let peer = peers.get(&peer).unwrap(); match &peer.state { - PeerState::Connected { - dial_record: None, .. - } => { - assert!(peer.secondary_connection.is_none()); - } + PeerState::Connected { .. 
} => {} state => panic!("invalid state: {state:?}"), } + assert!(peer.secondary_connection.is_none()); } // second connection is established, verify that the seconary connection is tracked @@ -2949,19 +3003,23 @@ mod tests { ConnectionEstablishedResult::Accept )); + // Ensure we keep track of this address. + let peers = manager.peers.read(); + let context = peers.get(&peer).unwrap(); + assert!(context.addresses.addresses.contains_key(&address2)); + drop(peers); + let peers = manager.peers.read(); let context = peers.get(&peer).unwrap(); match &context.state { - PeerState::Connected { - dial_record: None, .. - } => { - let seconary_connection = context.secondary_connection.as_ref().unwrap(); - assert_eq!(seconary_connection.address(), &address2); - assert_eq!(seconary_connection.score(), SCORE_CONNECT_SUCCESS); - } + PeerState::Connected { .. } => {} state => panic!("invalid state: {state:?}"), } + assert_eq!( + context.secondary_connection.as_ref().unwrap().address(), + &address2, + ); drop(peers); // third connection is established, verify that it's discarded @@ -2978,7 +3036,9 @@ mod tests { let peers = manager.peers.read(); let context = peers.get(&peer).unwrap(); - assert!(context.addresses.contains(&address3)); + // The tertiary connection should be ignored because it is an inbound connection + // initiated from an ephemeral port. + assert!(!context.addresses.addresses.contains_key(&address3)); drop(peers); // close the tertiary connection that was ignored @@ -2990,18 +3050,20 @@ mod tests { let context = peers.get(&peer).unwrap(); match &context.state { - PeerState::Connected { - dial_record: None, .. - } => { - let seconary_connection = context.secondary_connection.as_ref().unwrap(); - assert_eq!(seconary_connection.address(), &address2); - assert_eq!(seconary_connection.score(), SCORE_CONNECT_SUCCESS); - } + PeerState::Connected { .. 
} => {} state => panic!("invalid state: {state:?}"), } + assert_eq!( + context.secondary_connection.as_ref().unwrap().address(), + &address2 + ); + assert_eq!( + context.addresses.addresses.get(&address2).unwrap().score(), + scores::CONNECTION_ESTABLISHED + ); + drop(peers); } - #[tokio::test] #[cfg(debug_assertions)] #[should_panic] @@ -3548,7 +3610,7 @@ mod tests { secondary_connection, addresses, } => { - assert!(!addresses.contains(record.address())); + assert!(!addresses.addresses.contains_key(record.address())); assert!(dial_record.is_none()); assert!(secondary_connection.is_none()); assert_eq!(record.address(), &dial_address); @@ -3632,16 +3694,13 @@ mod tests { let peers = manager.peers.read(); match peers.get(&peer).unwrap() { PeerContext { - state: - PeerState::Connected { - record, - dial_record, - }, + state: PeerState::Connected { record, .. }, secondary_connection, addresses, } => { - assert!(addresses.is_empty()); - assert!(dial_record.is_none()); + // Saved from the dial attempt. + assert_eq!(addresses.addresses.get(&dial_address).unwrap().score(), 0); + assert!(secondary_connection.is_none()); assert_eq!(record.address(), &dial_address); assert_eq!(record.connection_id(), &Some(connection_id)); @@ -4025,7 +4084,7 @@ mod tests { } #[tokio::test] - async fn do_not_overwrite_dial_addresses() { + async fn persist_dial_addresses() { let _ = tracing_subscriber::fmt() .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) .try_init(); @@ -4069,8 +4128,11 @@ mod tests { state => panic!("invalid state: {state:?}"), } - // The address is not saved yet. - assert!(!peer_context.addresses.contains(&dial_address)); + // The address is saved for future dials. 
+ assert_eq!( + peer_context.addresses.addresses.get(&dial_address).unwrap().score(), + 0 + ); } let second_address = Multiaddr::empty() @@ -4094,8 +4156,15 @@ mod tests { state => panic!("invalid state: {state:?}"), } - assert!(!peer_context.addresses.contains(&dial_address)); - assert!(!peer_context.addresses.contains(&second_address)); + // The address is still saved, even if a second dial is not initiated. + assert_eq!( + peer_context.addresses.addresses.get(&dial_address).unwrap().score(), + 0 + ); + assert_eq!( + peer_context.addresses.addresses.get(&second_address).unwrap().score(), + 0 + ); } } diff --git a/src/transport/manager/types.rs b/src/transport/manager/types.rs index b367a086..2a853606 100644 --- a/src/transport/manager/types.rs +++ b/src/transport/manager/types.rs @@ -106,3 +106,13 @@ pub struct PeerContext { /// Known addresses of peer. pub addresses: AddressStore, } + +impl Default for PeerContext { + fn default() -> Self { + Self { + state: PeerState::Disconnected { dial_record: None }, + secondary_connection: None, + addresses: AddressStore::new(), + } + } +}