Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core/muxing: Generalise StreamMuxer::poll_address_change to poll #2797

Merged
merged 13 commits into from
Aug 16, 2022
Merged
2 changes: 2 additions & 0 deletions core/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@
- Drop `Unpin` requirement from `SubstreamBox`. See [PR 2762] and [PR 2776].
- Drop `Sync` requirement on `StreamMuxer` for constructing `StreamMuxerBox`. See [PR 2775].
- Use `Pin<&mut Self>` as the receiver type for all `StreamMuxer` poll functions. See [PR 2765].
- Generalise `StreamMuxer::poll_address_change` to `StreamMuxer::poll_event`. See [PR XXXX].
thomaseizinger marked this conversation as resolved.
Show resolved Hide resolved

[PR 2724]: https://github.com/libp2p/rust-libp2p/pull/2724
[PR 2762]: https://github.com/libp2p/rust-libp2p/pull/2762
[PR 2775]: https://github.com/libp2p/rust-libp2p/pull/2775
[PR 2776]: https://github.com/libp2p/rust-libp2p/pull/2776
[PR 2765]: https://github.com/libp2p/rust-libp2p/pull/2765
[PR XXXX]: https://github.com/libp2p/rust-libp2p/pull/XXXX

# 0.34.0

Expand Down
21 changes: 10 additions & 11 deletions core/src/either.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use crate::muxing::StreamMuxerEvent;
use crate::{
muxing::StreamMuxer,
transport::{ListenerId, Transport, TransportError, TransportEvent},
Expand Down Expand Up @@ -236,22 +237,20 @@ where
}
}

fn poll_address_change(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Multiaddr, Self::Error>> {
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
match self.project() {
EitherOutputProj::First(inner) => inner.poll_address_change(cx).map_err(EitherError::A),
EitherOutputProj::Second(inner) => {
inner.poll_address_change(cx).map_err(EitherError::B)
}
EitherOutputProj::First(inner) => inner.poll_close(cx).map_err(EitherError::A),
EitherOutputProj::Second(inner) => inner.poll_close(cx).map_err(EitherError::B),
}
}

fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
fn poll_event(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<StreamMuxerEvent, Self::Error>> {
match self.project() {
EitherOutputProj::First(inner) => inner.poll_close(cx).map_err(EitherError::A),
EitherOutputProj::Second(inner) => inner.poll_close(cx).map_err(EitherError::B),
EitherOutputProj::First(inner) => inner.poll_event(cx).map_err(EitherError::A),
EitherOutputProj::Second(inner) => inner.poll_event(cx).map_err(EitherError::B),
}
}
}
Expand Down
32 changes: 20 additions & 12 deletions core/src/muxing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,14 +86,6 @@ pub trait StreamMuxer {
cx: &mut Context<'_>,
) -> Poll<Result<Self::Substream, Self::Error>>;

/// Poll for an address change of the underlying connection.
///
/// Not all implementations may support this feature.
fn poll_address_change(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Multiaddr, Self::Error>>;

/// Closes this `StreamMuxer`.
///
/// After this has returned `Poll::Ready(Ok(()))`, the muxer has become useless. All
Expand All @@ -105,6 +97,22 @@ pub trait StreamMuxer {
/// > properly informing the remote, there is no difference between this and
/// > immediately dropping the muxer.
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;

/// Poll for an event of the underlying connection.
///
/// In addition to returning an event, this function may be used to perform any kind of background
/// work that needs to happen for the muxer to do its work. Implementations can rely on this
/// function to be called regularly and unconditionally.
fn poll_event(
thomaseizinger marked this conversation as resolved.
Show resolved Hide resolved
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<StreamMuxerEvent, Self::Error>>;
}

/// An event produced by a [`StreamMuxer`].
pub enum StreamMuxerEvent {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any thoughts on just naming this Event and referring to is as muxing::Event?

I think there was a loose consensus around #2217 at some point but we haven't really made progress on this front.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 Sounds good to me.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am in favor of just Event as long as we don't do use muxing::Event in other files, but instead just use muxing and then refer to it as muxing::Event.

/// The address of the remote has changed.
AddressChange(Multiaddr),
}

/// Extension trait for [`StreamMuxer`].
Expand All @@ -131,15 +139,15 @@ pub trait StreamMuxerExt: StreamMuxer + Sized {
Pin::new(self).poll_outbound(cx)
}

/// Convenience function for calling [`StreamMuxer::poll_address_change`] for [`StreamMuxer`]s that are `Unpin`.
fn poll_address_change_unpin(
/// Convenience function for calling [`StreamMuxer::poll_event`] for [`StreamMuxer`]s that are `Unpin`.
fn poll_event_unpin(
&mut self,
cx: &mut Context<'_>,
) -> Poll<Result<Multiaddr, Self::Error>>
) -> Poll<Result<StreamMuxerEvent, Self::Error>>
where
Self: Unpin,
{
Pin::new(self).poll_address_change(cx)
Pin::new(self).poll_event(cx)
}

/// Convenience function for calling [`StreamMuxer::poll_close`] for [`StreamMuxer`]s that are `Unpin`.
Expand Down
37 changes: 17 additions & 20 deletions core/src/muxing/boxed.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::muxing::StreamMuxerEvent;
use crate::StreamMuxer;
use futures::{AsyncRead, AsyncWrite};
use multiaddr::Multiaddr;
use pin_project::pin_project;
use std::error::Error;
use std::fmt;
Expand Down Expand Up @@ -38,11 +38,6 @@ where
type Substream = SubstreamBox;
type Error = io::Error;

#[inline]
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.project().inner.poll_close(cx).map_err(into_io_error)
}

fn poll_inbound(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
Expand All @@ -65,14 +60,16 @@ where
.map_err(into_io_error)
}

fn poll_address_change(
#[inline]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why inline this method but not the other ones, and why only inline it in this muxer? Just asking out of curiosity because I am not really familiar with #[inline].

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not on purpose to be honest. I thought I left everything as it was before.

I am not too familiar with inlining either but the rough advice I got was that the compiler tends to be smarter on when it is needed :)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the many #[inline] attributes are from past premature optimizations. See #897.

the rough advice I got was that the compiler tends to be smarter on when it is needed :)

Yes. Unless proven through a benchmark, let the compiler make the decision and thus don't use #[inline].

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not on purpose to be honest. I thought I left everything as it was before.

The method was marked as #[inline] before this patch as well. I am fine with either removing it here or in a pull request in the future.

fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.project().inner.poll_close(cx).map_err(into_io_error)
}

fn poll_event(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Multiaddr, Self::Error>> {
self.project()
.inner
.poll_address_change(cx)
.map_err(into_io_error)
) -> Poll<Result<StreamMuxerEvent, Self::Error>> {
self.project().inner.poll_event(cx).map_err(into_io_error)
}
}

Expand Down Expand Up @@ -109,11 +106,6 @@ impl StreamMuxer for StreamMuxerBox {
type Substream = SubstreamBox;
type Error = io::Error;

#[inline]
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.project().poll_close(cx)
}

fn poll_inbound(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
Expand All @@ -128,11 +120,16 @@ impl StreamMuxer for StreamMuxerBox {
self.project().poll_outbound(cx)
}

fn poll_address_change(
#[inline]
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.project().poll_close(cx)
}

fn poll_event(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Multiaddr, Self::Error>> {
self.project().poll_address_change(cx)
) -> Poll<Result<StreamMuxerEvent, Self::Error>> {
self.project().poll_event(cx)
}
}

Expand Down
14 changes: 7 additions & 7 deletions core/src/muxing/singleton.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@

use crate::{connection::Endpoint, muxing::StreamMuxer};

use crate::muxing::StreamMuxerEvent;
use futures::prelude::*;
use multiaddr::Multiaddr;
use std::cell::Cell;
use std::pin::Pin;
use std::{io, task::Context, task::Poll};
Expand Down Expand Up @@ -88,14 +88,14 @@ where
}
}

fn poll_address_change(
fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
Poll::Ready(Ok(()))
}

fn poll_event(
self: Pin<&mut Self>,
_: &mut Context<'_>,
) -> Poll<Result<Multiaddr, Self::Error>> {
) -> Poll<Result<StreamMuxerEvent, Self::Error>> {
Poll::Pending
}

fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
Poll::Ready(Ok(()))
}
}
7 changes: 4 additions & 3 deletions muxers/mplex/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,10 @@ pub use config::{MaxBufferBehaviour, MplexConfig};
use bytes::Bytes;
use codec::LocalStreamId;
use futures::{future, prelude::*, ready};
use libp2p_core::muxing::StreamMuxerEvent;
use libp2p_core::{
upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo},
Multiaddr, StreamMuxer,
StreamMuxer,
};
use parking_lot::Mutex;
use std::{cmp, iter, pin::Pin, sync::Arc, task::Context, task::Poll};
Expand Down Expand Up @@ -105,10 +106,10 @@ where
.map_ok(|stream_id| Substream::new(stream_id, self.io.clone()))
}

fn poll_address_change(
fn poll_event(
self: Pin<&mut Self>,
_: &mut Context<'_>,
) -> Poll<Result<Multiaddr, Self::Error>> {
) -> Poll<Result<StreamMuxerEvent, Self::Error>> {
Poll::Pending
}

Expand Down
7 changes: 3 additions & 4 deletions muxers/yamux/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,8 @@ use futures::{
prelude::*,
stream::{BoxStream, LocalBoxStream},
};
use libp2p_core::muxing::StreamMuxer;
use libp2p_core::muxing::{StreamMuxer, StreamMuxerEvent};
use libp2p_core::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo};
use libp2p_core::Multiaddr;
use std::{
fmt, io, iter, mem,
pin::Pin,
Expand Down Expand Up @@ -124,10 +123,10 @@ where
.map_err(YamuxError)
}

fn poll_address_change(
fn poll_event(
self: Pin<&mut Self>,
_: &mut Context<'_>,
) -> Poll<Result<Multiaddr, Self::Error>> {
) -> Poll<Result<StreamMuxerEvent, Self::Error>> {
Poll::Pending
}

Expand Down
43 changes: 26 additions & 17 deletions swarm/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use crate::IntoConnectionHandler;
use handler_wrapper::HandlerWrapper;
use libp2p_core::connection::ConnectedPoint;
use libp2p_core::multiaddr::Multiaddr;
use libp2p_core::muxing::{StreamMuxerBox, StreamMuxerExt};
use libp2p_core::muxing::{StreamMuxerBox, StreamMuxerEvent, StreamMuxerExt};
use libp2p_core::upgrade;
use libp2p_core::PeerId;
use std::collections::VecDeque;
Expand Down Expand Up @@ -153,27 +153,36 @@ where
}
}

if !self.open_info.is_empty() {
if let Poll::Ready(substream) = self.muxing.poll_outbound_unpin(cx)? {
let user_data = self
.open_info
.pop_front()
.expect("`open_info` is not empty");
let endpoint = SubstreamEndpoint::Dialer(user_data);
self.handler.inject_substream(substream, endpoint);
continue; // Go back to the top, handler can potentially make progress again.
match self.muxing.poll_event_unpin(cx)? {
Poll::Pending => {}
Poll::Ready(StreamMuxerEvent::AddressChange(address)) => {
self.handler.inject_address_change(&address);
return Poll::Ready(Ok(Event::AddressChange(address)));
}
}

if let Poll::Ready(substream) = self.muxing.poll_inbound_unpin(cx)? {
self.handler
.inject_substream(substream, SubstreamEndpoint::Listener);
continue; // Go back to the top, handler can potentially make progress again.
if !self.open_info.is_empty() {
match self.muxing.poll_outbound_unpin(cx)? {
Poll::Pending => {}
Poll::Ready(substream) => {
let user_data = self
.open_info
.pop_front()
.expect("`open_info` is not empty");
let endpoint = SubstreamEndpoint::Dialer(user_data);
self.handler.inject_substream(substream, endpoint);
continue; // Go back to the top, handler can potentially make progress again.
}
}
}

if let Poll::Ready(address) = self.muxing.poll_address_change_unpin(cx)? {
self.handler.inject_address_change(&address);
return Poll::Ready(Ok(Event::AddressChange(address)));
match self.muxing.poll_inbound_unpin(cx)? {
Poll::Pending => {}
Poll::Ready(substream) => {
self.handler
.inject_substream(substream, SubstreamEndpoint::Listener);
continue; // Go back to the top, handler can potentially make progress again.
}
}

return Poll::Pending; // Nothing can make progress, return `Pending`.
Expand Down