From aa1a2425f799075a26df53c08060b421b85c69ef Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Wed, 3 Aug 2022 21:43:37 +0200 Subject: [PATCH 01/12] Generalise `StreamMuxer::poll_address_change` to `poll_event` This is to allow general-purpose background work to be performed by implementations. --- core/src/either.rs | 11 +++++------ core/src/muxing.rs | 24 ++++++++++++++++-------- core/src/muxing/boxed.rs | 17 +++++++---------- core/src/muxing/singleton.rs | 6 +++--- muxers/mplex/src/lib.rs | 7 ++++--- muxers/yamux/src/lib.rs | 7 +++---- swarm/src/connection.rs | 12 ++++++++---- 7 files changed, 46 insertions(+), 38 deletions(-) diff --git a/core/src/either.rs b/core/src/either.rs index 42984519488..db980ad75ad 100644 --- a/core/src/either.rs +++ b/core/src/either.rs @@ -18,6 +18,7 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. +use crate::muxing::StreamMuxerEvent; use crate::{ muxing::StreamMuxer, transport::{ListenerId, Transport, TransportError, TransportEvent}, @@ -236,15 +237,13 @@ where } } - fn poll_address_change( + fn poll_event( self: Pin<&mut Self>, cx: &mut Context<'_>, - ) -> Poll> { + ) -> Poll> { match self.project() { - EitherOutputProj::First(inner) => inner.poll_address_change(cx).map_err(EitherError::A), - EitherOutputProj::Second(inner) => { - inner.poll_address_change(cx).map_err(EitherError::B) - } + EitherOutputProj::First(inner) => inner.poll_event(cx).map_err(EitherError::A), + EitherOutputProj::Second(inner) => inner.poll_event(cx).map_err(EitherError::B), } } diff --git a/core/src/muxing.rs b/core/src/muxing.rs index 2d1e1068044..2322c8ac22a 100644 --- a/core/src/muxing.rs +++ b/core/src/muxing.rs @@ -86,13 +86,15 @@ pub trait StreamMuxer { cx: &mut Context<'_>, ) -> Poll>; - /// Poll for an address change of the underlying connection. + /// Poll for an event of the underlying connection. /// - /// Not all implementations may support this feature. - fn poll_address_change( + /// In addition to returning an event, this function may be used to perform any kind of background + /// work that needs to happen for the muxer to do its work. Implementations can rely on this + /// function to be called regularly and unconditionally. + fn poll_event( self: Pin<&mut Self>, cx: &mut Context<'_>, - ) -> Poll>; + ) -> Poll>; /// Closes this `StreamMuxer`. /// @@ -107,6 +109,12 @@ pub trait StreamMuxer { fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll>; } +/// An event produced by a [`StreamMuxer`]. +pub enum StreamMuxerEvent { + /// The address of the remote has changed. + AddressChange(Multiaddr), +} + /// Extension trait for [`StreamMuxer`]. pub trait StreamMuxerExt: StreamMuxer + Sized { /// Convenience function for calling [`StreamMuxer::poll_inbound`] for [`StreamMuxer`]s that are `Unpin`. @@ -131,15 +139,15 @@ pub trait StreamMuxerExt: StreamMuxer + Sized { Pin::new(self).poll_outbound(cx) } - /// Convenience function for calling [`StreamMuxer::poll_address_change`] for [`StreamMuxer`]s that are `Unpin`. - fn poll_address_change_unpin( + /// Convenience function for calling [`StreamMuxer::poll_event`] for [`StreamMuxer`]s that are `Unpin`. + fn poll_event_unpin( &mut self, cx: &mut Context<'_>, - ) -> Poll> + ) -> Poll> where Self: Unpin, { - Pin::new(self).poll_address_change(cx) + Pin::new(self).poll_event(cx) } /// Convenience function for calling [`StreamMuxer::poll_close`] for [`StreamMuxer`]s that are `Unpin`. diff --git a/core/src/muxing/boxed.rs b/core/src/muxing/boxed.rs index 0f5b6e5822e..156f53b1a18 100644 --- a/core/src/muxing/boxed.rs +++ b/core/src/muxing/boxed.rs @@ -1,6 +1,6 @@ +use crate::muxing::StreamMuxerEvent; use crate::StreamMuxer; use futures::{AsyncRead, AsyncWrite}; -use multiaddr::Multiaddr; use pin_project::pin_project; use std::error::Error; use std::fmt; @@ -65,14 +65,11 @@ where .map_err(into_io_error) } - fn poll_address_change( + fn poll_event( self: Pin<&mut Self>, cx: &mut Context<'_>, - ) -> Poll> { - self.project() - .inner - .poll_address_change(cx) - .map_err(into_io_error) + ) -> Poll> { + self.project().inner.poll_event(cx).map_err(into_io_error) } } @@ -128,11 +125,11 @@ impl StreamMuxer for StreamMuxerBox { self.project().poll_outbound(cx) } - fn poll_address_change( + fn poll_event( self: Pin<&mut Self>, cx: &mut Context<'_>, - ) -> Poll> { - self.project().poll_address_change(cx) + ) -> Poll> { + self.project().poll_event(cx) } } diff --git a/core/src/muxing/singleton.rs b/core/src/muxing/singleton.rs index 193cfb6303f..843eb56dfb9 100644 --- a/core/src/muxing/singleton.rs +++ b/core/src/muxing/singleton.rs @@ -20,8 +20,8 @@ use crate::{connection::Endpoint, muxing::StreamMuxer}; +use crate::muxing::StreamMuxerEvent; use futures::prelude::*; -use multiaddr::Multiaddr; use std::cell::Cell; use std::pin::Pin; use std::{io, task::Context, task::Poll}; @@ -88,10 +88,10 @@ where } } - fn poll_address_change( + fn poll_event( self: Pin<&mut Self>, _: &mut Context<'_>, - ) -> Poll> { + ) -> Poll> { Poll::Pending } diff --git a/muxers/mplex/src/lib.rs b/muxers/mplex/src/lib.rs index 14f9cda65d9..f786dd42d3d 100644 --- a/muxers/mplex/src/lib.rs +++ b/muxers/mplex/src/lib.rs @@ -27,9 +27,10 @@ pub use config::{MaxBufferBehaviour, MplexConfig}; use bytes::Bytes; use codec::LocalStreamId; use futures::{future, prelude::*, ready}; +use libp2p_core::muxing::StreamMuxerEvent; use libp2p_core::{ upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}, - Multiaddr, StreamMuxer, + StreamMuxer, }; use parking_lot::Mutex; use std::{cmp, iter, pin::Pin, sync::Arc, task::Context, task::Poll}; @@ -105,10 +106,10 @@ where .map_ok(|stream_id| Substream::new(stream_id, self.io.clone())) } - fn poll_address_change( + fn poll_event( self: Pin<&mut Self>, _: &mut Context<'_>, - ) -> Poll> { + ) -> Poll> { Poll::Pending } diff --git a/muxers/yamux/src/lib.rs b/muxers/yamux/src/lib.rs index 07327e203b3..f524a369d21 100644 --- a/muxers/yamux/src/lib.rs +++ b/muxers/yamux/src/lib.rs @@ -26,9 +26,8 @@ use futures::{ prelude::*, stream::{BoxStream, LocalBoxStream}, }; -use libp2p_core::muxing::StreamMuxer; +use libp2p_core::muxing::{StreamMuxer, StreamMuxerEvent}; use libp2p_core::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}; -use libp2p_core::Multiaddr; use std::{ fmt, io, iter, mem, pin::Pin, @@ -124,10 +123,10 @@ where .map_err(YamuxError) } - fn poll_address_change( + fn poll_event( self: Pin<&mut Self>, _: &mut Context<'_>, - ) -> Poll> { + ) -> Poll> { Poll::Pending } diff --git a/swarm/src/connection.rs b/swarm/src/connection.rs index f92186618ae..bb556768b18 100644 --- a/swarm/src/connection.rs +++ b/swarm/src/connection.rs @@ -35,7 +35,7 @@ use crate::IntoConnectionHandler; use handler_wrapper::HandlerWrapper; use libp2p_core::connection::ConnectedPoint; use libp2p_core::multiaddr::Multiaddr; -use libp2p_core::muxing::{StreamMuxerBox, StreamMuxerExt}; +use libp2p_core::muxing::{StreamMuxerBox, StreamMuxerEvent, StreamMuxerExt}; use libp2p_core::upgrade; use libp2p_core::PeerId; use std::collections::VecDeque; @@ -171,9 +171,13 @@ where continue; // Go back to the top, handler can potentially make progress again. } - if let Poll::Ready(address) = self.muxing.poll_address_change_unpin(cx)? { - self.handler.inject_address_change(&address); - return Poll::Ready(Ok(Event::AddressChange(address))); + if let Poll::Ready(event) = self.muxing.poll_event_unpin(cx)? { + match event { + StreamMuxerEvent::AddressChange(address) => { + self.handler.inject_address_change(&address); + return Poll::Ready(Ok(Event::AddressChange(address))); + } + } } return Poll::Pending; // Nothing can make progress, return `Pending`. From febd5182bb3f154739366a9aebfa557e4b968bb4 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Wed, 3 Aug 2022 21:45:42 +0200 Subject: [PATCH 02/12] Re-order functions to feature `poll_event` at the bottom --- core/src/either.rs | 14 +++++++------- core/src/muxing.rs | 20 ++++++++++---------- core/src/muxing/boxed.rs | 20 ++++++++++---------- core/src/muxing/singleton.rs | 8 ++++---- 4 files changed, 31 insertions(+), 31 deletions(-) diff --git a/core/src/either.rs b/core/src/either.rs index db980ad75ad..de008a3caba 100644 --- a/core/src/either.rs +++ b/core/src/either.rs @@ -237,6 +237,13 @@ where } } + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + match self.project() { + EitherOutputProj::First(inner) => inner.poll_close(cx).map_err(EitherError::A), + EitherOutputProj::Second(inner) => inner.poll_close(cx).map_err(EitherError::B), + } + } + fn poll_event( self: Pin<&mut Self>, cx: &mut Context<'_>, @@ -246,13 +253,6 @@ where EitherOutputProj::Second(inner) => inner.poll_event(cx).map_err(EitherError::B), } } - - fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - match self.project() { - EitherOutputProj::First(inner) => inner.poll_close(cx).map_err(EitherError::A), - EitherOutputProj::Second(inner) => inner.poll_close(cx).map_err(EitherError::B), - } - } } /// Implements `Future` and dispatches all method calls to either `First` or `Second`. diff --git a/core/src/muxing.rs b/core/src/muxing.rs index 2322c8ac22a..0b18d009b4c 100644 --- a/core/src/muxing.rs +++ b/core/src/muxing.rs @@ -86,16 +86,6 @@ pub trait StreamMuxer { cx: &mut Context<'_>, ) -> Poll>; - /// Poll for an event of the underlying connection. - /// - /// In addition to returning an event, this function may be used to perform any kind of background - /// work that needs to happen for the muxer to do its work. Implementations can rely on this - /// function to be called regularly and unconditionally. - fn poll_event( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll>; - /// Closes this `StreamMuxer`. /// /// After this has returned `Poll::Ready(Ok(()))`, the muxer has become useless. All @@ -107,6 +97,16 @@ pub trait StreamMuxer { /// > properly informing the remote, there is no difference between this and /// > immediately dropping the muxer. fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll>; + + /// Poll for an event of the underlying connection. + /// + /// In addition to returning an event, this function may be used to perform any kind of background + /// work that needs to happen for the muxer to do its work. Implementations can rely on this + /// function to be called regularly and unconditionally. + fn poll_event( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll>; } /// An event produced by a [`StreamMuxer`]. diff --git a/core/src/muxing/boxed.rs b/core/src/muxing/boxed.rs index 156f53b1a18..9aae3a5e992 100644 --- a/core/src/muxing/boxed.rs +++ b/core/src/muxing/boxed.rs @@ -38,11 +38,6 @@ where type Substream = SubstreamBox; type Error = io::Error; - #[inline] - fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.project().inner.poll_close(cx).map_err(into_io_error) - } - fn poll_inbound( self: Pin<&mut Self>, cx: &mut Context<'_>, @@ -65,6 +60,11 @@ where .map_err(into_io_error) } + #[inline] + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.project().inner.poll_close(cx).map_err(into_io_error) + } + fn poll_event( self: Pin<&mut Self>, cx: &mut Context<'_>, @@ -106,11 +106,6 @@ impl StreamMuxer for StreamMuxerBox { type Substream = SubstreamBox; type Error = io::Error; - #[inline] - fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.project().poll_close(cx) - } - fn poll_inbound( self: Pin<&mut Self>, cx: &mut Context<'_>, @@ -125,6 +120,11 @@ impl StreamMuxer for StreamMuxerBox { self.project().poll_outbound(cx) } + #[inline] + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.project().poll_close(cx) + } + fn poll_event( self: Pin<&mut Self>, cx: &mut Context<'_>, diff --git a/core/src/muxing/singleton.rs b/core/src/muxing/singleton.rs index 843eb56dfb9..47edf2465da 100644 --- a/core/src/muxing/singleton.rs +++ b/core/src/muxing/singleton.rs @@ -88,14 +88,14 @@ where } } + fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + fn poll_event( self: Pin<&mut Self>, _: &mut Context<'_>, ) -> Poll> { Poll::Pending } - - fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { - Poll::Ready(Ok(())) - } } From ce958fc1e3b28d410326bcbe9f11c5927376763d Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Wed, 3 Aug 2022 21:46:43 +0200 Subject: [PATCH 03/12] Call `poll_event` before we call other `poll` functions This is to allow any kind of background work to happen before anything else. --- swarm/src/connection.rs | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/swarm/src/connection.rs b/swarm/src/connection.rs index bb556768b18..c103eb45651 100644 --- a/swarm/src/connection.rs +++ b/swarm/src/connection.rs @@ -153,6 +153,15 @@ where } } + if let Poll::Ready(event) = self.muxing.poll_event_unpin(cx)? { + match event { + StreamMuxerEvent::AddressChange(address) => { + self.handler.inject_address_change(&address); + return Poll::Ready(Ok(Event::AddressChange(address))); + } + } + } + if !self.open_info.is_empty() { if let Poll::Ready(substream) = self.muxing.poll_outbound_unpin(cx)? { let user_data = self @@ -171,15 +180,6 @@ where continue; // Go back to the top, handler can potentially make progress again. } - if let Poll::Ready(event) = self.muxing.poll_event_unpin(cx)? { - match event { - StreamMuxerEvent::AddressChange(address) => { - self.handler.inject_address_change(&address); - return Poll::Ready(Ok(Event::AddressChange(address))); - } - } - } - return Poll::Pending; // Nothing can make progress, return `Pending`. } } From 9a0eed2870378db1175e53a1885f1f70b8918e62 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Wed, 3 Aug 2022 21:48:42 +0200 Subject: [PATCH 04/12] Add changelog entry --- core/CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/core/CHANGELOG.md b/core/CHANGELOG.md index dc4fd0829c0..8974fd88a57 100644 --- a/core/CHANGELOG.md +++ b/core/CHANGELOG.md @@ -5,12 +5,14 @@ - Drop `Unpin` requirement from `SubstreamBox`. See [PR 2762] and [PR 2776]. - Drop `Sync` requirement on `StreamMuxer` for constructing `StreamMuxerBox`. See [PR 2775]. - Use `Pin<&mut Self>` as the receiver type for all `StreamMuxer` poll functions. See [PR 2765]. +- Generalise `StreamMuxer::poll_address_change` to `StreamMuxer::poll_event`. See [PR XXXX]. [PR 2724]: https://github.com/libp2p/rust-libp2p/pull/2724 [PR 2762]: https://github.com/libp2p/rust-libp2p/pull/2762 [PR 2775]: https://github.com/libp2p/rust-libp2p/pull/2775 [PR 2776]: https://github.com/libp2p/rust-libp2p/pull/2776 [PR 2765]: https://github.com/libp2p/rust-libp2p/pull/2765 +[PR XXXX]: https://github.com/libp2p/rust-libp2p/pull/XXXX # 0.34.0 From fc61a68cf8e1139d9910d2d482cf3bc0cf228aee Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Thu, 4 Aug 2022 13:36:16 +0200 Subject: [PATCH 05/12] Align coding style of all branches in `poll` function --- swarm/src/connection.rs | 41 +++++++++++++++++++++++------------------ 1 file changed, 23 insertions(+), 18 deletions(-) diff --git a/swarm/src/connection.rs b/swarm/src/connection.rs index c103eb45651..388b8495cb7 100644 --- a/swarm/src/connection.rs +++ b/swarm/src/connection.rs @@ -153,31 +153,36 @@ where } } - if let Poll::Ready(event) = self.muxing.poll_event_unpin(cx)? { - match event { - StreamMuxerEvent::AddressChange(address) => { - self.handler.inject_address_change(&address); - return Poll::Ready(Ok(Event::AddressChange(address))); - } + match self.muxing.poll_event_unpin(cx)? { + Poll::Pending => {} + Poll::Ready(StreamMuxerEvent::AddressChange(address)) => { + self.handler.inject_address_change(&address); + return Poll::Ready(Ok(Event::AddressChange(address))); } } if !self.open_info.is_empty() { - if let Poll::Ready(substream) = self.muxing.poll_outbound_unpin(cx)? { - let user_data = self - .open_info - .pop_front() - .expect("`open_info` is not empty"); - let endpoint = SubstreamEndpoint::Dialer(user_data); - self.handler.inject_substream(substream, endpoint); - continue; // Go back to the top, handler can potentially make progress again. + match self.muxing.poll_outbound_unpin(cx)? { + Poll::Pending => {} + Poll::Ready(substream) => { + let user_data = self + .open_info + .pop_front() + .expect("`open_info` is not empty"); + let endpoint = SubstreamEndpoint::Dialer(user_data); + self.handler.inject_substream(substream, endpoint); + continue; // Go back to the top, handler can potentially make progress again. + } } } - if let Poll::Ready(substream) = self.muxing.poll_inbound_unpin(cx)? { - self.handler - .inject_substream(substream, SubstreamEndpoint::Listener); - continue; // Go back to the top, handler can potentially make progress again. + match self.muxing.poll_inbound_unpin(cx)? { + Poll::Pending => {} + Poll::Ready(substream) => { + self.handler + .inject_substream(substream, SubstreamEndpoint::Listener); + continue; // Go back to the top, handler can potentially make progress again. + } } return Poll::Pending; // Nothing can make progress, return `Pending`. From 360857b0e89be430f5e0f290c84ae94ed5d03d6a Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Mon, 8 Aug 2022 17:54:33 +0200 Subject: [PATCH 06/12] Update changelog --- core/CHANGELOG.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core/CHANGELOG.md b/core/CHANGELOG.md index 8974fd88a57..36e6cb7c32f 100644 --- a/core/CHANGELOG.md +++ b/core/CHANGELOG.md @@ -1,11 +1,11 @@ # 0.35.0 [unreleased] -- Remove `StreamMuxer::poll_event` in favor of individual functions: `poll_inbound`, `poll_outbound` - and `poll_address_change`. Consequently, `StreamMuxerEvent` is also removed. See [PR 2724]. - Drop `Unpin` requirement from `SubstreamBox`. See [PR 2762] and [PR 2776]. - Drop `Sync` requirement on `StreamMuxer` for constructing `StreamMuxerBox`. See [PR 2775]. - Use `Pin<&mut Self>` as the receiver type for all `StreamMuxer` poll functions. See [PR 2765]. -- Generalise `StreamMuxer::poll_address_change` to `StreamMuxer::poll_event`. See [PR XXXX]. +- Change `StreamMuxer` interface to be entirely poll-based. All functions on `StreamMuxer` now + require a `Context` and return `Poll`. This gives callers fine-grained control over what they + would like to make progress on. See [PR 2724] and [PR XXXX]. [PR 2724]: https://github.com/libp2p/rust-libp2p/pull/2724 [PR 2762]: https://github.com/libp2p/rust-libp2p/pull/2762 From 55c0da0e8bb50bc542fa0dcabb54359c5b5794fd Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Mon, 8 Aug 2022 17:54:49 +0200 Subject: [PATCH 07/12] Fill in changelog number --- core/CHANGELOG.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/CHANGELOG.md b/core/CHANGELOG.md index 36e6cb7c32f..c377d042106 100644 --- a/core/CHANGELOG.md +++ b/core/CHANGELOG.md @@ -5,14 +5,14 @@ - Use `Pin<&mut Self>` as the receiver type for all `StreamMuxer` poll functions. See [PR 2765]. - Change `StreamMuxer` interface to be entirely poll-based. All functions on `StreamMuxer` now require a `Context` and return `Poll`. This gives callers fine-grained control over what they - would like to make progress on. See [PR 2724] and [PR XXXX]. + would like to make progress on. See [PR 2724] and [PR 2797]. [PR 2724]: https://github.com/libp2p/rust-libp2p/pull/2724 [PR 2762]: https://github.com/libp2p/rust-libp2p/pull/2762 [PR 2775]: https://github.com/libp2p/rust-libp2p/pull/2775 [PR 2776]: https://github.com/libp2p/rust-libp2p/pull/2776 [PR 2765]: https://github.com/libp2p/rust-libp2p/pull/2765 -[PR XXXX]: https://github.com/libp2p/rust-libp2p/pull/XXXX +[PR 2797]: https://github.com/libp2p/rust-libp2p/pull/2797 # 0.34.0 From d9c2e6a2ed1801e4e0164c17dd6e58f5238fae4f Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Mon, 8 Aug 2022 17:57:31 +0200 Subject: [PATCH 08/12] Adopt docs on `poll_event` --- core/src/muxing.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/muxing.rs b/core/src/muxing.rs index 0b18d009b4c..9da66943c0f 100644 --- a/core/src/muxing.rs +++ b/core/src/muxing.rs @@ -100,9 +100,9 @@ pub trait StreamMuxer { /// Poll for an event of the underlying connection. /// - /// In addition to returning an event, this function may be used to perform any kind of background - /// work that needs to happen for the muxer to do its work. Implementations can rely on this - /// function to be called regularly and unconditionally. + /// Out of all functions on [`StreamMuxer`] this function is guaranteed to be called conditionally + /// and may thus be used to perform necessary background work in order for the muxer or the + /// underlying connection to work. fn poll_event( self: Pin<&mut Self>, cx: &mut Context<'_>, From 147927195d9c07614169dd709343616fe270af4c Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Tue, 9 Aug 2022 11:12:38 +0200 Subject: [PATCH 09/12] Improve docs --- core/src/muxing.rs | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/core/src/muxing.rs b/core/src/muxing.rs index 9da66943c0f..91a0130db43 100644 --- a/core/src/muxing.rs +++ b/core/src/muxing.rs @@ -75,6 +75,10 @@ pub trait StreamMuxer { type Error: std::error::Error; /// Poll for new inbound substreams. + /// + /// This function should be called whenever callers are ready to accept more inbound streams. In + /// other words, callers may exercise back-pressure on incoming streams by not calling this + /// function if a certain limit is hit. fn poll_inbound( self: Pin<&mut Self>, cx: &mut Context<'_>, @@ -86,11 +90,10 @@ pub trait StreamMuxer { cx: &mut Context<'_>, ) -> Poll>; - /// Closes this `StreamMuxer`. + /// Poll to close this [`StreamMuxer`]. /// - /// After this has returned `Poll::Ready(Ok(()))`, the muxer has become useless. All - /// subsequent reads must return either `EOF` or an error. All subsequent writes, shutdowns, - /// or polls must generate an error or be ignored. + /// After this has returned `Poll::Ready(Ok(()))`, the muxer has become useless and may be safely + /// dropped. /// /// > **Note**: You are encouraged to call this method and wait for it to return `Ready`, so /// > that the remote is properly informed of the shutdown. However, apart from From 8f8fa91fc8e7e41a1b440268a682f2d782ecff5a Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Tue, 9 Aug 2022 11:20:04 +0200 Subject: [PATCH 10/12] Rename `poll_event` to `poll` --- core/src/either.rs | 6 +++--- core/src/muxing.rs | 20 +++++++++----------- core/src/muxing/boxed.rs | 8 ++++---- core/src/muxing/singleton.rs | 2 +- muxers/mplex/src/lib.rs | 2 +- muxers/yamux/src/lib.rs | 2 +- swarm/src/connection.rs | 2 +- 7 files changed, 20 insertions(+), 22 deletions(-) diff --git a/core/src/either.rs b/core/src/either.rs index de008a3caba..a34552bf28f 100644 --- a/core/src/either.rs +++ b/core/src/either.rs @@ -244,13 +244,13 @@ where } } - fn poll_event( + fn poll( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll> { match self.project() { - EitherOutputProj::First(inner) => inner.poll_event(cx).map_err(EitherError::A), - EitherOutputProj::Second(inner) => inner.poll_event(cx).map_err(EitherError::B), + EitherOutputProj::First(inner) => inner.poll(cx).map_err(EitherError::A), + EitherOutputProj::Second(inner) => inner.poll(cx).map_err(EitherError::B), } } } diff --git a/core/src/muxing.rs b/core/src/muxing.rs index 91a0130db43..5ff018dc828 100644 --- a/core/src/muxing.rs +++ b/core/src/muxing.rs @@ -101,12 +101,13 @@ pub trait StreamMuxer { /// > immediately dropping the muxer. fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll>; - /// Poll for an event of the underlying connection. + /// Poll to allow the underlying connection to make progress. /// - /// Out of all functions on [`StreamMuxer`] this function is guaranteed to be called conditionally - /// and may thus be used to perform necessary background work in order for the muxer or the - /// underlying connection to work. - fn poll_event( + /// In contrast to all other `poll`-functions on [`StreamMuxer`], this function must be called + /// unconditionally. Because it will be called regardless, this function can be used by + /// implementations to return events about the underlying connection that the caller MUST deal + /// with. + fn poll( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll>; @@ -142,15 +143,12 @@ pub trait StreamMuxerExt: StreamMuxer + Sized { Pin::new(self).poll_outbound(cx) } - /// Convenience function for calling [`StreamMuxer::poll_event`] for [`StreamMuxer`]s that are `Unpin`. - fn poll_event_unpin( - &mut self, - cx: &mut Context<'_>, - ) -> Poll> + /// Convenience function for calling [`StreamMuxer::poll`] for [`StreamMuxer`]s that are `Unpin`. + fn poll_unpin(&mut self, cx: &mut Context<'_>) -> Poll> where Self: Unpin, { - Pin::new(self).poll_event(cx) + Pin::new(self).poll(cx) } /// Convenience function for calling [`StreamMuxer::poll_close`] for [`StreamMuxer`]s that are `Unpin`. diff --git a/core/src/muxing/boxed.rs b/core/src/muxing/boxed.rs index 9aae3a5e992..2dc605af483 100644 --- a/core/src/muxing/boxed.rs +++ b/core/src/muxing/boxed.rs @@ -65,11 +65,11 @@ where self.project().inner.poll_close(cx).map_err(into_io_error) } - fn poll_event( + fn poll( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll> { - self.project().inner.poll_event(cx).map_err(into_io_error) + self.project().inner.poll(cx).map_err(into_io_error) } } @@ -125,11 +125,11 @@ impl StreamMuxer for StreamMuxerBox { self.project().poll_close(cx) } - fn poll_event( + fn poll( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll> { - self.project().poll_event(cx) + self.project().poll(cx) } } diff --git a/core/src/muxing/singleton.rs b/core/src/muxing/singleton.rs index 47edf2465da..a10516edc54 100644 --- a/core/src/muxing/singleton.rs +++ b/core/src/muxing/singleton.rs @@ -92,7 +92,7 @@ where Poll::Ready(Ok(())) } - fn poll_event( + fn poll( self: Pin<&mut Self>, _: &mut Context<'_>, ) -> Poll> { diff --git a/muxers/mplex/src/lib.rs b/muxers/mplex/src/lib.rs index f786dd42d3d..1a18d0e2d68 100644 --- a/muxers/mplex/src/lib.rs +++ b/muxers/mplex/src/lib.rs @@ -106,7 +106,7 @@ where .map_ok(|stream_id| Substream::new(stream_id, self.io.clone())) } - fn poll_event( + fn poll( self: Pin<&mut Self>, _: &mut Context<'_>, ) -> Poll> { diff --git a/muxers/yamux/src/lib.rs b/muxers/yamux/src/lib.rs index f524a369d21..5b109f2b3b0 100644 --- a/muxers/yamux/src/lib.rs +++ b/muxers/yamux/src/lib.rs @@ -123,7 +123,7 @@ where .map_err(YamuxError) } - fn poll_event( + fn poll( self: Pin<&mut Self>, _: &mut Context<'_>, ) -> Poll> { diff --git a/swarm/src/connection.rs b/swarm/src/connection.rs index 388b8495cb7..24e54aba525 100644 --- a/swarm/src/connection.rs +++ b/swarm/src/connection.rs @@ -153,7 +153,7 @@ where } } - match self.muxing.poll_event_unpin(cx)? { + match self.muxing.poll_unpin(cx)? { Poll::Pending => {} Poll::Ready(StreamMuxerEvent::AddressChange(address)) => { self.handler.inject_address_change(&address); From ee0a76c1c21b07a0d745d7b0e8d25280a2c3a6c9 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Tue, 9 Aug 2022 19:21:35 +1000 Subject: [PATCH 11/12] Update core/src/muxing.rs --- core/src/muxing.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/muxing.rs b/core/src/muxing.rs index 5ff018dc828..9763436e94a 100644 --- a/core/src/muxing.rs +++ b/core/src/muxing.rs @@ -103,7 +103,7 @@ pub trait StreamMuxer { /// Poll to allow the underlying connection to make progress. /// - /// In contrast to all other `poll`-functions on [`StreamMuxer`], this function must be called + /// In contrast to all other `poll`-functions on [`StreamMuxer`], this function MUST be called /// unconditionally. Because it will be called regardless, this function can be used by /// implementations to return events about the underlying connection that the caller MUST deal /// with. From 94ae705a70c56595d33a9df9efce3144f003b176 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Tue, 9 Aug 2022 11:27:13 +0200 Subject: [PATCH 12/12] Streamline imports --- core/src/muxing/boxed.rs | 3 +-- core/src/muxing/singleton.rs | 4 ++-- muxers/mplex/src/lib.rs | 7 ++----- 3 files changed, 5 insertions(+), 9 deletions(-) diff --git a/core/src/muxing/boxed.rs b/core/src/muxing/boxed.rs index 2dc605af483..99f7a87c6a5 100644 --- a/core/src/muxing/boxed.rs +++ b/core/src/muxing/boxed.rs @@ -1,5 +1,4 @@ -use crate::muxing::StreamMuxerEvent; -use crate::StreamMuxer; +use crate::muxing::{StreamMuxer, StreamMuxerEvent}; use futures::{AsyncRead, AsyncWrite}; use pin_project::pin_project; use std::error::Error; diff --git a/core/src/muxing/singleton.rs b/core/src/muxing/singleton.rs index a10516edc54..3ba2c1cb366 100644 --- a/core/src/muxing/singleton.rs +++ b/core/src/muxing/singleton.rs @@ -18,9 +18,9 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use crate::{connection::Endpoint, muxing::StreamMuxer}; +use crate::connection::Endpoint; +use crate::muxing::{StreamMuxer, StreamMuxerEvent}; -use crate::muxing::StreamMuxerEvent; use futures::prelude::*; use std::cell::Cell; use std::pin::Pin; diff --git a/muxers/mplex/src/lib.rs b/muxers/mplex/src/lib.rs index 1a18d0e2d68..501c4dd6735 100644 --- a/muxers/mplex/src/lib.rs +++ b/muxers/mplex/src/lib.rs @@ -27,11 +27,8 @@ pub use config::{MaxBufferBehaviour, MplexConfig}; use bytes::Bytes; use codec::LocalStreamId; use futures::{future, prelude::*, ready}; -use libp2p_core::muxing::StreamMuxerEvent; -use libp2p_core::{ - upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}, - StreamMuxer, -}; +use libp2p_core::muxing::{StreamMuxer, StreamMuxerEvent}; +use libp2p_core::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}; use parking_lot::Mutex; use std::{cmp, iter, pin::Pin, sync::Arc, task::Context, task::Poll};