Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

peer_state: Robust state machine transitions #251

Merged
merged 46 commits into from
Oct 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
59dbbf8
address: Add score constants to the address store
lexnv Sep 24, 2024
ae7799a
address: Convert DialError to appropriate score
lexnv Sep 24, 2024
ac5d731
address: Implement eviction algorithm for the AddressStore
lexnv Sep 24, 2024
a2356ec
manager: Keep track of the address for future dials
lexnv Sep 24, 2024
5355e85
manager: Make add_known_address more robust to track multiple peer addrs
lexnv Sep 24, 2024
ecb436b
manager: Track addresses better for dial and open failure
lexnv Sep 24, 2024
bf066d1
manager: Update addresses on connection established
lexnv Sep 24, 2024
b052c5d
manager/handle: Keep lock for shorter time
lexnv Sep 24, 2024
da52056
address/tests: Adjust testing
lexnv Sep 24, 2024
6c7f399
address/tests: Add tests for eviction and insertion
lexnv Sep 24, 2024
5b88ea4
manager/tests: Adjust testing to new address store interface
lexnv Sep 24, 2024
d575812
peer_state: Introduce connection record without address score
lexnv Sep 24, 2024
6494256
peer_state: Refactor peerstate for minimal state machine
lexnv Sep 24, 2024
7029015
peer_state: Implement peer state transitions isolated from manager
lexnv Sep 24, 2024
3f4cdc9
peer_state/tests: Add tests for peerstate transitions
lexnv Sep 24, 2024
6bf358b
address_store: Remove connection ID from the store
lexnv Sep 24, 2024
c9205df
types: Remove secondary connection ID and use new peerstate
lexnv Sep 24, 2024
25a5c5e
manager: Handle opening addresses generically
lexnv Sep 24, 2024
a792d0e
manager: Refactor dialing on multiple addresses
lexnv Sep 24, 2024
c7c5ed4
manager: Refactor dial addresses
lexnv Sep 24, 2024
b137d0a
manager: Refactor on dial failure
lexnv Sep 24, 2024
f4d46a9
manager: Refacotr on connection closed
lexnv Sep 24, 2024
a795fd4
manager: Refactor on connection established
lexnv Sep 24, 2024
7c8242b
manager: Refactor on connection opened
lexnv Sep 24, 2024
f7e6d88
manager: Refactor on open failure
lexnv Sep 24, 2024
10cb1ed
manager/tests: Adjust testing to the new interface
lexnv Sep 24, 2024
728c563
manager/handle: Use new peerstate interface
lexnv Sep 24, 2024
450c109
Update src/transport/manager/peer_state.rs
lexnv Oct 25, 2024
16e1294
Update src/transport/manager/peer_state.rs
lexnv Oct 25, 2024
ebe8853
Update src/transport/manager/peer_state.rs
lexnv Oct 25, 2024
2a33ab5
Update src/transport/manager/peer_state.rs
lexnv Oct 25, 2024
caaa3f1
Update src/transport/manager/peer_state.rs
lexnv Oct 25, 2024
f2b4285
Update src/transport/manager/peer_state.rs
lexnv Oct 25, 2024
4614c7d
Update src/transport/manager/peer_state.rs
lexnv Oct 25, 2024
c8180ab
Update src/transport/manager/mod.rs
lexnv Oct 25, 2024
2105dd2
peer_state: Fix clippy
lexnv Oct 25, 2024
896c47b
peer_state: Address other possible states
lexnv Oct 25, 2024
4a09f3f
peer_state: Warn if connection opened from other sources
lexnv Oct 25, 2024
8fa12bb
peerstate: Add warns when opening -> connected
lexnv Oct 25, 2024
22406c6
peer_state: Warn about peerId mismatch
lexnv Oct 25, 2024
764b769
manager: Replace assert with warn
lexnv Oct 25, 2024
14d7f0f
Merge remote-tracking branch 'origin/master' into lexnv/improve-peer-…
lexnv Oct 25, 2024
b7ba361
peerstate: Downgrade warn to trace
lexnv Oct 25, 2024
6e5868e
manager: Rename `Self::open_addresses` to
lexnv Oct 28, 2024
c2ee9b8
manager: Link todo issue
lexnv Oct 28, 2024
fc7dea1
peer_state: Use peer as state to ensure peerID
lexnv Oct 28, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 5 additions & 31 deletions src/transport/manager/address.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use crate::{error::DialError, types::ConnectionId, PeerId};
use crate::{error::DialError, PeerId};

use multiaddr::{Multiaddr, Protocol};
use multihash::Multihash;
Expand Down Expand Up @@ -50,9 +50,6 @@ pub struct AddressRecord {

/// Address.
address: Multiaddr,

/// Connection ID, if specified.
connection_id: Option<ConnectionId>,
}

impl AsRef<Multiaddr> for AddressRecord {
Expand All @@ -64,12 +61,7 @@ impl AsRef<Multiaddr> for AddressRecord {
impl AddressRecord {
/// Create new `AddressRecord` and if `address` doesn't contain `P2p`,
/// append the provided `PeerId` to the address.
pub fn new(
peer: &PeerId,
address: Multiaddr,
score: i32,
connection_id: Option<ConnectionId>,
) -> Self {
pub fn new(peer: &PeerId, address: Multiaddr, score: i32) -> Self {
let address = if !std::matches!(address.iter().last(), Some(Protocol::P2p(_))) {
address.with(Protocol::P2p(
Multihash::from_bytes(&peer.to_bytes()).expect("valid peer id"),
Expand All @@ -78,11 +70,7 @@ impl AddressRecord {
address
};

Self {
address,
score,
connection_id,
}
Self { address, score }
}

/// Create `AddressRecord` from `Multiaddr`.
Expand All @@ -97,7 +85,6 @@ impl AddressRecord {
Some(AddressRecord {
address,
score: 0i32,
connection_id: None,
})
}

Expand All @@ -112,20 +99,10 @@ impl AddressRecord {
&self.address
}

/// Get connection ID.
pub fn connection_id(&self) -> &Option<ConnectionId> {
&self.connection_id
}

/// Update score of an address.
pub fn update_score(&mut self, score: i32) {
self.score = self.score.saturating_add(score);
}

/// Set `ConnectionId` for the [`AddressRecord`].
pub fn set_connection_id(&mut self, connection_id: ConnectionId) {
self.connection_id = Some(connection_id);
}
}

impl PartialEq for AddressRecord {
Expand Down Expand Up @@ -161,8 +138,8 @@ impl FromIterator<Multiaddr> for AddressStore {
fn from_iter<T: IntoIterator<Item = Multiaddr>>(iter: T) -> Self {
let mut store = AddressStore::new();
for address in iter {
if let Some(address) = AddressRecord::from_multiaddr(address) {
store.insert(address);
if let Some(record) = AddressRecord::from_multiaddr(address) {
store.insert(record);
}
}

Expand Down Expand Up @@ -292,7 +269,6 @@ mod tests {
.with(Protocol::from(address.ip()))
.with(Protocol::Tcp(address.port())),
score,
None,
)
}

Expand All @@ -316,7 +292,6 @@ mod tests {
.with(Protocol::Tcp(address.port()))
.with(Protocol::Ws(std::borrow::Cow::Owned("/".to_string()))),
score,
None,
)
}

Expand All @@ -340,7 +315,6 @@ mod tests {
.with(Protocol::Udp(address.port()))
.with(Protocol::QuicV1),
score,
None,
)
}

Expand Down
103 changes: 44 additions & 59 deletions src/transport/manager/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@ use crate::{
executor::Executor,
protocol::ProtocolSet,
transport::manager::{
address::{AddressRecord, AddressStore},
types::{PeerContext, PeerState, SupportedTransport},
address::AddressRecord,
peer_state::StateDialResult,
types::{PeerContext, SupportedTransport},
ProtocolContext, TransportManagerEvent, LOG_TARGET,
},
types::{protocol::ProtocolName, ConnectionId},
Expand Down Expand Up @@ -223,11 +224,7 @@ impl TransportManagerHandle {
);

let mut peers = self.peers.write();
let entry = peers.entry(*peer).or_insert_with(|| PeerContext {
state: PeerState::Disconnected { dial_record: None },
addresses: AddressStore::new(),
secondary_connection: None,
});
let entry = peers.entry(*peer).or_insert_with(|| PeerContext::default());

// All addresses should be valid at this point, since the peer ID was either added or
// double checked.
Expand All @@ -249,36 +246,21 @@ impl TransportManagerHandle {
}

{
match self.peers.read().get(peer) {
Some(PeerContext {
state: PeerState::Connected { .. },
..
}) => return Err(ImmediateDialError::AlreadyConnected),
Some(PeerContext {
state: PeerState::Disconnected { dial_record },
addresses,
..
}) => {
if addresses.is_empty() {
return Err(ImmediateDialError::NoAddressAvailable);
}

// peer is already being dialed, don't dial again until the first dial concluded
if dial_record.is_some() {
tracing::debug!(
target: LOG_TARGET,
?peer,
?dial_record,
"peer is aready being dialed",
);
return Ok(());
}
}
Some(PeerContext {
state: PeerState::Dialing { .. } | PeerState::Opening { .. },
..
}) => return Ok(()),
None => return Err(ImmediateDialError::NoAddressAvailable),
let peers = self.peers.read();
let Some(PeerContext { state, addresses }) = peers.get(peer) else {
return Err(ImmediateDialError::NoAddressAvailable);
};

match state.can_dial() {
StateDialResult::AlreadyConnected =>
return Err(ImmediateDialError::AlreadyConnected),
StateDialResult::DialingInProgress => return Ok(()),
StateDialResult::Ok => {}
};

// Check if we have enough addresses to dial.
if addresses.is_empty() {
return Err(ImmediateDialError::NoAddressAvailable);
}
}

Expand Down Expand Up @@ -338,6 +320,11 @@ impl TransportHandle {

#[cfg(test)]
mod tests {
use crate::transport::manager::{
address::AddressStore,
peer_state::{ConnectionRecord, PeerState},
};

use super::*;
use multihash::Multihash;
use parking_lot::lock_api::RwLock;
Expand Down Expand Up @@ -454,16 +441,16 @@ mod tests {
peer,
PeerContext {
state: PeerState::Connected {
record: AddressRecord::from_multiaddr(
Multiaddr::empty()
record: ConnectionRecord {
address: Multiaddr::empty()
.with(Protocol::Ip4(std::net::Ipv4Addr::new(127, 0, 0, 1)))
.with(Protocol::Tcp(8888))
.with(Protocol::P2p(Multihash::from(peer))),
)
.unwrap(),
dial_record: None,
connection_id: ConnectionId::from(0),
},
secondary: None,
},
secondary_connection: None,

addresses: AddressStore::from_iter(
vec![Multiaddr::empty()
.with(Protocol::Ip4(std::net::Ipv4Addr::new(127, 0, 0, 1)))
Expand Down Expand Up @@ -497,15 +484,15 @@ mod tests {
peer,
PeerContext {
state: PeerState::Dialing {
record: AddressRecord::from_multiaddr(
Multiaddr::empty()
dial_record: ConnectionRecord {
address: Multiaddr::empty()
.with(Protocol::Ip4(std::net::Ipv4Addr::new(127, 0, 0, 1)))
.with(Protocol::Tcp(8888))
.with(Protocol::P2p(Multihash::from(peer))),
)
.unwrap(),
connection_id: ConnectionId::from(0),
},
},
secondary_connection: None,

addresses: AddressStore::from_iter(
vec![Multiaddr::empty()
.with(Protocol::Ip4(std::net::Ipv4Addr::new(127, 0, 0, 1)))
Expand Down Expand Up @@ -539,7 +526,6 @@ mod tests {
peer,
PeerContext {
state: PeerState::Disconnected { dial_record: None },
secondary_connection: None,
addresses: AddressStore::new(),
},
);
Expand All @@ -565,17 +551,16 @@ mod tests {
peer,
PeerContext {
state: PeerState::Disconnected {
dial_record: Some(
AddressRecord::from_multiaddr(
Multiaddr::empty()
.with(Protocol::Ip4(std::net::Ipv4Addr::new(127, 0, 0, 1)))
.with(Protocol::Tcp(8888))
.with(Protocol::P2p(Multihash::from(peer))),
)
.unwrap(),
),
dial_record: Some(ConnectionRecord::new(
peer,
Multiaddr::empty()
.with(Protocol::Ip4(std::net::Ipv4Addr::new(127, 0, 0, 1)))
.with(Protocol::Tcp(8888))
.with(Protocol::P2p(Multihash::from(peer))),
ConnectionId::from(0),
)),
},
secondary_connection: None,

addresses: AddressStore::from_iter(
vec![Multiaddr::empty()
.with(Protocol::Ip4(std::net::Ipv4Addr::new(127, 0, 0, 1)))
Expand Down
Loading