Skip to content

Commit

Permalink
fix(p2p/messaging): single stream per connection (#845)
Browse files Browse the repository at this point in the history
Description
---
Single messaging stream per connection
If another stream is requested while one is pending, the pending one is
passed back to caller

Motivation and Context
---
Previously each message sent could cause a new stream to be requested.
This resulted in the permitted amount of streams being exhausted. The
stream could be used to send a single message and be dropped. This PR
checks for pending dial requests or streams and returns the message sink
for those. Streams are kept alive so that they may be reused without the
overhead of creating a new one each time.

How Has This Been Tested?
---
4-VN committee and small 100tx stress test

What process can a PR reviewer use to test or verify this change?
---
You should not see `Dropping outbound stream because we are at capacity`
messages

Breaking Changes
---

- [x] None
- [ ] Requires data directory to be deleted
- [ ] Other - Please specify

---------

Co-authored-by: stringhandler <[email protected]>
  • Loading branch information
sdbondi and stringhandler authored Dec 18, 2023
1 parent b49af42 commit c0e09fe
Show file tree
Hide file tree
Showing 6 changed files with 300 additions and 106 deletions.
2 changes: 1 addition & 1 deletion dan_layer/storage/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ tari_dan_common_types = { workspace = true }
# Shard store deps
tari_engine_types = { workspace = true }
tari_transaction = { workspace = true }
tari_core = { workspace = true }
tari_core = { workspace = true, default-features = true }
tari_mmr = { workspace = true }
tari_crypto = { workspace = true }

Expand Down
39 changes: 19 additions & 20 deletions networking/core/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,23 +206,15 @@ where
peer,
message,
reply_tx,
} => {
match self
.swarm
.behaviour_mut()
.messaging
.enqueue_message(peer, message)
.await
{
Ok(_) => {
debug!(target: LOG_TARGET, "📢 Queued message to peer {}", peer);
let _ignore = reply_tx.send(Ok(()));
},
Err(err) => {
debug!(target: LOG_TARGET, "🚨 Failed to queue message to peer {}: {}", peer, err);
let _ignore = reply_tx.send(Err(err.into()));
},
}
} => match self.swarm.behaviour_mut().messaging.send_message(peer, message).await {
Ok(_) => {
debug!(target: LOG_TARGET, "📢 Queued message to peer {}", peer);
let _ignore = reply_tx.send(Ok(()));
},
Err(err) => {
debug!(target: LOG_TARGET, "🚨 Failed to queue message to peer {}: {}", peer, err);
let _ignore = reply_tx.send(Err(err.into()));
},
},
NetworkingRequest::SendMulticast {
destination,
Expand All @@ -233,7 +225,7 @@ where
let messaging_mut = &mut self.swarm.behaviour_mut().messaging;

for peer in destination {
match messaging_mut.enqueue_message(peer, message.clone()).await {
match messaging_mut.send_message(peer, message.clone()).await {
Ok(_) => {},
Err(err) => {
debug!(target: LOG_TARGET, "🚨 Failed to queue message to peer {}: {}", peer, err);
Expand Down Expand Up @@ -521,7 +513,6 @@ where
debug!(target: LOG_TARGET, "ℹ️ Messaging event: {:?}", event);
},
Substream(event) => {
info!(target: LOG_TARGET, "ℹ️ Substream event: {:?}", event);
self.on_substream_event(event);
},
ConnectionLimits(_) => {
Expand Down Expand Up @@ -826,6 +817,7 @@ where
stream,
protocol,
} => {
info!(target: LOG_TARGET, "📥 substream open: peer_id={}, stream_id={}, protocol={}", peer_id, stream_id, protocol);
let Some(reply) = self.pending_substream_requests.remove(&stream_id) else {
debug!(target: LOG_TARGET, "No pending requests for subtream protocol {protocol} for peer {peer_id}");
return;
Expand All @@ -835,6 +827,7 @@ where
let _ignore = reply.send(Ok(NegotiatedSubstream::new(peer_id, protocol, stream)));
},
InboundSubstreamOpen { notification } => {
info!(target: LOG_TARGET, "📥 Inbound substream open: protocol={}", notification.protocol);
self.substream_notifiers.notify(notification);
},
InboundFailure {
Expand All @@ -844,7 +837,13 @@ where
} => {
debug!(target: LOG_TARGET, "Inbound substream failed from peer {peer_id} with stream id {stream_id}: {error}");
},
OutboundFailure { error, stream_id, .. } => {
OutboundFailure {
error,
stream_id,
peer_id,
..
} => {
debug!(target: LOG_TARGET, "Outbound substream failed with peer {peer_id}, stream {stream_id}: {error}");
if let Some(waiting_reply) = self.pending_substream_requests.remove(&stream_id) {
let _ignore = waiting_reply.send(Err(NetworkingError::FailedToOpenSubstream(error)));
}
Expand Down
Loading

0 comments on commit c0e09fe

Please sign in to comment.