From c0e09fefffaee7666c55c36025c039026109f21d Mon Sep 17 00:00:00 2001 From: Stan Bondi Date: Mon, 18 Dec 2023 13:41:31 +0400 Subject: [PATCH] fix(p2p/messaging): single stream per connection (#845) Description --- Single messaging stream per connection If another stream is requested while one is pending, the pending one is passed back to caller Motivation and Context --- Previously each message sent could cause a new stream to be requested. This resulted in the permitted amount of streams being exhausted. The stream could be used to send a single message and be dropped. This PR checks for pending dial requests or streams and returns the message sink for those. Streams are kept alive so that they may be reused without the overhead of creating a new one each time. How Has This Been Tested? --- 4-VN committee and small 100tx stress test What process can a PR reviewer use to test or verify this change? --- You should not see `Dropping outbound stream because we are at capacity` messages Breaking Changes --- - [x] None - [ ] Requires data directory to be deleted - [ ] Other - Please specify --------- Co-authored-by: stringhandler --- dan_layer/storage/Cargo.toml | 2 +- networking/core/src/worker.rs | 39 ++- networking/libp2p-messaging/src/behaviour.rs | 278 +++++++++++++++---- networking/libp2p-messaging/src/event.rs | 9 +- networking/libp2p-messaging/src/handler.rs | 51 ++-- networking/libp2p-messaging/src/stream.rs | 27 +- 6 files changed, 300 insertions(+), 106 deletions(-) diff --git a/dan_layer/storage/Cargo.toml b/dan_layer/storage/Cargo.toml index f5a058a2b..10197131e 100644 --- a/dan_layer/storage/Cargo.toml +++ b/dan_layer/storage/Cargo.toml @@ -15,7 +15,7 @@ tari_dan_common_types = { workspace = true } # Shard store deps tari_engine_types = { workspace = true } tari_transaction = { workspace = true } -tari_core = { workspace = true } +tari_core = { workspace = true, default-features = true } tari_mmr = { workspace = true } tari_crypto = { workspace = true } diff --git a/networking/core/src/worker.rs b/networking/core/src/worker.rs index 58a0428db..9df0c9225 100644 --- a/networking/core/src/worker.rs +++ b/networking/core/src/worker.rs @@ -206,23 +206,15 @@ where peer, message, reply_tx, - } => { - match self - .swarm - .behaviour_mut() - .messaging - .enqueue_message(peer, message) - .await - { - Ok(_) => { - debug!(target: LOG_TARGET, "đŸ“ĸ Queued message to peer {}", peer); - let _ignore = reply_tx.send(Ok(())); - }, - Err(err) => { - debug!(target: LOG_TARGET, "🚨 Failed to queue message to peer {}: {}", peer, err); - let _ignore = reply_tx.send(Err(err.into())); - }, - } + } => match self.swarm.behaviour_mut().messaging.send_message(peer, message).await { + Ok(_) => { + debug!(target: LOG_TARGET, "đŸ“ĸ Queued message to peer {}", peer); + let _ignore = reply_tx.send(Ok(())); + }, + Err(err) => { + debug!(target: LOG_TARGET, "🚨 Failed to queue message to peer {}: {}", peer, err); + let _ignore = reply_tx.send(Err(err.into())); + }, }, NetworkingRequest::SendMulticast { destination, @@ -233,7 +225,7 @@ where let messaging_mut = &mut self.swarm.behaviour_mut().messaging; for peer in destination { - match messaging_mut.enqueue_message(peer, message.clone()).await { + match messaging_mut.send_message(peer, message.clone()).await { Ok(_) => {}, Err(err) => { debug!(target: LOG_TARGET, "🚨 Failed to queue message to peer {}: {}", peer, err); @@ -521,7 +513,6 @@ where debug!(target: LOG_TARGET, "ℹī¸ Messaging event: {:?}", event); }, Substream(event) => { - info!(target: LOG_TARGET, "ℹī¸ Substream event: {:?}", event); self.on_substream_event(event); }, ConnectionLimits(_) => { @@ -826,6 +817,7 @@ where stream, protocol, } => { + info!(target: LOG_TARGET, "đŸ“Ĩ substream open: peer_id={}, stream_id={}, protocol={}", peer_id, stream_id, protocol); let Some(reply) = self.pending_substream_requests.remove(&stream_id) else { debug!(target: LOG_TARGET, "No pending requests for subtream protocol {protocol} for peer {peer_id}"); return; @@ -835,6 +827,7 @@ where let _ignore = reply.send(Ok(NegotiatedSubstream::new(peer_id, protocol, stream))); }, InboundSubstreamOpen { notification } => { + info!(target: LOG_TARGET, "đŸ“Ĩ Inbound substream open: protocol={}", notification.protocol); self.substream_notifiers.notify(notification); }, InboundFailure { @@ -844,7 +837,13 @@ where } => { debug!(target: LOG_TARGET, "Inbound substream failed from peer {peer_id} with stream id {stream_id}: {error}"); }, - OutboundFailure { error, stream_id, .. } => { + OutboundFailure { + error, + stream_id, + peer_id, + .. + } => { + debug!(target: LOG_TARGET, "Outbound substream failed with peer {peer_id}, stream {stream_id}: {error}"); if let Some(waiting_reply) = self.pending_substream_requests.remove(&stream_id) { let _ignore = waiting_reply.send(Err(NetworkingError::FailedToOpenSubstream(error))); } diff --git a/networking/libp2p-messaging/src/behaviour.rs b/networking/libp2p-messaging/src/behaviour.rs index 40c8b24d6..b2e8a3b94 100644 --- a/networking/libp2p-messaging/src/behaviour.rs +++ b/networking/libp2p-messaging/src/behaviour.rs @@ -2,7 +2,8 @@ // SPDX-License-Identifier: BSD-3-Clause use std::{ - collections::{HashMap, HashSet, VecDeque}, + cmp, + collections::{HashMap, VecDeque}, task::{Context, Poll}, }; @@ -46,6 +47,8 @@ use crate::{ /// released. pub const EMPTY_QUEUE_SHRINK_THRESHOLD: usize = 100; +type MessageSinkAndStream = (MessageSink, MessageStream); + #[derive(Debug)] pub struct Behaviour where TCodec: Codec + Send + Clone + 'static @@ -55,8 +58,8 @@ where TCodec: Codec + Send + Clone + 'static pending_events: VecDeque, THandlerInEvent>>, /// The currently connected peers, their pending outbound and inbound responses and their known, /// reachable addresses, if any. - connected: HashMap>, - pending_outbound_streams: HashMap, 10>>, + connected: HashMap>, + pending_outbound_dials: HashMap>, next_outbound_stream_id: StreamId, } @@ -68,48 +71,81 @@ where TCodec: Codec + Send + Clone + 'static protocol, config, pending_events: VecDeque::new(), - pending_outbound_streams: HashMap::new(), connected: HashMap::new(), next_outbound_stream_id: StreamId::default(), + pending_outbound_dials: HashMap::default(), } } - pub async fn enqueue_message(&mut self, peer_id: PeerId, message: TCodec::Message) -> Result<(), Error> { - self.open_message_channel(peer_id).send(message).await?; + pub async fn send_message(&mut self, peer_id: PeerId, message: TCodec::Message) -> Result<(), Error> { + self.obtain_message_channel(peer_id).send(message)?; Ok(()) } - pub fn open_message_channel(&mut self, peer_id: PeerId) -> MessageSink { - let stream_id = self.next_outbound_stream_id(); - let (sink, stream) = stream::channel(stream_id, peer_id, 10); + pub fn obtain_message_channel(&mut self, peer_id: PeerId) -> MessageSink { + let stream_id = self.next_outbound_stream_id; match self.get_connections(&peer_id) { Some(connections) => { - let ix = (stream_id as usize) % connections.len(); - let conn = &mut connections[ix]; - conn.pending_streams.insert(stream_id); - let conn_id = conn.id; + // Return a currently active stream + if let Some(sink) = connections.next_active_sink() { + tracing::debug!("return a currently active stream {}", sink.stream_id()); + return sink.clone(); + } + + // Otherwise, return a pending stream + if let Some(sink) = connections.next_pending_sink() { + tracing::debug!("return a pending stream {}", sink.stream_id()); + return sink.clone(); + } + + // Otherwise, create a new stream + let (sink, stream) = stream::channel(stream_id, peer_id); + let ix = (stream_id as usize) % connections.connections.len(); + let conn_mut = &mut connections.connections[ix]; + conn_mut.stream_id = Some(stream_id); + assert!(conn_mut.pending_sink.is_none()); + assert!(conn_mut.message_sink.is_none()); + conn_mut.pending_sink = Some(sink.clone()); + + let conn_id = conn_mut.id; + tracing::debug!("create a new stream {peer_id} {stream_id}"); self.pending_events.push_back(ToSwarm::NotifyHandler { peer_id, handler: NotifyHandler::One(conn_id), event: stream, }); + + // Can't use next_outbound_stream_id() above because of multiple mutable borrows + self.next_outbound_stream_id(); + + sink }, - None => { - self.pending_events.push_back(ToSwarm::Dial { - opts: DialOpts::peer_id(peer_id).build(), - }); - self.pending_outbound_streams.entry(peer_id).or_default().push(stream); + None => match self.pending_outbound_dials.get(&peer_id) { + Some((sink, _)) => { + tracing::debug!("return a pending outbound dial {}", sink.stream_id()); + sink.clone() + }, + None => { + let stream_id = self.next_outbound_stream_id(); + tracing::debug!("create a new outbound dial {stream_id}"); + let (sink, stream) = stream::channel(stream_id, peer_id); + + self.pending_events.push_back(ToSwarm::Dial { + opts: DialOpts::peer_id(peer_id).build(), + }); + + self.pending_outbound_dials.insert(peer_id, (sink.clone(), stream)); + sink + }, }, } - - sink } fn next_outbound_stream_id(&mut self) -> StreamId { - let request_id = self.next_outbound_stream_id; + let stream_id = self.next_outbound_stream_id; self.next_outbound_stream_id = self.next_outbound_stream_id.wrapping_add(1); - request_id + stream_id } fn on_connection_closed( @@ -127,9 +163,10 @@ where TCodec: Codec + Send + Clone + 'static .expect("Expected some established connection to peer before closing."); let connection = connections + .connections .iter() .position(|c| c.id == connection_id) - .map(|p: usize| connections.remove(p)) + .map(|p: usize| connections.connections.remove(p)) .expect("Expected connection to be established before closing."); debug_assert_eq!(connections.is_empty(), remaining_established == 0); @@ -137,11 +174,11 @@ where TCodec: Codec + Send + Clone + 'static self.connected.remove(&peer_id); } - for stream_id in connection.pending_streams { + if let Some(sink) = connection.pending_sink { self.pending_events .push_back(ToSwarm::GenerateEvent(Event::InboundFailure { peer_id, - stream_id, + stream_id: sink.stream_id(), error: Error::ConnectionClosed, })); } @@ -155,7 +192,7 @@ where TCodec: Codec + Send + Clone + 'static .. } = address_change; if let Some(connections) = self.connected.get_mut(&peer_id) { - for connection in connections { + for connection in &mut connections.connections { if connection.id == connection_id { connection.remote_address = Some(new.get_remote_address().clone()); return; @@ -172,15 +209,13 @@ where TCodec: Codec + Send + Clone + 'static // only created when a peer is not connected when a request is made. // Thus these requests must be considered failed, even if there is // another, concurrent dialing attempt ongoing. - if let Some(pending) = self.pending_outbound_streams.remove(&peer) { - for stream in pending { - self.pending_events - .push_back(ToSwarm::GenerateEvent(Event::OutboundFailure { - peer_id: peer, - stream_id: stream.stream_id(), - error: Error::DialFailure, - })); - } + if let Some((_sink, stream)) = self.pending_outbound_dials.remove(&peer) { + self.pending_events + .push_back(ToSwarm::GenerateEvent(Event::OutboundFailure { + peer_id: peer, + stream_id: stream.stream_id(), + error: Error::DialFailure, + })); } } } @@ -194,17 +229,16 @@ where TCodec: Codec + Send + Clone + 'static ) { let mut connection = Connection::new(connection_id, remote_address); - if let Some(pending_streams) = self.pending_outbound_streams.remove(&peer_id) { - for stream in pending_streams { - connection.pending_streams.insert(stream.stream_id()); - handler.on_behaviour_event(stream); - } + if let Some((sink, stream)) = self.pending_outbound_dials.remove(&peer_id) { + connection.stream_id = Some(stream.stream_id()); + connection.pending_sink = Some(sink); + handler.on_behaviour_event(stream); } self.connected.entry(peer_id).or_default().push(connection); } - fn get_connections(&mut self, peer_id: &PeerId) -> Option<&mut SmallVec> { + fn get_connections(&mut self, peer_id: &PeerId) -> Option<&mut Connections> { self.connected.get_mut(peer_id).filter(|c| !c.is_empty()) } } @@ -252,22 +286,42 @@ where TCodec: Codec + Send + Clone + 'static fn on_connection_handler_event( &mut self, - _peer_id: PeerId, + peer_id: PeerId, _connection_id: ConnectionId, event: THandlerOutEvent, ) { + match &event { + Event::InboundFailure { stream_id, .. } | + Event::OutboundFailure { stream_id, .. } | + Event::StreamClosed { stream_id, .. } => { + if let Some(connections) = self.connected.get_mut(&peer_id) { + for connection in &mut connections.connections { + if connection.stream_id == Some(*stream_id) { + connection.stream_id = None; + connection.pending_sink = None; + connection.message_sink = None; + break; + } + } + } + }, + Event::OutboundStreamOpened { stream_id, .. } => { + if let Some(connections) = self.connected.get_mut(&peer_id) { + for connection in &mut connections.connections { + if connection.stream_id == Some(*stream_id) { + connection.message_sink = connection.pending_sink.take(); + break; + } + } + } + }, + _ => {}, + } self.pending_events.push_back(ToSwarm::GenerateEvent(event)); } fn poll(&mut self, _cx: &mut Context<'_>) -> Poll>> { if let Some(event) = self.pending_events.pop_front() { - if let ToSwarm::GenerateEvent(Event::StreamClosed { stream_id, peer_id, .. }) = &event { - if let Some(conn) = self.connected.get_mut(peer_id) { - for connection in conn { - connection.pending_streams.remove(stream_id); - } - } - } return Poll::Ready(event); } if self.pending_events.capacity() > EMPTY_QUEUE_SHRINK_THRESHOLD { @@ -280,18 +334,136 @@ where TCodec: Codec + Send + Clone + 'static /// Internal information tracked for an established connection. #[derive(Debug)] -struct Connection { +struct Connection { id: ConnectionId, + stream_id: Option, remote_address: Option, - pending_streams: HashSet, + pending_sink: Option>, + message_sink: Option>, } -impl Connection { +impl Connection { fn new(id: ConnectionId, remote_address: Option) -> Self { Self { id, remote_address, - pending_streams: HashSet::new(), + stream_id: None, + pending_sink: None, + message_sink: None, + } + } +} + +#[derive(Debug)] +struct Connections { + last_selected_index: usize, + connections: SmallVec, 2>, +} + +impl Connections { + fn new() -> Self { + Self { + last_selected_index: 0, + connections: SmallVec::new(), + } + } + + pub fn push(&mut self, connection: Connection) { + self.connections.push(connection); + } + + pub fn is_empty(&self) -> bool { + self.connections.is_empty() + } + + pub fn next_active_sink(&mut self) -> Option<&MessageSink> { + let initial_last_selected = cmp::min(self.last_selected_index, self.connections.len() - 1); + let (last_index, sink) = cycle_once(self.connections.len(), initial_last_selected, |i| { + let conn = &self.connections[i]; + conn.message_sink.as_ref() + })?; + + self.last_selected_index = last_index; + Some(sink) + } + + pub fn next_pending_sink(&mut self) -> Option<&MessageSink> { + let initial_last_selected = cmp::min(self.last_selected_index, self.connections.len() - 1); + let (last_index, sink) = cycle_once(self.connections.len(), initial_last_selected, |i| { + let conn = &self.connections[i]; + conn.pending_sink.as_ref() + })?; + + self.last_selected_index = last_index; + Some(sink) + } +} + +impl Default for Connections { + fn default() -> Self { + Self::new() + } +} + +fn cycle_once(n: usize, start: usize, mut f: F) -> Option<(usize, T)> +where F: FnMut(usize) -> Option { + let mut did_wrap = false; + let mut i = start; + if n == 0 { + return None; + } + if i >= n { + return None; + } + + loop { + // Did we find a value? + if let Some(t) = f(i) { + return Some((i, t)); + } + + // Are we back at where we started? + if did_wrap && i == start { + return None; } + + i = (i + 1) % n; + // Did we wrap around? + if i == 0 { + did_wrap = true; + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn cycle_once_works() { + assert_eq!(cycle_once(0, 0, |_| -> Option<()> { panic!() }), None); + assert_eq!(cycle_once(1, 0, Some), Some((0, 0))); + assert_eq!(cycle_once(0, 1, Some), None); + assert_eq!(cycle_once(10, 2, |_| None::<()>), None); + assert_eq!( + cycle_once(10, 2, |i| { + if i == 5 { + Some(()) + } else { + None + } + }), + Some((5, ())) + ); + assert_eq!( + cycle_once(10, 2, |i| { + if i == 1 { + Some(()) + } else { + None + } + }), + Some((1, ())) + ); } } diff --git a/networking/libp2p-messaging/src/event.rs b/networking/libp2p-messaging/src/event.rs index 145a541e9..1baaf5b5e 100644 --- a/networking/libp2p-messaging/src/event.rs +++ b/networking/libp2p-messaging/src/event.rs @@ -25,12 +25,19 @@ pub enum Event { stream_id: StreamId, error: Error, }, - StreamClosed { + OutboundStreamOpened { peer_id: PeerId, stream_id: StreamId, }, + InboundStreamOpened { + peer_id: PeerId, + }, InboundStreamClosed { peer_id: PeerId, }, + StreamClosed { + peer_id: PeerId, + stream_id: StreamId, + }, Error(Error), } diff --git a/networking/libp2p-messaging/src/handler.rs b/networking/libp2p-messaging/src/handler.rs index dcbc050fb..5e5f38a30 100644 --- a/networking/libp2p-messaging/src/handler.rs +++ b/networking/libp2p-messaging/src/handler.rs @@ -46,8 +46,8 @@ use crate::{ pub struct Handler { peer_id: PeerId, protocol: StreamProtocol, - requested_streams: VecDeque>, - pending_streams: VecDeque>, + requested_stream: Option>, + pending_stream: Option>, pending_events: VecDeque>, pending_events_sender: mpsc::Sender>, pending_events_receiver: mpsc::Receiver>, @@ -61,8 +61,8 @@ impl Handler { Self { peer_id, protocol, - requested_streams: VecDeque::new(), - pending_streams: VecDeque::new(), + requested_stream: None, + pending_stream: None, pending_events: VecDeque::new(), codec: TCodec::default(), pending_events_sender, @@ -84,9 +84,9 @@ where TCodec: Codec + Send + Clone + 'static fn on_dial_upgrade_error(&mut self, error: DialUpgradeError<(), Protocol>) { let stream = self - .requested_streams - .pop_front() - .expect("negotiated a stream without a pending message"); + .requested_stream + .take() + .expect("negotiated a stream without a requested stream"); match error.error { StreamUpgradeError::Timeout => { @@ -114,7 +114,7 @@ where TCodec: Codec + Send + Clone + 'static "outbound stream for request {} failed: {e}, retrying", stream.stream_id() ); - self.requested_streams.push_back(stream); + self.requested_stream = Some(stream); }, } } @@ -124,12 +124,17 @@ where TCodec: Codec + Send + Clone + 'static let (mut peer_stream, _protocol) = outbound.protocol; let mut msg_stream = self - .requested_streams - .pop_front() - .expect("negotiated a stream without a pending message"); + .requested_stream + .take() + .expect("negotiated outbound stream without a requested stream"); let mut events = self.pending_events_sender.clone(); + self.pending_events.push_back(Event::OutboundStreamOpened { + peer_id: self.peer_id, + stream_id: msg_stream.stream_id(), + }); + let fut = async move { let mut message_id = MessageId::default(); let stream_id = msg_stream.stream_id(); @@ -167,6 +172,8 @@ where TCodec: Codec + Send + Clone + 'static let (mut stream, _protocol) = inbound.protocol; let mut events = self.pending_events_sender.clone(); + self.pending_events.push_back(Event::InboundStreamOpened { peer_id }); + let fut = async move { loop { match codec.decode_from(&mut stream).await { @@ -243,27 +250,25 @@ where TCodec: Codec + Send + Clone + 'static return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(event)); } - // Emit outbound streams. - if let Some(stream) = self.pending_streams.pop_front() { - let protocol = self.protocol.clone(); - self.requested_streams.push_back(stream); + // Open outbound stream. + if let Some(stream) = self.pending_stream.take() { + self.requested_stream = Some(stream); return Poll::Ready(ConnectionHandlerEvent::OutboundSubstreamRequest { - protocol: SubstreamProtocol::new(Protocol { protocol }, ()), + protocol: SubstreamProtocol::new( + Protocol { + protocol: self.protocol.clone(), + }, + (), + ), }); } - debug_assert!(self.pending_streams.is_empty()); - - if self.pending_streams.capacity() > EMPTY_QUEUE_SHRINK_THRESHOLD { - self.pending_streams.shrink_to_fit(); - } - Poll::Pending } fn on_behaviour_event(&mut self, stream: Self::FromBehaviour) { - self.pending_streams.push_back(stream); + self.pending_stream = Some(stream); } fn on_connection_event( diff --git a/networking/libp2p-messaging/src/stream.rs b/networking/libp2p-messaging/src/stream.rs index 72a221ec7..f4c0665f5 100644 --- a/networking/libp2p-messaging/src/stream.rs +++ b/networking/libp2p-messaging/src/stream.rs @@ -7,8 +7,8 @@ use libp2p::{ }; pub type StreamId = u64; -pub fn channel(stream_id: StreamId, peer_id: PeerId, size: usize) -> (MessageSink, MessageStream) { - let (sender, receiver) = mpsc::channel(size); +pub fn channel(stream_id: StreamId, peer_id: PeerId) -> (MessageSink, MessageStream) { + let (sender, receiver) = mpsc::unbounded(); let sink = MessageSink::new(stream_id, peer_id, sender); let stream = MessageStream::new(stream_id, peer_id, receiver); (sink, stream) @@ -18,11 +18,11 @@ pub fn channel(stream_id: StreamId, peer_id: PeerId, size: usize) -> (Message pub struct MessageStream { stream_id: StreamId, peer_id: PeerId, - receiver: mpsc::Receiver, + receiver: mpsc::UnboundedReceiver, } impl MessageStream { - pub fn new(stream_id: StreamId, peer_id: PeerId, receiver: mpsc::Receiver) -> Self { + pub fn new(stream_id: StreamId, peer_id: PeerId, receiver: mpsc::UnboundedReceiver) -> Self { Self { stream_id, peer_id, @@ -43,14 +43,15 @@ impl MessageStream { } } +#[derive(Debug)] pub struct MessageSink { stream_id: StreamId, peer_id: PeerId, - sender: mpsc::Sender, + sender: mpsc::UnboundedSender, } impl MessageSink { - pub fn new(stream_id: StreamId, peer_id: PeerId, sender: mpsc::Sender) -> Self { + pub fn new(stream_id: StreamId, peer_id: PeerId, sender: mpsc::UnboundedSender) -> Self { Self { stream_id, peer_id, @@ -66,8 +67,8 @@ impl MessageSink { self.stream_id } - pub async fn send(&mut self, msg: TMsg) -> Result<(), crate::Error> { - self.sender.send(msg).await.map_err(|_| crate::Error::ChannelClosed) + pub fn send(&mut self, msg: TMsg) -> Result<(), crate::Error> { + self.sender.unbounded_send(msg).map_err(|_| crate::Error::ChannelClosed) } pub async fn send_all(&mut self, stream: &mut TStream) -> Result<(), crate::Error> @@ -78,3 +79,13 @@ impl MessageSink { .map_err(|_| crate::Error::ChannelClosed) } } + +impl Clone for MessageSink { + fn clone(&self) -> Self { + Self { + stream_id: self.stream_id, + peer_id: self.peer_id, + sender: self.sender.clone(), + } + } +}