From ea487aebfe6eb672b05d2bec2d9d79bbd92450ba Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Mon, 20 Jun 2022 05:33:59 +0200 Subject: [PATCH] muxers/mplex: Implement `AsyncRead` and `AsyncWrite` for `Substream` (#2706) This aligns the public API of the `libp2p-mplex` module with the one from `libp2p-yamux`. This change has two benefits: 1. For standalone users of `libp2p-mplex`, the substreams itself are now useful, similar to `libp2p-yamux` and don't necessarily need to be polled via the `StreamMuxer`. The `StreamMuxer` only forwards to the `Async{Read,Write}` implementations. 2. This will reduce the diff of #2648 because we can chunk the one giant commit into smaller atomic ones. --- CHANGELOG.md | 2 + Cargo.toml | 2 +- muxers/mplex/CHANGELOG.md | 6 ++ muxers/mplex/Cargo.toml | 2 +- muxers/mplex/src/lib.rs | 124 +++++++++++++++++++++++++++++--------- 5 files changed, 104 insertions(+), 32 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 042cb9f6015..fd3b9656f95 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -46,6 +46,8 @@ # 0.46.0 [unreleased] - Semver bump Rust from `1.56.1` to `1.60.0` . See [PR 2646]. - Added weak dependencies for features. See [PR 2646]. +- Update individual crates. + - Update to [`libp2p-mplex` `v0.34.0`](muxers/mplex/CHANGELOG.md). [PR 2646]: https://github.com/libp2p/rust-libp2p/pull/2646 diff --git a/Cargo.toml b/Cargo.toml index a48316a6479..365bb431f1f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -83,7 +83,7 @@ libp2p-floodsub = { version = "0.36.0", path = "protocols/floodsub", optional = libp2p-identify = { version = "0.36.1", path = "protocols/identify", optional = true } libp2p-kad = { version = "0.37.1", path = "protocols/kad", optional = true } libp2p-metrics = { version = "0.6.0", path = "misc/metrics", optional = true } -libp2p-mplex = { version = "0.33.0", path = "muxers/mplex", optional = true } +libp2p-mplex = { version = "0.34.0", path = "muxers/mplex", optional = true } libp2p-noise = { version = "0.36.0", path = "transports/noise", optional = true } libp2p-ping = { version = "0.36.0", path = "protocols/ping", optional = true } libp2p-plaintext = { version = "0.33.0", path = "transports/plaintext", optional = true } diff --git a/muxers/mplex/CHANGELOG.md b/muxers/mplex/CHANGELOG.md index 142367adae5..6a2b73f4f9d 100644 --- a/muxers/mplex/CHANGELOG.md +++ b/muxers/mplex/CHANGELOG.md @@ -1,3 +1,9 @@ +# 0.34.0 [unreleased] + +- `Substream` now implements `AsyncRead` and `AsyncWrite`. See [PR 2706]. + +[PR 2706]: https://github.com/libp2p/rust-libp2p/pull/2706/ + # 0.33.0 - Update to `libp2p-core` `v0.33.0`. diff --git a/muxers/mplex/Cargo.toml b/muxers/mplex/Cargo.toml index dd1fb65523d..b4c76620469 100644 --- a/muxers/mplex/Cargo.toml +++ b/muxers/mplex/Cargo.toml @@ -3,7 +3,7 @@ name = "libp2p-mplex" edition = "2021" rust-version = "1.56.1" description = "Mplex multiplexing protocol for libp2p" -version = "0.33.0" +version = "0.34.0" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" diff --git a/muxers/mplex/src/lib.rs b/muxers/mplex/src/lib.rs index f2a95c21151..24fc72c2940 100644 --- a/muxers/mplex/src/lib.rs +++ b/muxers/mplex/src/lib.rs @@ -33,7 +33,7 @@ use libp2p_core::{ StreamMuxer, }; use parking_lot::Mutex; -use std::{cmp, iter, task::Context, task::Poll}; +use std::{cmp, iter, pin::Pin, sync::Arc, task::Context, task::Poll}; impl UpgradeInfo for MplexConfig { type Info = &'static [u8]; @@ -54,7 +54,7 @@ where fn upgrade_inbound(self, socket: C, _: Self::Info) -> Self::Future { future::ready(Ok(Multiplex { - io: Mutex::new(io::Multiplexed::new(socket, self)), + io: Arc::new(Mutex::new(io::Multiplexed::new(socket, self))), })) } } @@ -69,7 +69,7 @@ where fn upgrade_outbound(self, socket: C, _: Self::Info) -> Self::Future { future::ready(Ok(Multiplex { - io: Mutex::new(io::Multiplexed::new(socket, self)), + io: Arc::new(Mutex::new(io::Multiplexed::new(socket, self))), })) } } @@ -79,14 +79,14 @@ where /// This implementation isn't capable of detecting when the underlying socket changes its address, /// and no [`StreamMuxerEvent::AddressChange`] event is ever emitted. pub struct Multiplex { - io: Mutex>, + io: Arc>>, } impl StreamMuxer for Multiplex where C: AsyncRead + AsyncWrite + Unpin, { - type Substream = Substream; + type Substream = Substream; type OutboundSubstream = OutboundSubstream; type Error = io::Error; @@ -95,7 +95,7 @@ where cx: &mut Context<'_>, ) -> Poll>> { let stream_id = ready!(self.io.lock().poll_next_stream(cx))?; - let stream = Substream::new(stream_id); + let stream = Substream::new(stream_id, self.io.clone()); Poll::Ready(Ok(StreamMuxerEvent::InboundSubstream(stream))) } @@ -109,7 +109,7 @@ where _: &mut Self::OutboundSubstream, ) -> Poll> { let stream_id = ready!(self.io.lock().poll_open_stream(cx))?; - Poll::Ready(Ok(Substream::new(stream_id))) + Poll::Ready(Ok(Substream::new(stream_id, self.io.clone()))) } fn destroy_outbound(&self, _substream: Self::OutboundSubstream) { @@ -122,22 +122,7 @@ where substream: &mut Self::Substream, buf: &mut [u8], ) -> Poll> { - loop { - // Try to read from the current (i.e. last received) frame. - if !substream.current_data.is_empty() { - let len = cmp::min(substream.current_data.len(), buf.len()); - buf[..len].copy_from_slice(&substream.current_data.split_to(len)); - return Poll::Ready(Ok(len)); - } - - // Read the next data frame from the multiplexed stream. - match ready!(self.io.lock().poll_read_stream(cx, substream.id))? { - Some(data) => { - substream.current_data = data; - } - None => return Poll::Ready(Ok(0)), - } - } + Pin::new(substream).poll_read(cx, buf) } fn write_substream( @@ -146,7 +131,7 @@ where substream: &mut Self::Substream, buf: &[u8], ) -> Poll> { - self.io.lock().poll_write_stream(cx, substream.id, buf) + Pin::new(substream).poll_write(cx, buf) } fn flush_substream( @@ -154,7 +139,7 @@ where cx: &mut Context<'_>, substream: &mut Self::Substream, ) -> Poll> { - self.io.lock().poll_flush_stream(cx, substream.id) + Pin::new(substream).poll_flush(cx) } fn shutdown_substream( @@ -162,11 +147,11 @@ where cx: &mut Context<'_>, substream: &mut Self::Substream, ) -> Poll> { - self.io.lock().poll_close_stream(cx, substream.id) + Pin::new(substream).poll_close(cx) } fn destroy_substream(&self, sub: Self::Substream) { - self.io.lock().drop_stream(sub.id); + std::mem::drop(sub) } fn poll_close(&self, cx: &mut Context<'_>) -> Poll> { @@ -177,19 +162,98 @@ where /// Active attempt to open an outbound substream. pub struct OutboundSubstream {} +impl AsyncRead for Substream +where + C: AsyncRead + AsyncWrite + Unpin, +{ + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll> { + let this = self.get_mut(); + + loop { + // Try to read from the current (i.e. last received) frame. + if !this.current_data.is_empty() { + let len = cmp::min(this.current_data.len(), buf.len()); + buf[..len].copy_from_slice(&this.current_data.split_to(len)); + return Poll::Ready(Ok(len)); + } + + // Read the next data frame from the multiplexed stream. + match ready!(this.io.lock().poll_read_stream(cx, this.id))? { + Some(data) => { + this.current_data = data; + } + None => return Poll::Ready(Ok(0)), + } + } + } +} + +impl AsyncWrite for Substream +where + C: AsyncRead + AsyncWrite + Unpin, +{ + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + let this = self.get_mut(); + + this.io.lock().poll_write_stream(cx, this.id, buf) + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.get_mut(); + + this.io.lock().poll_flush_stream(cx, this.id) + } + + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.get_mut(); + let mut io = this.io.lock(); + + ready!(io.poll_close_stream(cx, this.id))?; + ready!(io.poll_flush_stream(cx, this.id))?; + + Poll::Ready(Ok(())) + } +} + /// Active substream to the remote. -pub struct Substream { +pub struct Substream +where + C: AsyncRead + AsyncWrite + Unpin, +{ /// The unique, local identifier of the substream. id: LocalStreamId, /// The current data frame the substream is reading from. current_data: Bytes, + /// Shared reference to the actual muxer. + io: Arc>>, } -impl Substream { - fn new(id: LocalStreamId) -> Self { +impl Substream +where + C: AsyncRead + AsyncWrite + Unpin, +{ + fn new(id: LocalStreamId, io: Arc>>) -> Self { Self { id, current_data: Bytes::new(), + io, } } } + +impl Drop for Substream +where + C: AsyncRead + AsyncWrite + Unpin, +{ + fn drop(&mut self) { + self.io.lock().drop_stream(self.id); + } +}