diff --git a/.cargo/config.toml b/.cargo/config.toml index 95976aaa800..9d5619d2c4e 100644 --- a/.cargo/config.toml +++ b/.cargo/config.toml @@ -1,3 +1,3 @@ [alias] # Temporary solution to have clippy config in a single place until https://github.com/rust-lang/rust-clippy/blob/master/doc/roadmap-2021.md#lintstoml-configuration is shipped. -custom-clippy = "clippy -- -A clippy::type_complexity -A clippy::pedantic -D warnings" +custom-clippy = "clippy --all-features -- -A clippy::type_complexity -A clippy::pedantic -D warnings" diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 5cda587983c..3e97069e3a9 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -26,7 +26,7 @@ jobs: - uses: actions/checkout@v3 - - uses: Swatinem/rust-cache@cb2cf0cc7c5198d3364b9630e2c3d457f160790c # v1.4.0 + - uses: Swatinem/rust-cache@6720f05bc48b77f96918929a9019fb2203ff71f8 # v2.0.0 with: key: ${{ matrix.args }} @@ -72,7 +72,7 @@ jobs: - name: Install CMake run: sudo apt-get install -y cmake - - uses: Swatinem/rust-cache@cb2cf0cc7c5198d3364b9630e2c3d457f160790c # v1.4.0 + - uses: Swatinem/rust-cache@6720f05bc48b77f96918929a9019fb2203ff71f8 # v2.0.0 with: key: ${{ matrix.toolchain }} @@ -99,7 +99,7 @@ jobs: toolchain: stable override: true - - uses: Swatinem/rust-cache@cb2cf0cc7c5198d3364b9630e2c3d457f160790c # v1.4.0 + - uses: Swatinem/rust-cache@6720f05bc48b77f96918929a9019fb2203ff71f8 # v2.0.0 - name: Check rustdoc links run: RUSTDOCFLAGS="--deny broken_intra_doc_links" cargo doc --verbose --workspace --no-deps --document-private-items --all-features @@ -122,7 +122,7 @@ jobs: override: true components: clippy - - uses: Swatinem/rust-cache@cb2cf0cc7c5198d3364b9630e2c3d457f160790c # v1.4.0 + - uses: Swatinem/rust-cache@6720f05bc48b77f96918929a9019fb2203ff71f8 # v2.0.0 - name: Run cargo clippy uses: actions-rs/cargo@844f36862e911db73fe0815f00a4a2602c279505 # v1.0.3 @@ -147,7 +147,7 @@ jobs: toolchain: stable override: true - - uses: Swatinem/rust-cache@cb2cf0cc7c5198d3364b9630e2c3d457f160790c # v1.4.0 + - uses: Swatinem/rust-cache@6720f05bc48b77f96918929a9019fb2203ff71f8 # v2.0.0 - name: Run ipfs-kad example run: RUST_LOG=libp2p_swarm=debug,libp2p_kad=trace,libp2p_tcp=debug cargo run --example ipfs-kad diff --git a/Cargo.toml b/Cargo.toml index 29606815b30..07ad3879430 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -80,25 +80,25 @@ instant = "0.1.11" # Explicit dependency to be used in `wasm-bindgen` feature lazy_static = "1.2" libp2p-autonat = { version = "0.6.0", path = "protocols/autonat", optional = true } -libp2p-core = { version = "0.34.0", path = "core", default-features = false } +libp2p-core = { version = "0.35.0", path = "core", default-features = false } libp2p-dcutr = { version = "0.5.0", path = "protocols/dcutr", optional = true } libp2p-floodsub = { version = "0.38.0", path = "protocols/floodsub", optional = true } libp2p-identify = { version = "0.38.0", path = "protocols/identify", optional = true } libp2p-kad = { version = "0.39.0", path = "protocols/kad", optional = true } libp2p-metrics = { version = "0.8.0", path = "misc/metrics", optional = true } -libp2p-mplex = { version = "0.34.0", path = "muxers/mplex", optional = true } -libp2p-noise = { version = "0.37.0", path = "transports/noise", optional = true } +libp2p-mplex = { version = "0.35.0", path = "muxers/mplex", optional = true } +libp2p-noise = { version = "0.38.0", path = "transports/noise", optional = true } libp2p-ping = { version = "0.38.0", path = "protocols/ping", optional = true } -libp2p-plaintext = { version = "0.34.0", path = "transports/plaintext", optional = true } +libp2p-plaintext = { version = "0.35.0", path = "transports/plaintext", optional = true } libp2p-pnet = { version = "0.22.0", path = "transports/pnet", optional = true } libp2p-relay = { version = "0.11.0", path = "protocols/relay", optional = true } libp2p-rendezvous = { version = "0.8.0", path = "protocols/rendezvous", optional = true } libp2p-request-response = { version = "0.20.0", path = "protocols/request-response", optional = true } libp2p-swarm = { version = "0.38.0", path = "swarm" } libp2p-swarm-derive = { version = "0.28.0", path = "swarm-derive" } -libp2p-uds = { version = "0.33.0", path = "transports/uds", optional = true } -libp2p-wasm-ext = { version = "0.34.0", path = "transports/wasm-ext", default-features = false, optional = true } -libp2p-yamux = { version = "0.38.0", path = "muxers/yamux", optional = true } +libp2p-uds = { version = "0.34.0", path = "transports/uds", optional = true } +libp2p-wasm-ext = { version = "0.35.0", path = "transports/wasm-ext", default-features = false, optional = true } +libp2p-yamux = { version = "0.39.0", path = "muxers/yamux", optional = true } multiaddr = { version = "0.14.0" } parking_lot = "0.12.0" pin-project = "1.0.0" @@ -106,12 +106,12 @@ rand = "0.7.3" # Explicit dependency to be used in `wasm-bindgen` feature smallvec = "1.6.1" [target.'cfg(not(any(target_os = "emscripten", target_os = "wasi", target_os = "unknown")))'.dependencies] -libp2p-deflate = { version = "0.34.0", path = "transports/deflate", optional = true } -libp2p-dns = { version = "0.34.0", path = "transports/dns", optional = true, default-features = false } +libp2p-deflate = { version = "0.35.0", path = "transports/deflate", optional = true } +libp2p-dns = { version = "0.35.0", path = "transports/dns", optional = true, default-features = false } libp2p-mdns = { version = "0.39.0", path = "protocols/mdns", optional = true } libp2p-quic = { version = "0.7.0", path = "transports/quic", optional = true } -libp2p-tcp = { version = "0.34.0", path = "transports/tcp", default-features = false, optional = true } -libp2p-websocket = { version = "0.36.0", path = "transports/websocket", optional = true } +libp2p-tcp = { version = "0.35.0", path = "transports/tcp", default-features = false, optional = true } +libp2p-websocket = { version = "0.37.0", path = "transports/websocket", optional = true } [target.'cfg(not(target_os = "unknown"))'.dependencies] libp2p-gossipsub = { version = "0.40.0", path = "protocols/gossipsub", optional = true } diff --git a/README.md b/README.md index fb45ac38c10..6e668184a8b 100644 --- a/README.md +++ b/README.md @@ -2,6 +2,8 @@ [![dependency status](https://deps.rs/repo/github/libp2p/rust-libp2p/status.svg?style=flat-square)](https://deps.rs/repo/github/libp2p/rust-libp2p) +[![Crates.io](https://img.shields.io/crates/v/libp2p.svg)](https://crates.io/crates/libp2p) +[![docs.rs](https://img.shields.io/badge/api-rustdoc-blue.svg)](https://docs.rs/libp2p) This repository is the central place for Rust development of the [libp2p](https://libp2p.io) spec. diff --git a/core/CHANGELOG.md b/core/CHANGELOG.md index 442499f118a..047a7ac40e2 100644 --- a/core/CHANGELOG.md +++ b/core/CHANGELOG.md @@ -1,6 +1,17 @@ +# 0.35.0 [unreleased] + +- Remove `StreamMuxer::poll_event` in favor of individual functions: `poll_inbound`, `poll_outbound` + and `poll_address_change`. Consequently, `StreamMuxerEvent` is also removed. See [PR 2724]. +- Drop `Unpin` requirement from `SubstreamBox`. See [PR 2762] and [PR 2776]. +- Drop `Sync` requirement on `StreamMuxer` for constructing `StreamMuxerBox`. See [PR 2775]. + +[PR 2724]: https://github.com/libp2p/rust-libp2p/pull/2724 +[PR 2762]: https://github.com/libp2p/rust-libp2p/pull/2762 +[PR 2775]: https://github.com/libp2p/rust-libp2p/pull/2775 +[PR 2776]: https://github.com/libp2p/rust-libp2p/pull/2776 + # 0.34.0 -- Introduce `StreamMuxerEvent::map_inbound_stream`. See [PR 2691]. - Remove `{read,write,flush,shutdown,destroy}_substream` functions from `StreamMuxer` trait in favor of forcing `StreamMuxer::Substream` to implement `AsyncRead + AsyncWrite`. See [PR 2707]. - Replace `Into` bound on `StreamMuxer::Error` with `std::error::Error`. See [PR 2710]. diff --git a/core/Cargo.toml b/core/Cargo.toml index deb6479e433..386dff09669 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -3,7 +3,7 @@ name = "libp2p-core" edition = "2021" rust-version = "1.56.1" description = "Core traits and structs of libp2p" -version = "0.34.0" +version = "0.35.0" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" diff --git a/core/src/either.rs b/core/src/either.rs index bce6e05aadf..4b5c20b2929 100644 --- a/core/src/either.rs +++ b/core/src/either.rs @@ -19,7 +19,7 @@ // DEALINGS IN THE SOFTWARE. use crate::{ - muxing::{StreamMuxer, StreamMuxerEvent}, + muxing::StreamMuxer, transport::{ListenerId, Transport, TransportError, TransportEvent}, Multiaddr, ProtocolName, }; @@ -202,60 +202,38 @@ where B: StreamMuxer, { type Substream = EitherOutput; - type OutboundSubstream = EitherOutbound; type Error = EitherError; - fn poll_event( - &self, - cx: &mut Context<'_>, - ) -> Poll, Self::Error>> { + fn poll_inbound(&self, cx: &mut Context<'_>) -> Poll> { match self { EitherOutput::First(inner) => inner - .poll_event(cx) - .map_err(EitherError::A) - .map_ok(|event| event.map_inbound_stream(EitherOutput::First)), + .poll_inbound(cx) + .map_ok(EitherOutput::First) + .map_err(EitherError::A), EitherOutput::Second(inner) => inner - .poll_event(cx) - .map_err(EitherError::B) - .map_ok(|event| event.map_inbound_stream(EitherOutput::Second)), + .poll_inbound(cx) + .map_ok(EitherOutput::Second) + .map_err(EitherError::B), } } - fn open_outbound(&self) -> Self::OutboundSubstream { + fn poll_outbound(&self, cx: &mut Context<'_>) -> Poll> { match self { - EitherOutput::First(inner) => EitherOutbound::A(inner.open_outbound()), - EitherOutput::Second(inner) => EitherOutbound::B(inner.open_outbound()), - } - } - - fn poll_outbound( - &self, - cx: &mut Context<'_>, - substream: &mut Self::OutboundSubstream, - ) -> Poll> { - match (self, substream) { - (EitherOutput::First(ref inner), EitherOutbound::A(ref mut substream)) => inner - .poll_outbound(cx, substream) - .map(|p| p.map(EitherOutput::First)) + EitherOutput::First(inner) => inner + .poll_outbound(cx) + .map_ok(EitherOutput::First) .map_err(EitherError::A), - (EitherOutput::Second(ref inner), EitherOutbound::B(ref mut substream)) => inner - .poll_outbound(cx, substream) - .map(|p| p.map(EitherOutput::Second)) + EitherOutput::Second(inner) => inner + .poll_outbound(cx) + .map_ok(EitherOutput::Second) .map_err(EitherError::B), - _ => panic!("Wrong API usage"), } } - fn destroy_outbound(&self, substream: Self::OutboundSubstream) { + fn poll_address_change(&self, cx: &mut Context<'_>) -> Poll> { match self { - EitherOutput::First(inner) => match substream { - EitherOutbound::A(substream) => inner.destroy_outbound(substream), - _ => panic!("Wrong API usage"), - }, - EitherOutput::Second(inner) => match substream { - EitherOutbound::B(substream) => inner.destroy_outbound(substream), - _ => panic!("Wrong API usage"), - }, + EitherOutput::First(inner) => inner.poll_address_change(cx).map_err(EitherError::A), + EitherOutput::Second(inner) => inner.poll_address_change(cx).map_err(EitherError::B), } } @@ -267,13 +245,6 @@ where } } -#[derive(Debug, Copy, Clone)] -#[must_use = "futures do nothing unless polled"] -pub enum EitherOutbound { - A(A::OutboundSubstream), - B(B::OutboundSubstream), -} - /// Implements `Future` and dispatches all method calls to either `First` or `Second`. #[pin_project(project = EitherFutureProj)] #[derive(Debug, Copy, Clone)] diff --git a/core/src/identity/ecdsa.rs b/core/src/identity/ecdsa.rs index b883243b13b..81dfec4b4e0 100644 --- a/core/src/identity/ecdsa.rs +++ b/core/src/identity/ecdsa.rs @@ -157,7 +157,7 @@ impl PublicKey { let buf = Self::del_asn1_header(k).ok_or_else(|| { DecodingError::new("failed to parse asn.1 encoded ecdsa p256 public key") })?; - Self::from_bytes(&buf) + Self::from_bytes(buf) } // ecPublicKey (ANSI X9.62 public key type) OID: 1.2.840.10045.2.1 @@ -198,8 +198,8 @@ impl PublicKey { if asn1_head[0] != 0x30 || asn1_head[2] != 0x30 || asn1_head[3] as usize != oids_len - || &oids_buf[..Self::EC_PUBLIC_KEY_OID.len()] != &Self::EC_PUBLIC_KEY_OID - || &oids_buf[Self::EC_PUBLIC_KEY_OID.len()..] != &Self::SECP_256_R1_OID + || oids_buf[..Self::EC_PUBLIC_KEY_OID.len()] != Self::EC_PUBLIC_KEY_OID + || oids_buf[Self::EC_PUBLIC_KEY_OID.len()..] != Self::SECP_256_R1_OID || bitstr_head[0] != 0x03 || bitstr_head[2] != 0x00 { diff --git a/core/src/muxing.rs b/core/src/muxing.rs index 050d8d1bd35..a2bdfa80b37 100644 --- a/core/src/muxing.rs +++ b/core/src/muxing.rs @@ -63,62 +63,25 @@ mod singleton; /// Provides multiplexing for a connection by allowing users to open substreams. /// /// A substream created by a [`StreamMuxer`] is a type that implements [`AsyncRead`] and [`AsyncWrite`]. -/// -/// Inbound substreams are reported via [`StreamMuxer::poll_event`]. -/// Outbound substreams can be opened via [`StreamMuxer::open_outbound`] and subsequent polling via -/// [`StreamMuxer::poll_outbound`]. +/// The [`StreamMuxer`] itself is modelled closely after [`AsyncWrite`]. It features `poll`-style +/// functions that allow the implementation to make progress on various tasks. pub trait StreamMuxer { /// Type of the object that represents the raw substream where data can be read and written. type Substream: AsyncRead + AsyncWrite; - /// Future that will be resolved when the outgoing substream is open. - type OutboundSubstream; - /// Error type of the muxer type Error: std::error::Error; - /// Polls for a connection-wide event. - /// - /// This function behaves the same as a `Stream`. - /// - /// If `Pending` is returned, then the current task will be notified once the muxer - /// is ready to be polled, similar to the API of `Stream::poll()`. - /// Only the latest task that was used to call this method may be notified. - /// - /// It is permissible and common to use this method to perform background - /// work, such as processing incoming packets and polling timers. - /// - /// An error can be generated if the connection has been closed. - fn poll_event( - &self, - cx: &mut Context<'_>, - ) -> Poll, Self::Error>>; + /// Poll for new inbound substreams. + fn poll_inbound(&self, cx: &mut Context<'_>) -> Poll>; - /// Opens a new outgoing substream, and produces the equivalent to a future that will be - /// resolved when it becomes available. - /// - /// The API of `OutboundSubstream` is totally opaque, and the object can only be interfaced - /// through the methods on the `StreamMuxer` trait. - fn open_outbound(&self) -> Self::OutboundSubstream; + /// Poll for a new, outbound substream. + fn poll_outbound(&self, cx: &mut Context<'_>) -> Poll>; - /// Polls the outbound substream. - /// - /// If `Pending` is returned, then the current task will be notified once the substream - /// is ready to be polled, similar to the API of `Future::poll()`. - /// However, for each individual outbound substream, only the latest task that was used to - /// call this method may be notified. + /// Poll for an address change of the underlying connection. /// - /// May panic or produce an undefined result if an earlier polling of the same substream - /// returned `Ready` or `Err`. - fn poll_outbound( - &self, - cx: &mut Context<'_>, - s: &mut Self::OutboundSubstream, - ) -> Poll>; - - /// Destroys an outbound substream future. Use this after the outbound substream has finished, - /// or if you want to interrupt it. - fn destroy_outbound(&self, s: Self::OutboundSubstream); + /// Not all implementations may support this feature. + fn poll_address_change(&self, cx: &mut Context<'_>) -> Poll>; /// Closes this `StreamMuxer`. /// @@ -132,38 +95,3 @@ pub trait StreamMuxer { /// > immediately dropping the muxer. fn poll_close(&self, cx: &mut Context<'_>) -> Poll>; } - -/// Event about a connection, reported by an implementation of [`StreamMuxer`]. -#[derive(Debug, Clone, PartialEq, Eq)] -pub enum StreamMuxerEvent { - /// Remote has opened a new substream. Contains the substream in question. - InboundSubstream(T), - - /// Address to the remote has changed. The previous one is now obsolete. - /// - /// > **Note**: This can for example happen when using the QUIC protocol, where the two nodes - /// > can change their IP address while retaining the same QUIC connection. - AddressChange(Multiaddr), -} - -impl StreamMuxerEvent { - /// If `self` is a [`StreamMuxerEvent::InboundSubstream`], returns the content. Otherwise - /// returns `None`. - pub fn into_inbound_substream(self) -> Option { - if let StreamMuxerEvent::InboundSubstream(s) = self { - Some(s) - } else { - None - } - } - - /// Map the stream within [`StreamMuxerEvent::InboundSubstream`] to a new type. - pub fn map_inbound_stream(self, map: impl FnOnce(T) -> O) -> StreamMuxerEvent { - match self { - StreamMuxerEvent::InboundSubstream(stream) => { - StreamMuxerEvent::InboundSubstream(map(stream)) - } - StreamMuxerEvent::AddressChange(addr) => StreamMuxerEvent::AddressChange(addr), - } - } -} diff --git a/core/src/muxing/boxed.rs b/core/src/muxing/boxed.rs index ad39ef0532d..8c6467dd7ad 100644 --- a/core/src/muxing/boxed.rs +++ b/core/src/muxing/boxed.rs @@ -1,94 +1,61 @@ -use crate::muxing::StreamMuxerEvent; use crate::StreamMuxer; -use fnv::FnvHashMap; -use futures::{ready, AsyncRead, AsyncWrite}; -use parking_lot::Mutex; +use futures::{AsyncRead, AsyncWrite}; +use multiaddr::Multiaddr; use std::error::Error; use std::fmt; use std::io; use std::io::{IoSlice, IoSliceMut}; use std::pin::Pin; -use std::sync::atomic::{AtomicUsize, Ordering}; use std::task::{Context, Poll}; /// Abstract `StreamMuxer`. pub struct StreamMuxerBox { - inner: Box< - dyn StreamMuxer - + Send - + Sync, - >, + inner: Box + Send>, } /// Abstract type for asynchronous reading and writing. /// /// A [`SubstreamBox`] erases the concrete type it is given and only retains its `AsyncRead` /// and `AsyncWrite` capabilities. -pub struct SubstreamBox(Box); +pub struct SubstreamBox(Pin>); struct Wrap where T: StreamMuxer, { inner: T, - outbound: Mutex>, - next_outbound: AtomicUsize, } impl StreamMuxer for Wrap where T: StreamMuxer, - T::Substream: Send + Unpin + 'static, + T::Substream: Send + 'static, T::Error: Send + Sync + 'static, { type Substream = SubstreamBox; - type OutboundSubstream = usize; // TODO: use a newtype type Error = io::Error; #[inline] - fn poll_event( - &self, - cx: &mut Context<'_>, - ) -> Poll, Self::Error>> { - let event = ready!(self.inner.poll_event(cx).map_err(into_io_error)?) - .map_inbound_stream(SubstreamBox::new); - - Poll::Ready(Ok(event)) - } - - #[inline] - fn open_outbound(&self) -> Self::OutboundSubstream { - let outbound = self.inner.open_outbound(); - let id = self.next_outbound.fetch_add(1, Ordering::Relaxed); - self.outbound.lock().insert(id, outbound); - id + fn poll_close(&self, cx: &mut Context<'_>) -> Poll> { + self.inner.poll_close(cx).map_err(into_io_error) } - #[inline] - fn poll_outbound( - &self, - cx: &mut Context<'_>, - substream: &mut Self::OutboundSubstream, - ) -> Poll> { - let mut list = self.outbound.lock(); - let stream = ready!(self - .inner - .poll_outbound(cx, list.get_mut(substream).unwrap()) - .map_err(into_io_error)?); - - Poll::Ready(Ok(SubstreamBox::new(stream))) + fn poll_inbound(&self, cx: &mut Context<'_>) -> Poll> { + self.inner + .poll_inbound(cx) + .map_ok(SubstreamBox::new) + .map_err(into_io_error) } - #[inline] - fn destroy_outbound(&self, substream: Self::OutboundSubstream) { - let mut list = self.outbound.lock(); + fn poll_outbound(&self, cx: &mut Context<'_>) -> Poll> { self.inner - .destroy_outbound(list.remove(&substream).unwrap()) + .poll_outbound(cx) + .map_ok(SubstreamBox::new) + .map_err(into_io_error) } - #[inline] - fn poll_close(&self, cx: &mut Context<'_>) -> Poll> { - self.inner.poll_close(cx).map_err(into_io_error) + fn poll_address_change(&self, cx: &mut Context<'_>) -> Poll> { + self.inner.poll_address_change(cx).map_err(into_io_error) } } @@ -103,16 +70,11 @@ impl StreamMuxerBox { /// Turns a stream muxer into a `StreamMuxerBox`. pub fn new(muxer: T) -> StreamMuxerBox where - T: StreamMuxer + Send + Sync + 'static, - T::OutboundSubstream: Send, - T::Substream: Send + Unpin + 'static, + T: StreamMuxer + Send + 'static, + T::Substream: Send + 'static, T::Error: Send + Sync + 'static, { - let wrap = Wrap { - inner: muxer, - outbound: Mutex::new(Default::default()), - next_outbound: AtomicUsize::new(0), - }; + let wrap = Wrap { inner: muxer }; StreamMuxerBox { inner: Box::new(wrap), @@ -122,46 +84,30 @@ impl StreamMuxerBox { impl StreamMuxer for StreamMuxerBox { type Substream = SubstreamBox; - type OutboundSubstream = usize; // TODO: use a newtype type Error = io::Error; #[inline] - fn poll_event( - &self, - cx: &mut Context<'_>, - ) -> Poll, Self::Error>> { - self.inner.poll_event(cx) + fn poll_close(&self, cx: &mut Context<'_>) -> Poll> { + self.inner.poll_close(cx) } - #[inline] - fn open_outbound(&self) -> Self::OutboundSubstream { - self.inner.open_outbound() + fn poll_inbound(&self, cx: &mut Context<'_>) -> Poll> { + self.inner.poll_inbound(cx) } - #[inline] - fn poll_outbound( - &self, - cx: &mut Context<'_>, - s: &mut Self::OutboundSubstream, - ) -> Poll> { - self.inner.poll_outbound(cx, s) + fn poll_outbound(&self, cx: &mut Context<'_>) -> Poll> { + self.inner.poll_outbound(cx) } - #[inline] - fn destroy_outbound(&self, substream: Self::OutboundSubstream) { - self.inner.destroy_outbound(substream) - } - - #[inline] - fn poll_close(&self, cx: &mut Context<'_>) -> Poll> { - self.inner.poll_close(cx) + fn poll_address_change(&self, cx: &mut Context<'_>) -> Poll> { + self.inner.poll_address_change(cx) } } impl SubstreamBox { /// Construct a new [`SubstreamBox`] from something that implements [`AsyncRead`] and [`AsyncWrite`]. - pub fn new(stream: S) -> Self { - Self(Box::new(stream)) + pub fn new(stream: S) -> Self { + Self(Box::pin(stream)) } } @@ -172,7 +118,7 @@ impl fmt::Debug for SubstreamBox { } /// Workaround because Rust does not allow `Box`. -trait AsyncReadWrite: AsyncRead + AsyncWrite + Unpin { +trait AsyncReadWrite: AsyncRead + AsyncWrite { /// Helper function to capture the erased inner type. /// /// Used to make the [`Debug`] implementation of [`SubstreamBox`] more useful. @@ -181,7 +127,7 @@ trait AsyncReadWrite: AsyncRead + AsyncWrite + Unpin { impl AsyncReadWrite for S where - S: AsyncRead + AsyncWrite + Unpin, + S: AsyncRead + AsyncWrite, { fn type_name(&self) -> &'static str { std::any::type_name::() @@ -190,44 +136,44 @@ where impl AsyncRead for SubstreamBox { fn poll_read( - self: Pin<&mut Self>, + mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8], ) -> Poll> { - Pin::new(&mut self.get_mut().0).poll_read(cx, buf) + self.0.as_mut().poll_read(cx, buf) } fn poll_read_vectored( - self: Pin<&mut Self>, + mut self: Pin<&mut Self>, cx: &mut Context<'_>, bufs: &mut [IoSliceMut<'_>], ) -> Poll> { - Pin::new(&mut self.get_mut().0).poll_read_vectored(cx, bufs) + self.0.as_mut().poll_read_vectored(cx, bufs) } } impl AsyncWrite for SubstreamBox { fn poll_write( - self: Pin<&mut Self>, + mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8], ) -> Poll> { - Pin::new(&mut self.get_mut().0).poll_write(cx, buf) + self.0.as_mut().poll_write(cx, buf) } fn poll_write_vectored( - self: Pin<&mut Self>, + mut self: Pin<&mut Self>, cx: &mut Context<'_>, bufs: &[IoSlice<'_>], ) -> Poll> { - Pin::new(&mut self.get_mut().0).poll_write_vectored(cx, bufs) + self.0.as_mut().poll_write_vectored(cx, bufs) } - fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::new(&mut self.get_mut().0).poll_flush(cx) + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.0.as_mut().poll_flush(cx) } - fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::new(&mut self.get_mut().0).poll_close(cx) + fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.0.as_mut().poll_close(cx) } } diff --git a/core/src/muxing/singleton.rs b/core/src/muxing/singleton.rs index c461ed00fc3..d67cb5e9825 100644 --- a/core/src/muxing/singleton.rs +++ b/core/src/muxing/singleton.rs @@ -18,12 +18,10 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use crate::{ - connection::Endpoint, - muxing::{StreamMuxer, StreamMuxerEvent}, -}; +use crate::{connection::Endpoint, muxing::StreamMuxer}; use futures::prelude::*; +use multiaddr::Multiaddr; use std::cell::Cell; use std::{io, task::Context, task::Poll}; @@ -52,55 +50,36 @@ impl SingletonMuxer { } } -/// Outbound substream attempt of the `SingletonMuxer`. -pub struct OutboundSubstream {} - impl StreamMuxer for SingletonMuxer where TSocket: AsyncRead + AsyncWrite + Unpin, { type Substream = TSocket; - type OutboundSubstream = OutboundSubstream; type Error = io::Error; - fn poll_event( - &self, - _: &mut Context<'_>, - ) -> Poll, io::Error>> { + fn poll_inbound(&self, _: &mut Context<'_>) -> Poll> { match self.endpoint { - Endpoint::Dialer => return Poll::Pending, - Endpoint::Listener => {} - } - - if let Some(stream) = self.inner.replace(None) { - Poll::Ready(Ok(StreamMuxerEvent::InboundSubstream(stream))) - } else { - Poll::Pending + Endpoint::Dialer => Poll::Pending, + Endpoint::Listener => match self.inner.replace(None) { + None => Poll::Pending, + Some(stream) => Poll::Ready(Ok(stream)), + }, } } - fn open_outbound(&self) -> Self::OutboundSubstream { - OutboundSubstream {} - } - - fn poll_outbound( - &self, - _: &mut Context<'_>, - _: &mut Self::OutboundSubstream, - ) -> Poll> { + fn poll_outbound(&self, _: &mut Context<'_>) -> Poll> { match self.endpoint { - Endpoint::Listener => return Poll::Pending, - Endpoint::Dialer => {} - } - - if let Some(stream) = self.inner.replace(None) { - Poll::Ready(Ok(stream)) - } else { - Poll::Pending + Endpoint::Listener => Poll::Pending, + Endpoint::Dialer => match self.inner.replace(None) { + None => Poll::Pending, + Some(stream) => Poll::Ready(Ok(stream)), + }, } } - fn destroy_outbound(&self, _: Self::OutboundSubstream) {} + fn poll_address_change(&self, _: &mut Context<'_>) -> Poll> { + Poll::Pending + } fn poll_close(&self, _cx: &mut Context<'_>) -> Poll> { Poll::Ready(Ok(())) diff --git a/core/src/transport/upgrade.rs b/core/src/transport/upgrade.rs index c872ec955e4..8fc0454794f 100644 --- a/core/src/transport/upgrade.rs +++ b/core/src/transport/upgrade.rs @@ -299,9 +299,8 @@ impl Multiplexed { T::Dial: Send + 'static, T::ListenerUpgrade: Send + 'static, T::Error: Send + Sync, - M: StreamMuxer + Send + Sync + 'static, - M::Substream: Send + Unpin + 'static, - M::OutboundSubstream: Send + 'static, + M: StreamMuxer + Send + 'static, + M::Substream: Send + 'static, M::Error: Send + Sync + 'static, { boxed(self.map(|(i, m), _| (i, StreamMuxerBox::new(m)))) diff --git a/core/tests/transport_upgrade.rs b/core/tests/transport_upgrade.rs index ecba64dfb2f..723a04b0780 100644 --- a/core/tests/transport_upgrade.rs +++ b/core/tests/transport_upgrade.rs @@ -18,8 +18,6 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -mod util; - use futures::prelude::*; use libp2p_core::identity; use libp2p_core::transport::{MemoryTransport, Transport}; @@ -91,11 +89,6 @@ fn upgrade_pipeline() { .apply(HelloUpgrade {}) .apply(HelloUpgrade {}) .multiplex(MplexConfig::default()) - .and_then(|(peer, mplex), _| { - // Gracefully close the connection to allow protocol - // negotiation to complete. - util::CloseMuxer::new(mplex).map_ok(move |mplex| (peer, mplex)) - }) .boxed(); let dialer_keys = identity::Keypair::generate_ed25519(); @@ -110,11 +103,6 @@ fn upgrade_pipeline() { .apply(HelloUpgrade {}) .apply(HelloUpgrade {}) .multiplex(MplexConfig::default()) - .and_then(|(peer, mplex), _| { - // Gracefully close the connection to allow protocol - // negotiation to complete. - util::CloseMuxer::new(mplex).map_ok(move |mplex| (peer, mplex)) - }) .boxed(); let listen_addr1 = Multiaddr::from(Protocol::Memory(random::())); diff --git a/core/tests/util.rs b/core/tests/util.rs deleted file mode 100644 index 7ca52188a52..00000000000 --- a/core/tests/util.rs +++ /dev/null @@ -1,47 +0,0 @@ -#![allow(dead_code)] - -use futures::prelude::*; -use libp2p_core::muxing::StreamMuxer; -use std::{pin::Pin, task::Context, task::Poll}; - -pub struct CloseMuxer { - state: CloseMuxerState, -} - -impl CloseMuxer { - pub fn new(m: M) -> CloseMuxer { - CloseMuxer { - state: CloseMuxerState::Close(m), - } - } -} - -pub enum CloseMuxerState { - Close(M), - Done, -} - -impl Future for CloseMuxer -where - M: StreamMuxer, - M::Error: From, -{ - type Output = Result; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - loop { - match std::mem::replace(&mut self.state, CloseMuxerState::Done) { - CloseMuxerState::Close(muxer) => { - if !muxer.poll_close(cx)?.is_ready() { - self.state = CloseMuxerState::Close(muxer); - return Poll::Pending; - } - return Poll::Ready(Ok(muxer)); - } - CloseMuxerState::Done => panic!(), - } - } - } -} - -impl Unpin for CloseMuxer {} diff --git a/misc/keygen/Cargo.toml b/misc/keygen/Cargo.toml index 614aa2e6bf2..4be54014f0a 100644 --- a/misc/keygen/Cargo.toml +++ b/misc/keygen/Cargo.toml @@ -13,5 +13,5 @@ clap = {version = "3.1.6", features = ["derive"]} zeroize = "1" serde = { version = "1.0.136", features = ["derive"] } serde_json = "1.0.79" -libp2p-core = { path = "../../core", default-features = false, version = "0.34.0"} +libp2p-core = { path = "../../core", default-features = false, version = "0.35.0"} base64 = "0.13.0" diff --git a/misc/metrics/CHANGELOG.md b/misc/metrics/CHANGELOG.md index 979617bdb29..06e2163597e 100644 --- a/misc/metrics/CHANGELOG.md +++ b/misc/metrics/CHANGELOG.md @@ -12,6 +12,16 @@ - Update to `libp2p-kad` `v0.39.0`. +- Track number of connected nodes supporting a specific protocol via the identify protocol. See [PR 2734]. + +- Update to `libp2p-core` `v0.35.0`. + +- Update to `prometheus-client` `v0.17.0`. See [PR 2761]. + +[PR 2761]: https://github.com/libp2p/rust-libp2p/pull/2761/ + +[PR 2734]: https://github.com/libp2p/rust-libp2p/pull/2734/ + # 0.7.0 - Update to `libp2p-core` `v0.34.0`. diff --git a/misc/metrics/Cargo.toml b/misc/metrics/Cargo.toml index f38e192947f..7ccd259e535 100644 --- a/misc/metrics/Cargo.toml +++ b/misc/metrics/Cargo.toml @@ -19,14 +19,14 @@ relay = ["libp2p-relay"] dcutr = ["libp2p-dcutr"] [dependencies] -libp2p-core = { version = "0.34.0", path = "../../core", default-features = false } +libp2p-core = { version = "0.35.0", path = "../../core", default-features = false } libp2p-dcutr = { version = "0.5.0", path = "../../protocols/dcutr", optional = true } libp2p-identify = { version = "0.38.0", path = "../../protocols/identify", optional = true } libp2p-kad = { version = "0.39.0", path = "../../protocols/kad", optional = true } libp2p-ping = { version = "0.38.0", path = "../../protocols/ping", optional = true } libp2p-relay = { version = "0.11.0", path = "../../protocols/relay", optional = true } libp2p-swarm = { version = "0.38.0", path = "../../swarm" } -prometheus-client = "0.16.0" +prometheus-client = "0.17.0" [target.'cfg(not(target_os = "unknown"))'.dependencies] libp2p-gossipsub = { version = "0.40.0", path = "../../protocols/gossipsub", optional = true } diff --git a/misc/metrics/src/dcutr.rs b/misc/metrics/src/dcutr.rs index 27dcbc08dc8..b90e784f9b7 100644 --- a/misc/metrics/src/dcutr.rs +++ b/misc/metrics/src/dcutr.rs @@ -77,10 +77,9 @@ impl From<&libp2p_dcutr::behaviour::Event> for EventType { } } -impl super::Recorder for super::Metrics { +impl super::Recorder for Metrics { fn record(&self, event: &libp2p_dcutr::behaviour::Event) { - self.dcutr - .events + self.events .get_or_create(&EventLabels { event: event.into(), }) diff --git a/misc/metrics/src/gossipsub.rs b/misc/metrics/src/gossipsub.rs index 0bb6af5f452..a82c1a72a24 100644 --- a/misc/metrics/src/gossipsub.rs +++ b/misc/metrics/src/gossipsub.rs @@ -40,10 +40,10 @@ impl Metrics { } } -impl super::Recorder for super::Metrics { +impl super::Recorder for Metrics { fn record(&self, event: &libp2p_gossipsub::GossipsubEvent) { if let libp2p_gossipsub::GossipsubEvent::Message { .. } = event { - self.gossipsub.messages.inc(); + self.messages.inc(); } } } diff --git a/misc/metrics/src/identify.rs b/misc/metrics/src/identify.rs index 7431eda5d25..730528167a8 100644 --- a/misc/metrics/src/identify.rs +++ b/misc/metrics/src/identify.rs @@ -18,12 +18,18 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. +use libp2p_core::PeerId; +use prometheus_client::encoding::text::{EncodeMetric, Encoder}; use prometheus_client::metrics::counter::Counter; use prometheus_client::metrics::histogram::{exponential_buckets, Histogram}; +use prometheus_client::metrics::MetricType; use prometheus_client::registry::Registry; +use std::collections::HashMap; use std::iter; +use std::sync::{Arc, Mutex}; pub struct Metrics { + protocols: Protocols, error: Counter, pushed: Counter, received: Counter, @@ -36,6 +42,15 @@ impl Metrics { pub fn new(registry: &mut Registry) -> Self { let sub_registry = registry.sub_registry_with_prefix("identify"); + let protocols = Protocols::default(); + sub_registry.register( + "protocols", + "Number of connected nodes supporting a specific protocol, with \ + \"unrecognized\" for each peer supporting one or more unrecognized \ + protocols", + Box::new(protocols.clone()), + ); + let error = Counter::default(); sub_registry.register( "errors", @@ -86,6 +101,7 @@ impl Metrics { ); Self { + protocols, error, pushed, received, @@ -96,27 +112,136 @@ impl Metrics { } } -impl super::Recorder for super::Metrics { +impl super::Recorder for Metrics { fn record(&self, event: &libp2p_identify::IdentifyEvent) { match event { libp2p_identify::IdentifyEvent::Error { .. } => { - self.identify.error.inc(); + self.error.inc(); } libp2p_identify::IdentifyEvent::Pushed { .. } => { - self.identify.pushed.inc(); + self.pushed.inc(); } - libp2p_identify::IdentifyEvent::Received { info, .. } => { - self.identify.received.inc(); - self.identify - .received_info_protocols + libp2p_identify::IdentifyEvent::Received { peer_id, info, .. } => { + { + let mut protocols: Vec = info + .protocols + .iter() + .filter(|p| { + let allowed_protocols: &[&[u8]] = &[ + #[cfg(feature = "dcutr")] + libp2p_dcutr::PROTOCOL_NAME, + // #[cfg(feature = "gossipsub")] + // #[cfg(not(target_os = "unknown"))] + // TODO: Add Gossipsub protocol name + libp2p_identify::PROTOCOL_NAME, + libp2p_identify::PUSH_PROTOCOL_NAME, + #[cfg(feature = "kad")] + libp2p_kad::protocol::DEFAULT_PROTO_NAME, + #[cfg(feature = "ping")] + libp2p_ping::PROTOCOL_NAME, + #[cfg(feature = "relay")] + libp2p_relay::v2::STOP_PROTOCOL_NAME, + #[cfg(feature = "relay")] + libp2p_relay::v2::HOP_PROTOCOL_NAME, + ]; + + allowed_protocols.contains(&p.as_bytes()) + }) + .cloned() + .collect(); + + // Signal via an additional label value that one or more + // protocols of the remote peer have not been recognized. + if protocols.len() < info.protocols.len() { + protocols.push("unrecognized".to_string()); + } + + protocols.sort_unstable(); + protocols.dedup(); + + self.protocols.add(*peer_id, protocols); + } + + self.received.inc(); + self.received_info_protocols .observe(info.protocols.len() as f64); - self.identify - .received_info_listen_addrs + self.received_info_listen_addrs .observe(info.listen_addrs.len() as f64); } libp2p_identify::IdentifyEvent::Sent { .. } => { - self.identify.sent.inc(); + self.sent.inc(); } } } } + +impl super::Recorder> for Metrics { + fn record(&self, event: &libp2p_swarm::SwarmEvent) { + if let libp2p_swarm::SwarmEvent::ConnectionClosed { + peer_id, + num_established, + .. + } = event + { + if *num_established == 0 { + self.protocols.remove(*peer_id) + } + } + } +} + +#[derive(Default, Clone)] +struct Protocols { + peers: Arc>>>, +} + +impl Protocols { + fn add(&self, peer: PeerId, protocols: Vec) { + self.peers + .lock() + .expect("Lock not to be poisoned") + .insert(peer, protocols); + } + + fn remove(&self, peer: PeerId) { + self.peers + .lock() + .expect("Lock not to be poisoned") + .remove(&peer); + } +} + +impl EncodeMetric for Protocols { + fn encode(&self, mut encoder: Encoder) -> Result<(), std::io::Error> { + let count_by_protocol = self + .peers + .lock() + .expect("Lock not to be poisoned") + .iter() + .fold( + HashMap::::default(), + |mut acc, (_, protocols)| { + for protocol in protocols { + let count = acc.entry(protocol.to_string()).or_default(); + *count += 1; + } + acc + }, + ); + + for (protocol, count) in count_by_protocol { + encoder + .with_label_set(&("protocol", protocol)) + .no_suffix()? + .no_bucket()? + .encode_value(count)? + .no_exemplar()?; + } + + Ok(()) + } + + fn metric_type(&self) -> MetricType { + MetricType::Gauge + } +} diff --git a/misc/metrics/src/kad.rs b/misc/metrics/src/kad.rs index 8ab71befe91..5e5a1056060 100644 --- a/misc/metrics/src/kad.rs +++ b/misc/metrics/src/kad.rs @@ -159,25 +159,21 @@ impl Metrics { } } -impl super::Recorder for super::Metrics { +impl super::Recorder for Metrics { fn record(&self, event: &libp2p_kad::KademliaEvent) { match event { libp2p_kad::KademliaEvent::OutboundQueryCompleted { result, stats, .. } => { - self.kad - .query_result_num_requests + self.query_result_num_requests .get_or_create(&result.into()) .observe(stats.num_requests().into()); - self.kad - .query_result_num_success + self.query_result_num_success .get_or_create(&result.into()) .observe(stats.num_successes().into()); - self.kad - .query_result_num_failure + self.query_result_num_failure .get_or_create(&result.into()) .observe(stats.num_failures().into()); if let Some(duration) = stats.duration() { - self.kad - .query_result_duration + self.query_result_duration .get_or_create(&result.into()) .observe(duration.as_secs_f64()); } @@ -185,36 +181,30 @@ impl super::Recorder for super::Metrics { match result { libp2p_kad::QueryResult::GetRecord(result) => match result { Ok(ok) => self - .kad .query_result_get_record_ok .observe(ok.records.len() as f64), Err(error) => { - self.kad - .query_result_get_record_error + self.query_result_get_record_error .get_or_create(&error.into()) .inc(); } }, libp2p_kad::QueryResult::GetClosestPeers(result) => match result { Ok(ok) => self - .kad .query_result_get_closest_peers_ok .observe(ok.peers.len() as f64), Err(error) => { - self.kad - .query_result_get_closest_peers_error + self.query_result_get_closest_peers_error .get_or_create(&error.into()) .inc(); } }, libp2p_kad::QueryResult::GetProviders(result) => match result { Ok(ok) => self - .kad .query_result_get_providers_ok .observe(ok.providers.len() as f64), Err(error) => { - self.kad - .query_result_get_providers_error + self.query_result_get_providers_error .get_or_create(&error.into()) .inc(); } @@ -230,16 +220,14 @@ impl super::Recorder for super::Metrics { } => { let bucket = low.ilog2().unwrap_or(0); if *is_new_peer { - self.kad - .routing_updated + self.routing_updated .get_or_create(&RoutingUpdated { action: RoutingAction::Added, bucket, }) .inc(); } else { - self.kad - .routing_updated + self.routing_updated .get_or_create(&RoutingUpdated { action: RoutingAction::Updated, bucket, @@ -248,8 +236,7 @@ impl super::Recorder for super::Metrics { } if old_peer.is_some() { - self.kad - .routing_updated + self.routing_updated .get_or_create(&RoutingUpdated { action: RoutingAction::Evicted, bucket, @@ -259,10 +246,7 @@ impl super::Recorder for super::Metrics { } libp2p_kad::KademliaEvent::InboundRequest { request } => { - self.kad - .inbound_requests - .get_or_create(&request.into()) - .inc(); + self.inbound_requests.get_or_create(&request.into()).inc(); } _ => {} } diff --git a/misc/metrics/src/lib.rs b/misc/metrics/src/lib.rs index 634d13590df..d9fa3c40ffe 100644 --- a/misc/metrics/src/lib.rs +++ b/misc/metrics/src/lib.rs @@ -95,3 +95,55 @@ pub trait Recorder { /// Record the given event. fn record(&self, event: &Event); } + +#[cfg(feature = "dcutr")] +impl Recorder for Metrics { + fn record(&self, event: &libp2p_dcutr::behaviour::Event) { + self.dcutr.record(event) + } +} + +#[cfg(feature = "gossipsub")] +#[cfg(not(target_os = "unknown"))] +impl Recorder for Metrics { + fn record(&self, event: &libp2p_gossipsub::GossipsubEvent) { + self.gossipsub.record(event) + } +} + +#[cfg(feature = "identify")] +impl Recorder for Metrics { + fn record(&self, event: &libp2p_identify::IdentifyEvent) { + self.identify.record(event) + } +} + +#[cfg(feature = "kad")] +impl Recorder for Metrics { + fn record(&self, event: &libp2p_kad::KademliaEvent) { + self.kad.record(event) + } +} + +#[cfg(feature = "ping")] +impl Recorder for Metrics { + fn record(&self, event: &libp2p_ping::PingEvent) { + self.ping.record(event) + } +} + +#[cfg(feature = "relay")] +impl Recorder for Metrics { + fn record(&self, event: &libp2p_relay::v2::relay::Event) { + self.relay.record(event) + } +} + +impl Recorder> for Metrics { + fn record(&self, event: &libp2p_swarm::SwarmEvent) { + self.swarm.record(event); + + #[cfg(feature = "identify")] + self.identify.record(event) + } +} diff --git a/misc/metrics/src/ping.rs b/misc/metrics/src/ping.rs index 76d50b54d17..b7c3ef60f9b 100644 --- a/misc/metrics/src/ping.rs +++ b/misc/metrics/src/ping.rs @@ -92,17 +92,17 @@ impl Metrics { } } -impl super::Recorder for super::Metrics { +impl super::Recorder for Metrics { fn record(&self, event: &libp2p_ping::PingEvent) { match &event.result { Ok(libp2p_ping::PingSuccess::Pong) => { - self.ping.pong_received.inc(); + self.pong_received.inc(); } Ok(libp2p_ping::PingSuccess::Ping { rtt }) => { - self.ping.rtt.observe(rtt.as_secs_f64()); + self.rtt.observe(rtt.as_secs_f64()); } Err(failure) => { - self.ping.failure.get_or_create(&failure.into()).inc(); + self.failure.get_or_create(&failure.into()).inc(); } } } diff --git a/misc/metrics/src/relay.rs b/misc/metrics/src/relay.rs index 479dcaab724..9267a975b08 100644 --- a/misc/metrics/src/relay.rs +++ b/misc/metrics/src/relay.rs @@ -102,10 +102,9 @@ impl From<&libp2p_relay::v2::relay::Event> for EventType { } } -impl super::Recorder for super::Metrics { +impl super::Recorder for Metrics { fn record(&self, event: &libp2p_relay::v2::relay::Event) { - self.relay - .events + self.events .get_or_create(&EventLabels { event: event.into(), }) diff --git a/misc/metrics/src/swarm.rs b/misc/metrics/src/swarm.rs index d0fb0c664f2..e9c5a0493ce 100644 --- a/misc/metrics/src/swarm.rs +++ b/misc/metrics/src/swarm.rs @@ -138,34 +138,29 @@ impl Metrics { } } -impl super::Recorder> - for super::Metrics -{ +impl super::Recorder> for Metrics { fn record(&self, event: &libp2p_swarm::SwarmEvent) { match event { libp2p_swarm::SwarmEvent::Behaviour(_) => {} libp2p_swarm::SwarmEvent::ConnectionEstablished { endpoint, .. } => { - self.swarm - .connections_established + self.connections_established .get_or_create(&ConnectionEstablishedLabels { role: endpoint.into(), }) .inc(); } libp2p_swarm::SwarmEvent::ConnectionClosed { endpoint, .. } => { - self.swarm - .connections_closed + self.connections_closed .get_or_create(&ConnectionClosedLabels { role: endpoint.into(), }) .inc(); } libp2p_swarm::SwarmEvent::IncomingConnection { .. } => { - self.swarm.connections_incoming.inc(); + self.connections_incoming.inc(); } libp2p_swarm::SwarmEvent::IncomingConnectionError { error, .. } => { - self.swarm - .connections_incoming_error + self.connections_incoming_error .get_or_create(&IncomingConnectionErrorLabels { error: error.into(), }) @@ -178,8 +173,7 @@ impl super::Recorder super::Recorder { - self.swarm.connected_to_banned_peer.inc(); + self.connected_to_banned_peer.inc(); } libp2p_swarm::SwarmEvent::NewListenAddr { .. } => { - self.swarm.new_listen_addr.inc(); + self.new_listen_addr.inc(); } libp2p_swarm::SwarmEvent::ExpiredListenAddr { .. } => { - self.swarm.expired_listen_addr.inc(); + self.expired_listen_addr.inc(); } libp2p_swarm::SwarmEvent::ListenerClosed { .. } => { - self.swarm.listener_closed.inc(); + self.listener_closed.inc(); } libp2p_swarm::SwarmEvent::ListenerError { .. } => { - self.swarm.listener_error.inc(); + self.listener_error.inc(); } libp2p_swarm::SwarmEvent::Dialing(_) => { - self.swarm.dial_attempt.inc(); + self.dial_attempt.inc(); } } } diff --git a/muxers/mplex/CHANGELOG.md b/muxers/mplex/CHANGELOG.md index add3d1ace0d..6b374e1b66c 100644 --- a/muxers/mplex/CHANGELOG.md +++ b/muxers/mplex/CHANGELOG.md @@ -1,3 +1,7 @@ +# 0.35.0 [unreleased] + +- Update to `libp2p-core` `v0.35.0` + # 0.34.0 - `Substream` now implements `AsyncRead` and `AsyncWrite`. See [PR 2706]. diff --git a/muxers/mplex/Cargo.toml b/muxers/mplex/Cargo.toml index 3b5a82cd959..ac053a5020f 100644 --- a/muxers/mplex/Cargo.toml +++ b/muxers/mplex/Cargo.toml @@ -3,7 +3,7 @@ name = "libp2p-mplex" edition = "2021" rust-version = "1.56.1" description = "Mplex multiplexing protocol for libp2p" -version = "0.34.0" +version = "0.35.0" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" @@ -14,7 +14,7 @@ categories = ["network-programming", "asynchronous"] bytes = "1" futures = "0.3.1" asynchronous-codec = "0.6" -libp2p-core = { version = "0.34.0", path = "../../core", default-features = false } +libp2p-core = { version = "0.35.0", path = "../../core", default-features = false } log = "0.4" nohash-hasher = "0.2" parking_lot = "0.12" diff --git a/muxers/mplex/benches/split_send_size.rs b/muxers/mplex/benches/split_send_size.rs index f5bf771d1ab..d536edf4c8a 100644 --- a/muxers/mplex/benches/split_send_size.rs +++ b/muxers/mplex/benches/split_send_size.rs @@ -114,11 +114,9 @@ fn run( } transport::TransportEvent::Incoming { upgrade, .. } => { let (_peer, conn) = upgrade.await.unwrap(); - let mut s = poll_fn(|cx| conn.poll_event(cx)) + let mut s = poll_fn(|cx| conn.poll_inbound(cx)) .await - .expect("unexpected error") - .into_inbound_substream() - .expect("Unexpected muxer event"); + .expect("unexpected error"); let mut buf = vec![0u8; payload_len]; let mut off = 0; @@ -143,10 +141,7 @@ fn run( let sender = async move { let addr = addr_receiver.await.unwrap(); let (_peer, conn) = sender_trans.dial(addr).unwrap().await.unwrap(); - let mut handle = conn.open_outbound(); - let mut stream = poll_fn(|cx| conn.poll_outbound(cx, &mut handle)) - .await - .unwrap(); + let mut stream = poll_fn(|cx| conn.poll_outbound(cx)).await.unwrap(); let mut off = 0; loop { let n = poll_fn(|cx| Pin::new(&mut stream).poll_write(cx, &payload[off..])) diff --git a/muxers/mplex/src/lib.rs b/muxers/mplex/src/lib.rs index 80b1db16481..59b38db1156 100644 --- a/muxers/mplex/src/lib.rs +++ b/muxers/mplex/src/lib.rs @@ -28,9 +28,8 @@ use bytes::Bytes; use codec::LocalStreamId; use futures::{future, prelude::*, ready}; use libp2p_core::{ - muxing::StreamMuxerEvent, upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}, - StreamMuxer, + Multiaddr, StreamMuxer, }; use parking_lot::Mutex; use std::{cmp, iter, pin::Pin, sync::Arc, task::Context, task::Poll}; @@ -75,9 +74,6 @@ where } /// Multiplexer. Implements the `StreamMuxer` trait. -/// -/// This implementation isn't capable of detecting when the underlying socket changes its address, -/// and no [`StreamMuxerEvent::AddressChange`] event is ever emitted. pub struct Multiplex { io: Arc>>, } @@ -87,33 +83,24 @@ where C: AsyncRead + AsyncWrite + Unpin, { type Substream = Substream; - type OutboundSubstream = OutboundSubstream; type Error = io::Error; - fn poll_event( - &self, - cx: &mut Context<'_>, - ) -> Poll>> { - let stream_id = ready!(self.io.lock().poll_next_stream(cx))?; - let stream = Substream::new(stream_id, self.io.clone()); - Poll::Ready(Ok(StreamMuxerEvent::InboundSubstream(stream))) + fn poll_inbound(&self, cx: &mut Context<'_>) -> Poll> { + self.io + .lock() + .poll_next_stream(cx) + .map_ok(|stream_id| Substream::new(stream_id, self.io.clone())) } - fn open_outbound(&self) -> Self::OutboundSubstream { - OutboundSubstream {} + fn poll_outbound(&self, cx: &mut Context<'_>) -> Poll> { + self.io + .lock() + .poll_open_stream(cx) + .map_ok(|stream_id| Substream::new(stream_id, self.io.clone())) } - fn poll_outbound( - &self, - cx: &mut Context<'_>, - _: &mut Self::OutboundSubstream, - ) -> Poll> { - let stream_id = ready!(self.io.lock().poll_open_stream(cx))?; - Poll::Ready(Ok(Substream::new(stream_id, self.io.clone()))) - } - - fn destroy_outbound(&self, _substream: Self::OutboundSubstream) { - // Nothing to do, since `open_outbound` creates no new local state. + fn poll_address_change(&self, _: &mut Context<'_>) -> Poll> { + Poll::Pending } fn poll_close(&self, cx: &mut Context<'_>) -> Poll> { @@ -121,9 +108,6 @@ where } } -/// Active attempt to open an outbound substream. -pub struct OutboundSubstream {} - impl AsyncRead for Substream where C: AsyncRead + AsyncWrite + Unpin, diff --git a/muxers/mplex/tests/async_write.rs b/muxers/mplex/tests/async_write.rs index 9dbda1a198d..2c4a2d10f0d 100644 --- a/muxers/mplex/tests/async_write.rs +++ b/muxers/mplex/tests/async_write.rs @@ -22,7 +22,6 @@ use futures::future::poll_fn; use futures::{channel::oneshot, prelude::*}; use libp2p_core::{upgrade, StreamMuxer, Transport}; use libp2p_tcp::TcpTransport; -use std::sync::Arc; #[test] fn async_write() { @@ -60,10 +59,7 @@ fn async_write() { .await .unwrap(); - let mut outbound_token = client.open_outbound(); - let mut outbound = poll_fn(|cx| client.poll_outbound(cx, &mut outbound_token)) - .await - .unwrap(); + let mut outbound = poll_fn(|cx| client.poll_outbound(cx)).await.unwrap(); let mut buf = Vec::new(); outbound.read_to_end(&mut buf).await.unwrap(); @@ -75,16 +71,8 @@ fn async_write() { let mut transport = TcpTransport::default() .and_then(move |c, e| upgrade::apply(c, mplex, e, upgrade::Version::V1)); - let client = Arc::new(transport.dial(rx.await.unwrap()).unwrap().await.unwrap()); - let mut inbound = loop { - if let Some(s) = poll_fn(|cx| client.poll_event(cx)) - .await - .unwrap() - .into_inbound_substream() - { - break s; - } - }; + let client = transport.dial(rx.await.unwrap()).unwrap().await.unwrap(); + let mut inbound = poll_fn(|cx| client.poll_inbound(cx)).await.unwrap(); inbound.write_all(b"hello world").await.unwrap(); // The test consists in making sure that this flushes the substream. diff --git a/muxers/mplex/tests/two_peers.rs b/muxers/mplex/tests/two_peers.rs index 4283452fe07..2b976c12fea 100644 --- a/muxers/mplex/tests/two_peers.rs +++ b/muxers/mplex/tests/two_peers.rs @@ -22,7 +22,6 @@ use futures::future::poll_fn; use futures::{channel::oneshot, prelude::*}; use libp2p_core::{upgrade, StreamMuxer, Transport}; use libp2p_tcp::TcpTransport; -use std::sync::Arc; #[test] fn client_to_server_outbound() { @@ -60,10 +59,7 @@ fn client_to_server_outbound() { .await .unwrap(); - let mut outbound_token = client.open_outbound(); - let mut outbound = poll_fn(|cx| client.poll_outbound(cx, &mut outbound_token)) - .await - .unwrap(); + let mut outbound = poll_fn(|cx| client.poll_outbound(cx)).await.unwrap(); let mut buf = Vec::new(); outbound.read_to_end(&mut buf).await.unwrap(); @@ -76,16 +72,8 @@ fn client_to_server_outbound() { .and_then(move |c, e| upgrade::apply(c, mplex, e, upgrade::Version::V1)) .boxed(); - let client = Arc::new(transport.dial(rx.await.unwrap()).unwrap().await.unwrap()); - let mut inbound = loop { - if let Some(s) = poll_fn(|cx| client.poll_event(cx)) - .await - .unwrap() - .into_inbound_substream() - { - break s; - } - }; + let client = transport.dial(rx.await.unwrap()).unwrap().await.unwrap(); + let mut inbound = poll_fn(|cx| client.poll_inbound(cx)).await.unwrap(); inbound.write_all(b"hello world").await.unwrap(); inbound.close().await.unwrap(); @@ -119,27 +107,17 @@ fn client_to_server_inbound() { tx.send(addr).unwrap(); - let client = Arc::new( - transport - .next() - .await - .expect("some event") - .into_incoming() - .unwrap() - .0 - .await - .unwrap(), - ); - - let mut inbound = loop { - if let Some(s) = poll_fn(|cx| client.poll_event(cx)) - .await - .unwrap() - .into_inbound_substream() - { - break s; - } - }; + let client = transport + .next() + .await + .expect("some event") + .into_incoming() + .unwrap() + .0 + .await + .unwrap(); + + let mut inbound = poll_fn(|cx| client.poll_inbound(cx)).await.unwrap(); let mut buf = Vec::new(); inbound.read_to_end(&mut buf).await.unwrap(); @@ -154,10 +132,7 @@ fn client_to_server_inbound() { let client = transport.dial(rx.await.unwrap()).unwrap().await.unwrap(); - let mut outbound_token = client.open_outbound(); - let mut outbound = poll_fn(|cx| client.poll_outbound(cx, &mut outbound_token)) - .await - .unwrap(); + let mut outbound = poll_fn(|cx| client.poll_outbound(cx)).await.unwrap(); outbound.write_all(b"hello world").await.unwrap(); outbound.close().await.unwrap(); @@ -199,10 +174,7 @@ fn protocol_not_match() { .await .unwrap(); - let mut outbound_token = client.open_outbound(); - let mut outbound = poll_fn(|cx| client.poll_outbound(cx, &mut outbound_token)) - .await - .unwrap(); + let mut outbound = poll_fn(|cx| client.poll_outbound(cx)).await.unwrap(); let mut buf = Vec::new(); outbound.read_to_end(&mut buf).await.unwrap(); diff --git a/muxers/yamux/CHANGELOG.md b/muxers/yamux/CHANGELOG.md index 95d01fbbfcd..e8eded11836 100644 --- a/muxers/yamux/CHANGELOG.md +++ b/muxers/yamux/CHANGELOG.md @@ -1,3 +1,7 @@ +# 0.39.0 [unreleased] + +- Update to `libp2p-core` `v0.35.0` + # 0.38.0 - Update to `libp2p-core` `v0.34.0`. diff --git a/muxers/yamux/Cargo.toml b/muxers/yamux/Cargo.toml index a7c55f08949..02dfb832d8d 100644 --- a/muxers/yamux/Cargo.toml +++ b/muxers/yamux/Cargo.toml @@ -3,7 +3,7 @@ name = "libp2p-yamux" edition = "2021" rust-version = "1.56.1" description = "Yamux multiplexing protocol for libp2p" -version = "0.38.0" +version = "0.39.0" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" @@ -12,7 +12,7 @@ categories = ["network-programming", "asynchronous"] [dependencies] futures = "0.3.1" -libp2p-core = { version = "0.34.0", path = "../../core", default-features = false } +libp2p-core = { version = "0.35.0", path = "../../core", default-features = false } parking_lot = "0.12" thiserror = "1.0" yamux = "0.10.0" diff --git a/muxers/yamux/src/lib.rs b/muxers/yamux/src/lib.rs index 8eb6fb3e895..a06e7934cf0 100644 --- a/muxers/yamux/src/lib.rs +++ b/muxers/yamux/src/lib.rs @@ -24,11 +24,11 @@ use futures::{ future, prelude::*, - ready, stream::{BoxStream, LocalBoxStream}, }; -use libp2p_core::muxing::{StreamMuxer, StreamMuxerEvent}; +use libp2p_core::muxing::StreamMuxer; use libp2p_core::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}; +use libp2p_core::Multiaddr; use parking_lot::Mutex; use std::{ fmt, io, iter, mem, @@ -36,6 +36,7 @@ use std::{ task::{Context, Poll}, }; use thiserror::Error; +use yamux::ConnectionError; /// A Yamux connection. pub struct Yamux(Mutex>); @@ -97,44 +98,35 @@ where pub type YamuxResult = Result; -/// > **Note**: This implementation never emits [`StreamMuxerEvent::AddressChange`] events. impl StreamMuxer for Yamux where S: Stream> + Unpin, { type Substream = yamux::Stream; - type OutboundSubstream = OpenSubstreamToken; type Error = YamuxError; - fn poll_event( - &self, - c: &mut Context<'_>, - ) -> Poll>> { - let mut inner = self.0.lock(); - match ready!(inner.incoming.poll_next_unpin(c)) { - Some(Ok(s)) => Poll::Ready(Ok(StreamMuxerEvent::InboundSubstream(s))), - Some(Err(e)) => Poll::Ready(Err(e)), - None => Poll::Ready(Err(yamux::ConnectionError::Closed.into())), - } - } - - fn open_outbound(&self) -> Self::OutboundSubstream { - OpenSubstreamToken(()) + fn poll_inbound(&self, cx: &mut Context<'_>) -> Poll> { + self.0 + .lock() + .incoming + .poll_next_unpin(cx) + .map(|maybe_stream| { + let stream = maybe_stream + .transpose()? + .ok_or(YamuxError(ConnectionError::Closed))?; + + Ok(stream) + }) } - fn poll_outbound( - &self, - c: &mut Context<'_>, - _: &mut OpenSubstreamToken, - ) -> Poll> { - let mut inner = self.0.lock(); - Pin::new(&mut inner.control) - .poll_open_stream(c) + fn poll_outbound(&self, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.0.lock().control) + .poll_open_stream(cx) .map_err(YamuxError) } - fn destroy_outbound(&self, _: Self::OutboundSubstream) { - self.0.lock().control.abort_open_stream() + fn poll_address_change(&self, _: &mut Context<'_>) -> Poll> { + Poll::Pending } fn poll_close(&self, c: &mut Context<'_>) -> Poll> { diff --git a/protocols/autonat/CHANGELOG.md b/protocols/autonat/CHANGELOG.md index eafaecff2a5..0c48b41951b 100644 --- a/protocols/autonat/CHANGELOG.md +++ b/protocols/autonat/CHANGELOG.md @@ -4,6 +4,8 @@ - Update to `libp2p-request-response` `v0.20.0`. +- Update to `libp2p-core` `v0.35.0`. + # 0.5.0 - Update to `libp2p-core` `v0.34.0`. diff --git a/protocols/autonat/Cargo.toml b/protocols/autonat/Cargo.toml index 9e0d272785d..44f3c2b894d 100644 --- a/protocols/autonat/Cargo.toml +++ b/protocols/autonat/Cargo.toml @@ -18,7 +18,7 @@ async-trait = "0.1" futures = "0.3" futures-timer = "3.0" instant = "0.1" -libp2p-core = { version = "0.34.0", path = "../../core", default-features = false } +libp2p-core = { version = "0.35.0", path = "../../core", default-features = false } libp2p-swarm = { version = "0.38.0", path = "../../swarm" } libp2p-request-response = { version = "0.20.0", path = "../request-response" } log = "0.4" diff --git a/protocols/dcutr/CHANGELOG.md b/protocols/dcutr/CHANGELOG.md index 7d144a20772..f740b188455 100644 --- a/protocols/dcutr/CHANGELOG.md +++ b/protocols/dcutr/CHANGELOG.md @@ -2,6 +2,12 @@ - Update to `libp2p-swarm` `v0.38.0`. +- Expose `PROTOCOL_NAME`. See [PR 2734]. + +- Update to `libp2p-core` `v0.35.0`. + +[PR 2734]: https://github.com/libp2p/rust-libp2p/pull/2734/ + # 0.4.0 - Update to `libp2p-core` `v0.34.0`. diff --git a/protocols/dcutr/Cargo.toml b/protocols/dcutr/Cargo.toml index 1786412cadf..6f8c4c284fd 100644 --- a/protocols/dcutr/Cargo.toml +++ b/protocols/dcutr/Cargo.toml @@ -17,7 +17,7 @@ either = "1.6.0" futures = "0.3.1" futures-timer = "3.0" instant = "0.1.11" -libp2p-core = { version = "0.34.0", path = "../../core" } +libp2p-core = { version = "0.35.0", path = "../../core" } libp2p-swarm = { version = "0.38.0", path = "../../swarm" } log = "0.4" prost-codec = { version = "0.1", path = "../../misc/prost-codec" } diff --git a/protocols/dcutr/src/behaviour.rs b/protocols/dcutr/src/behaviour.rs index 893148890b7..5d93d90b339 100644 --- a/protocols/dcutr/src/behaviour.rs +++ b/protocols/dcutr/src/behaviour.rs @@ -65,6 +65,7 @@ pub enum UpgradeError { Handler(ConnectionHandlerUpgrErr), } +#[derive(Default)] pub struct Behaviour { /// Queue of actions to return when polled. queued_actions: VecDeque, @@ -145,40 +146,35 @@ impl NetworkBehaviour for Behaviour { handler: Self::ConnectionHandler, _error: &DialError, ) { - match handler { - handler::Prototype::DirectConnection { - relayed_connection_id, - role: handler::Role::Initiator { attempt }, - } => { - let peer_id = - peer_id.expect("Peer of `Prototype::DirectConnection` is always known."); - if attempt < MAX_NUMBER_OF_UPGRADE_ATTEMPTS { - self.queued_actions.push_back(ActionBuilder::Connect { + if let handler::Prototype::DirectConnection { + relayed_connection_id, + role: handler::Role::Initiator { attempt }, + } = handler + { + let peer_id = peer_id.expect("Peer of `Prototype::DirectConnection` is always known."); + if attempt < MAX_NUMBER_OF_UPGRADE_ATTEMPTS { + self.queued_actions.push_back(ActionBuilder::Connect { + peer_id, + handler: NotifyHandler::One(relayed_connection_id), + attempt: attempt + 1, + }); + } else { + self.queued_actions.extend([ + NetworkBehaviourAction::NotifyHandler { peer_id, handler: NotifyHandler::One(relayed_connection_id), - attempt: attempt + 1, - }); - } else { - self.queued_actions.extend([ - NetworkBehaviourAction::NotifyHandler { - peer_id, - handler: NotifyHandler::One(relayed_connection_id), - event: Either::Left( - handler::relayed::Command::UpgradeFinishedDontKeepAlive, - ), - } - .into(), - NetworkBehaviourAction::GenerateEvent( - Event::DirectConnectionUpgradeFailed { - remote_peer_id: peer_id, - error: UpgradeError::Dial, - }, - ) - .into(), - ]); - } + event: Either::Left( + handler::relayed::Command::UpgradeFinishedDontKeepAlive, + ), + } + .into(), + NetworkBehaviourAction::GenerateEvent(Event::DirectConnectionUpgradeFailed { + remote_peer_id: peer_id, + error: UpgradeError::Dial, + }) + .into(), + ]); } - _ => {} } } @@ -324,7 +320,6 @@ impl NetworkBehaviour for Behaviour { /// A [`NetworkBehaviourAction`], either complete, or still requiring data from [`PollParameters`] /// before being returned in [`Behaviour::poll`]. -#[allow(clippy::large_enum_variant)] enum ActionBuilder { Done(NetworkBehaviourAction), Connect { @@ -333,7 +328,7 @@ enum ActionBuilder { peer_id: PeerId, }, AcceptInboundConnect { - inbound_connect: protocol::inbound::PendingConnect, + inbound_connect: Box, handler: NotifyHandler, peer_id: PeerId, }, diff --git a/protocols/dcutr/src/handler/relayed.rs b/protocols/dcutr/src/handler/relayed.rs index 9f9e2e01c13..e172b8f6993 100644 --- a/protocols/dcutr/src/handler/relayed.rs +++ b/protocols/dcutr/src/handler/relayed.rs @@ -44,7 +44,7 @@ pub enum Command { }, AcceptInboundConnect { obs_addrs: Vec, - inbound_connect: protocol::inbound::PendingConnect, + inbound_connect: Box, }, /// Upgrading the relayed connection to a direct connection either failed for good or succeeded. /// There is no need to keep the relayed connection alive for the sake of upgrading to a direct @@ -76,7 +76,7 @@ impl fmt::Debug for Command { pub enum Event { InboundConnectRequest { - inbound_connect: protocol::inbound::PendingConnect, + inbound_connect: Box, remote_addr: Multiaddr, }, InboundNegotiationFailed { @@ -201,7 +201,7 @@ impl ConnectionHandler for Handler { }; self.queued_events.push_back(ConnectionHandlerEvent::Custom( Event::InboundConnectRequest { - inbound_connect, + inbound_connect: Box::new(inbound_connect), remote_addr, }, )); @@ -245,9 +245,10 @@ impl ConnectionHandler for Handler { inbound_connect, obs_addrs, } => { - if let Some(_) = self + if self .inbound_connect .replace(inbound_connect.accept(obs_addrs).boxed()) + .is_some() { log::warn!( "New inbound connect stream while still upgrading previous one. \ @@ -337,8 +338,7 @@ impl ConnectionHandler for Handler { _ => { // Anything else is considered a fatal error or misbehaviour of // the remote peer and results in closing the connection. - self.pending_error = - Some(error.map_upgrade_err(|e| e.map_err(|e| EitherError::B(e)))); + self.pending_error = Some(error.map_upgrade_err(|e| e.map_err(EitherError::B))); } } } diff --git a/protocols/dcutr/src/lib.rs b/protocols/dcutr/src/lib.rs index 20ca846d99b..c55f22427f8 100644 --- a/protocols/dcutr/src/lib.rs +++ b/protocols/dcutr/src/lib.rs @@ -27,6 +27,7 @@ mod protocol; pub use protocol::{ inbound::UpgradeError as InboundUpgradeError, outbound::UpgradeError as OutboundUpgradeError, + PROTOCOL_NAME, }; mod message_proto { diff --git a/protocols/dcutr/src/protocol.rs b/protocols/dcutr/src/protocol.rs index d2b8b39a6d0..67f9af69f70 100644 --- a/protocols/dcutr/src/protocol.rs +++ b/protocols/dcutr/src/protocol.rs @@ -21,6 +21,6 @@ pub mod inbound; pub mod outbound; -const PROTOCOL_NAME: &[u8; 13] = b"/libp2p/dcutr"; +pub const PROTOCOL_NAME: &[u8; 13] = b"/libp2p/dcutr"; const MAX_MESSAGE_SIZE_BYTES: usize = 4096; diff --git a/protocols/floodsub/CHANGELOG.md b/protocols/floodsub/CHANGELOG.md index 491c87b99b5..89d4dd22b16 100644 --- a/protocols/floodsub/CHANGELOG.md +++ b/protocols/floodsub/CHANGELOG.md @@ -2,6 +2,8 @@ - Update to `libp2p-swarm` `v0.38.0`. +- Update to `libp2p-core` `v0.35.0`. + # 0.37.0 - Update to `libp2p-core` `v0.34.0`. diff --git a/protocols/floodsub/Cargo.toml b/protocols/floodsub/Cargo.toml index 2556384f00a..d240ef66d07 100644 --- a/protocols/floodsub/Cargo.toml +++ b/protocols/floodsub/Cargo.toml @@ -14,7 +14,7 @@ categories = ["network-programming", "asynchronous"] cuckoofilter = "0.5.0" fnv = "1.0" futures = "0.3.1" -libp2p-core = { version = "0.34.0", path = "../../core", default-features = false } +libp2p-core = { version = "0.35.0", path = "../../core", default-features = false } libp2p-swarm = { version = "0.38.0", path = "../../swarm" } log = "0.4" prost = "0.10" diff --git a/protocols/gossipsub/CHANGELOG.md b/protocols/gossipsub/CHANGELOG.md index 0d0a5fa333f..233d3873e98 100644 --- a/protocols/gossipsub/CHANGELOG.md +++ b/protocols/gossipsub/CHANGELOG.md @@ -2,6 +2,12 @@ - Update to `libp2p-swarm` `v0.38.0`. +- Update to `libp2p-core` `v0.35.0`. + +- Update to `prometheus-client` `v0.17.0`. See [PR 2761]. + +[PR 2761]: https://github.com/libp2p/rust-libp2p/pull/2761/ + # 0.39.0 - Update to `libp2p-core` `v0.34.0`. diff --git a/protocols/gossipsub/Cargo.toml b/protocols/gossipsub/Cargo.toml index 5e787076647..da97500808d 100644 --- a/protocols/gossipsub/Cargo.toml +++ b/protocols/gossipsub/Cargo.toml @@ -12,7 +12,7 @@ categories = ["network-programming", "asynchronous"] [dependencies] libp2p-swarm = { version = "0.38.0", path = "../../swarm" } -libp2p-core = { version = "0.34.0", path = "../../core", default-features = false } +libp2p-core = { version = "0.35.0", path = "../../core", default-features = false } bytes = "1.0" byteorder = "1.3.4" fnv = "1.0.7" @@ -31,7 +31,7 @@ serde = { version = "1", optional = true, features = ["derive"] } wasm-timer = "0.2.5" instant = "0.1.11" # Metrics dependencies -prometheus-client = "0.16.0" +prometheus-client = "0.17.0" [dev-dependencies] async-std = "1.6.3" diff --git a/protocols/identify/CHANGELOG.md b/protocols/identify/CHANGELOG.md index 60c77cd032a..38acfad9b64 100644 --- a/protocols/identify/CHANGELOG.md +++ b/protocols/identify/CHANGELOG.md @@ -2,6 +2,12 @@ - Update to `libp2p-swarm` `v0.38.0`. +- Expose `PROTOCOL_NAME` and `PUSH_PROTOCOL_NAME`. See [PR 2734]. + +- Update to `libp2p-core` `v0.35.0`. + +[PR 2734]: https://github.com/libp2p/rust-libp2p/pull/2734/ + # 0.37.0 - Update to `libp2p-core` `v0.34.0`. diff --git a/protocols/identify/Cargo.toml b/protocols/identify/Cargo.toml index 2fc604d1c25..1b3341df3f8 100644 --- a/protocols/identify/Cargo.toml +++ b/protocols/identify/Cargo.toml @@ -14,7 +14,7 @@ categories = ["network-programming", "asynchronous"] asynchronous-codec = "0.6" futures = "0.3.1" futures-timer = "3.0.2" -libp2p-core = { version = "0.34.0", path = "../../core", default-features = false } +libp2p-core = { version = "0.35.0", path = "../../core", default-features = false } libp2p-swarm = { version = "0.38.0", path = "../../swarm" } log = "0.4.1" lru = "0.7.2" diff --git a/protocols/identify/src/lib.rs b/protocols/identify/src/lib.rs index f5de8f7a6ac..17925fb6eed 100644 --- a/protocols/identify/src/lib.rs +++ b/protocols/identify/src/lib.rs @@ -45,7 +45,7 @@ //! [`IdentifyInfo`]: self::IdentifyInfo pub use self::identify::{Identify, IdentifyConfig, IdentifyEvent}; -pub use self::protocol::{IdentifyInfo, UpgradeError}; +pub use self::protocol::{IdentifyInfo, UpgradeError, PROTOCOL_NAME, PUSH_PROTOCOL_NAME}; mod handler; mod identify; diff --git a/protocols/identify/src/protocol.rs b/protocols/identify/src/protocol.rs index 735fbcb342b..163ac0aa396 100644 --- a/protocols/identify/src/protocol.rs +++ b/protocols/identify/src/protocol.rs @@ -34,6 +34,10 @@ use void::Void; const MAX_MESSAGE_SIZE_BYTES: usize = 4096; +pub const PROTOCOL_NAME: &[u8; 14] = b"/ipfs/id/1.0.0"; + +pub const PUSH_PROTOCOL_NAME: &[u8; 19] = b"/ipfs/id/push/1.0.0"; + /// Substream upgrade protocol for `/ipfs/id/1.0.0`. #[derive(Debug, Clone)] pub struct IdentifyProtocol; @@ -104,7 +108,7 @@ impl UpgradeInfo for IdentifyProtocol { type InfoIter = iter::Once; fn protocol_info(&self) -> Self::InfoIter { - iter::once(b"/ipfs/id/1.0.0") + iter::once(PROTOCOL_NAME) } } @@ -136,7 +140,7 @@ impl UpgradeInfo for IdentifyPushProtocol { type InfoIter = iter::Once; fn protocol_info(&self) -> Self::InfoIter { - iter::once(b"/ipfs/id/push/1.0.0") + iter::once(PUSH_PROTOCOL_NAME) } } diff --git a/protocols/kad/CHANGELOG.md b/protocols/kad/CHANGELOG.md index 66730eecde5..6b78a6fcbce 100644 --- a/protocols/kad/CHANGELOG.md +++ b/protocols/kad/CHANGELOG.md @@ -2,6 +2,8 @@ - Update to `libp2p-swarm` `v0.38.0`. +- Update to `libp2p-core` `v0.35.0`. + # 0.38.0 - Update to `libp2p-core` `v0.34.0`. diff --git a/protocols/kad/Cargo.toml b/protocols/kad/Cargo.toml index 6686664f9b2..f33697f2e32 100644 --- a/protocols/kad/Cargo.toml +++ b/protocols/kad/Cargo.toml @@ -18,7 +18,7 @@ fnv = "1.0" asynchronous-codec = "0.6" futures = "0.3.1" log = "0.4" -libp2p-core = { version = "0.34.0", path = "../../core", default-features = false } +libp2p-core = { version = "0.35.0", path = "../../core", default-features = false } libp2p-swarm = { version = "0.38.0", path = "../../swarm" } prost = "0.10" rand = "0.7.2" diff --git a/protocols/mdns/CHANGELOG.md b/protocols/mdns/CHANGELOG.md index 320f8b13a2e..e3b37be4c60 100644 --- a/protocols/mdns/CHANGELOG.md +++ b/protocols/mdns/CHANGELOG.md @@ -1,6 +1,9 @@ # 0.39.0 [unreleased] - Update to `libp2p-swarm` `v0.38.0`. +- Update to `if-watch` `v1.1.1`. + +- Update to `libp2p-core` `v0.35.0`. # 0.38.0 diff --git a/protocols/mdns/Cargo.toml b/protocols/mdns/Cargo.toml index f97c185030a..bb060c9ed1f 100644 --- a/protocols/mdns/Cargo.toml +++ b/protocols/mdns/Cargo.toml @@ -15,9 +15,9 @@ async-io = "1.3.1" data-encoding = "2.3.2" dns-parser = "0.8.0" futures = "0.3.13" -if-watch = "1.0.0" +if-watch = "1.1.1" lazy_static = "1.4.0" -libp2p-core = { version = "0.34.0", path = "../../core", default-features = false } +libp2p-core = { version = "0.35.0", path = "../../core", default-features = false } libp2p-swarm = { version = "0.38.0", path = "../../swarm" } log = "0.4.14" rand = "0.8.3" diff --git a/protocols/ping/CHANGELOG.md b/protocols/ping/CHANGELOG.md index a31b17d02f5..e14934fe7a2 100644 --- a/protocols/ping/CHANGELOG.md +++ b/protocols/ping/CHANGELOG.md @@ -2,6 +2,12 @@ - Update to `libp2p-swarm` `v0.38.0`. +- Expose `PROTOCOL_NAME`. See [PR 2734]. + +- Update to `libp2p-core` `v0.35.0`. + +[PR 2734]: https://github.com/libp2p/rust-libp2p/pull/2734/ + # 0.37.0 - Update to `libp2p-core` `v0.34.0`. diff --git a/protocols/ping/Cargo.toml b/protocols/ping/Cargo.toml index 741cbf05fbc..14984332db7 100644 --- a/protocols/ping/Cargo.toml +++ b/protocols/ping/Cargo.toml @@ -14,7 +14,7 @@ categories = ["network-programming", "asynchronous"] futures = "0.3.1" futures-timer = "3.0.2" instant = "0.1.11" -libp2p-core = { version = "0.34.0", path = "../../core", default-features = false } +libp2p-core = { version = "0.35.0", path = "../../core", default-features = false } libp2p-swarm = { version = "0.38.0", path = "../../swarm" } log = "0.4.1" rand = "0.7.2" diff --git a/protocols/ping/src/lib.rs b/protocols/ping/src/lib.rs index 81133b86d74..2a01025ee6d 100644 --- a/protocols/ping/src/lib.rs +++ b/protocols/ping/src/lib.rs @@ -57,8 +57,8 @@ use std::{ note = "Use re-exports that omit `Ping` prefix, i.e. `libp2p::ping::Config` etc" )] pub use self::{ - Config as PingConfig, Event as PingEvent, Failure as PingFailure, Result as PingResult, - Success as PingSuccess, + protocol::PROTOCOL_NAME, Config as PingConfig, Event as PingEvent, Failure as PingFailure, + Result as PingResult, Success as PingSuccess, }; #[deprecated(since = "0.30.0", note = "Use libp2p::ping::Behaviour instead.")] pub use Behaviour as Ping; diff --git a/protocols/ping/src/protocol.rs b/protocols/ping/src/protocol.rs index 499c5ad4a0f..659040e2d7f 100644 --- a/protocols/ping/src/protocol.rs +++ b/protocols/ping/src/protocol.rs @@ -26,6 +26,8 @@ use rand::{distributions, prelude::*}; use std::{io, iter, time::Duration}; use void::Void; +pub const PROTOCOL_NAME: &[u8; 16] = b"/ipfs/ping/1.0.0"; + /// The `Ping` protocol upgrade. /// /// The ping protocol sends 32 bytes of random data in configurable @@ -55,7 +57,7 @@ impl UpgradeInfo for Ping { type InfoIter = iter::Once; fn protocol_info(&self) -> Self::InfoIter { - iter::once(b"/ipfs/ping/1.0.0") + iter::once(PROTOCOL_NAME) } } diff --git a/protocols/relay/CHANGELOG.md b/protocols/relay/CHANGELOG.md index cd615778196..85e066668bc 100644 --- a/protocols/relay/CHANGELOG.md +++ b/protocols/relay/CHANGELOG.md @@ -2,6 +2,12 @@ - Update to `libp2p-swarm` `v0.38.0`. +- Expose `HOP_PROTOCOL_NAME` and `STOP_PROTOCOL_NAME`. See [PR 2734]. + +- Update to `libp2p-core` `v0.35.0`. + +[PR 2734]: https://github.com/libp2p/rust-libp2p/pull/2734/ + # 0.10.0 - Update to `libp2p-core` `v0.34.0`. diff --git a/protocols/relay/Cargo.toml b/protocols/relay/Cargo.toml index 97d29ebdfd0..817025842f8 100644 --- a/protocols/relay/Cargo.toml +++ b/protocols/relay/Cargo.toml @@ -17,7 +17,7 @@ either = "1.6.0" futures = "0.3.1" futures-timer = "3" instant = "0.1.11" -libp2p-core = { version = "0.34.0", path = "../../core", default-features = false } +libp2p-core = { version = "0.35.0", path = "../../core", default-features = false } libp2p-swarm = { version = "0.38.0", path = "../../swarm" } log = "0.4" pin-project = "1" diff --git a/protocols/relay/src/v2.rs b/protocols/relay/src/v2.rs index 7219ab3d69c..c610c1a3b2c 100644 --- a/protocols/relay/src/v2.rs +++ b/protocols/relay/src/v2.rs @@ -34,7 +34,8 @@ pub use protocol::{ inbound_hop::FatalUpgradeError as InboundHopFatalUpgradeError, inbound_stop::FatalUpgradeError as InboundStopFatalUpgradeError, outbound_hop::FatalUpgradeError as OutboundHopFatalUpgradeError, - outbound_stop::FatalUpgradeError as OutboundStopFatalUpgradeError, + outbound_stop::FatalUpgradeError as OutboundStopFatalUpgradeError, HOP_PROTOCOL_NAME, + STOP_PROTOCOL_NAME, }; /// The ID of an outgoing / incoming, relay / destination request. diff --git a/protocols/relay/src/v2/protocol.rs b/protocols/relay/src/v2/protocol.rs index ab2dc487b6f..27f69994957 100644 --- a/protocols/relay/src/v2/protocol.rs +++ b/protocols/relay/src/v2/protocol.rs @@ -26,8 +26,8 @@ pub mod inbound_stop; pub mod outbound_hop; pub mod outbound_stop; -const HOP_PROTOCOL_NAME: &[u8; 31] = b"/libp2p/circuit/relay/0.2.0/hop"; -const STOP_PROTOCOL_NAME: &[u8; 32] = b"/libp2p/circuit/relay/0.2.0/stop"; +pub const HOP_PROTOCOL_NAME: &[u8; 31] = b"/libp2p/circuit/relay/0.2.0/hop"; +pub const STOP_PROTOCOL_NAME: &[u8; 32] = b"/libp2p/circuit/relay/0.2.0/stop"; const MAX_MESSAGE_SIZE: usize = 4096; diff --git a/protocols/rendezvous/CHANGELOG.md b/protocols/rendezvous/CHANGELOG.md index 11120e0be1b..c087f77b393 100644 --- a/protocols/rendezvous/CHANGELOG.md +++ b/protocols/rendezvous/CHANGELOG.md @@ -2,6 +2,8 @@ - Update to `libp2p-swarm` `v0.38.0`. +- Update to `libp2p-core` `v0.35.0`. + # 0.7.0 - Update to `libp2p-core` `v0.34.0`. diff --git a/protocols/rendezvous/Cargo.toml b/protocols/rendezvous/Cargo.toml index b8d81a87e20..745285b91bd 100644 --- a/protocols/rendezvous/Cargo.toml +++ b/protocols/rendezvous/Cargo.toml @@ -12,7 +12,7 @@ categories = ["network-programming", "asynchronous"] [dependencies] asynchronous-codec = "0.6" -libp2p-core = { version = "0.34.0", path = "../../core", default-features = false } +libp2p-core = { version = "0.35.0", path = "../../core", default-features = false } libp2p-swarm = { version = "0.38.0", path = "../../swarm" } prost = "0.10" void = "1" diff --git a/protocols/request-response/CHANGELOG.md b/protocols/request-response/CHANGELOG.md index 8acc422ee40..bf556496fc1 100644 --- a/protocols/request-response/CHANGELOG.md +++ b/protocols/request-response/CHANGELOG.md @@ -2,6 +2,8 @@ - Update to `libp2p-swarm` `v0.38.0`. +- Update to `libp2p-core` `v0.35.0`. + # 0.19.0 - Update to `libp2p-core` `v0.34.0`. diff --git a/protocols/request-response/Cargo.toml b/protocols/request-response/Cargo.toml index 010647fe6b3..91f95484f73 100644 --- a/protocols/request-response/Cargo.toml +++ b/protocols/request-response/Cargo.toml @@ -15,7 +15,7 @@ async-trait = "0.1" bytes = "1" futures = "0.3.1" instant = "0.1.11" -libp2p-core = { version = "0.34.0", path = "../../core", default-features = false } +libp2p-core = { version = "0.35.0", path = "../../core", default-features = false } libp2p-swarm = { version = "0.38.0", path = "../../swarm" } log = "0.4.11" rand = "0.7" diff --git a/swarm/CHANGELOG.md b/swarm/CHANGELOG.md index 174924889b0..02edf9beef4 100644 --- a/swarm/CHANGELOG.md +++ b/swarm/CHANGELOG.md @@ -2,6 +2,8 @@ - Update dial address concurrency factor to `8`, thus dialing up to 8 addresses concurrently for a single connection attempt. See `Swarm::dial_concurrency_factor` and [PR 2741]. +- Update to `libp2p-core` `v0.35.0`. + [PR 2741]: https://github.com/libp2p/rust-libp2p/pull/2741/ # 0.37.0 diff --git a/swarm/Cargo.toml b/swarm/Cargo.toml index 2953c984368..e2829f80093 100644 --- a/swarm/Cargo.toml +++ b/swarm/Cargo.toml @@ -16,7 +16,7 @@ fnv = "1.0" futures = "0.3.1" futures-timer = "3.0.2" instant = "0.1.11" -libp2p-core = { version = "0.34.0", path = "../core", default-features = false } +libp2p-core = { version = "0.35.0", path = "../core", default-features = false } log = "0.4" pin-project = "1.0.0" rand = "0.7" diff --git a/swarm/src/connection.rs b/swarm/src/connection.rs index 733016dceb0..8d29ca53793 100644 --- a/swarm/src/connection.rs +++ b/swarm/src/connection.rs @@ -20,7 +20,6 @@ mod error; mod handler_wrapper; -mod substream; pub(crate) mod pool; @@ -30,18 +29,19 @@ pub use error::{ }; pub use pool::{ConnectionCounters, ConnectionLimits}; pub use pool::{EstablishedConnection, PendingConnection}; -pub use substream::{Close, SubstreamEndpoint}; use crate::handler::ConnectionHandler; use crate::IntoConnectionHandler; +use futures::future::poll_fn; use handler_wrapper::HandlerWrapper; use libp2p_core::connection::ConnectedPoint; use libp2p_core::multiaddr::Multiaddr; use libp2p_core::muxing::StreamMuxerBox; -use libp2p_core::upgrade; use libp2p_core::PeerId; -use std::{error::Error, fmt, pin::Pin, task::Context, task::Poll}; -use substream::{Muxing, SubstreamEvent}; +use libp2p_core::{upgrade, StreamMuxer}; +use std::collections::VecDeque; +use std::future::Future; +use std::{error::Error, fmt, io, pin::Pin, task::Context, task::Poll}; /// Information about a successfully established connection. #[derive(Debug, Clone, PartialEq, Eq)] @@ -52,6 +52,13 @@ pub struct Connected { pub peer_id: PeerId, } +/// Endpoint for a received substream. +#[derive(Debug, Copy, Clone, PartialEq, Eq)] +pub enum SubstreamEndpoint { + Dialer(TDialInfo), + Listener, +} + /// Event generated by a [`Connection`]. #[derive(Debug, Clone)] pub enum Event { @@ -67,19 +74,22 @@ where THandler: ConnectionHandler, { /// Node that handles the muxing. - muxing: substream::Muxing>, + muxing: StreamMuxerBox, /// Handler that processes substreams. handler: HandlerWrapper, + /// List of "open_info" that is waiting for new outbound substreams. + open_info: VecDeque>, } impl fmt::Debug for Connection where THandler: ConnectionHandler + fmt::Debug, + THandler::OutboundOpenInfo: fmt::Debug, { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("Connection") - .field("muxing", &self.muxing) .field("handler", &self.handler) + .field("open_info", &self.open_info) .finish() } } @@ -108,8 +118,9 @@ where max_negotiating_inbound_streams, ); Connection { - muxing: Muxing::new(muxer), + muxing: muxer, handler: wrapped_handler, + open_info: VecDeque::with_capacity(8), } } @@ -120,10 +131,10 @@ where /// Begins an orderly shutdown of the connection, returning the connection /// handler and a `Future` that resolves when connection shutdown is complete. - pub fn close(self) -> (THandler, Close) { + pub fn close(self) -> (THandler, impl Future>) { ( self.handler.into_connection_handler(), - self.muxing.close().0, + poll_fn(move |cx| self.muxing.poll_close(cx)), ) } @@ -138,38 +149,38 @@ where match self.handler.poll(cx)? { Poll::Pending => {} Poll::Ready(handler_wrapper::Event::OutboundSubstreamRequest(user_data)) => { - self.muxing.open_substream(user_data); - continue; + self.open_info.push_back(user_data); + continue; // Poll handler until exhausted. } Poll::Ready(handler_wrapper::Event::Custom(event)) => { return Poll::Ready(Ok(Event::Handler(event))); } } - // Perform I/O on the connection through the muxer, informing the handler - // of new substreams. - match self.muxing.poll(cx)? { - Poll::Pending => {} - Poll::Ready(SubstreamEvent::InboundSubstream { substream }) => { - self.handler - .inject_substream(substream, SubstreamEndpoint::Listener); - continue; - } - Poll::Ready(SubstreamEvent::OutboundSubstream { - user_data, - substream, - }) => { + if !self.open_info.is_empty() { + if let Poll::Ready(substream) = self.muxing.poll_outbound(cx)? { + let user_data = self + .open_info + .pop_front() + .expect("`open_info` is not empty"); let endpoint = SubstreamEndpoint::Dialer(user_data); self.handler.inject_substream(substream, endpoint); - continue; - } - Poll::Ready(SubstreamEvent::AddressChange(address)) => { - self.handler.inject_address_change(&address); - return Poll::Ready(Ok(Event::AddressChange(address))); + continue; // Go back to the top, handler can potentially make progress again. } } - return Poll::Pending; + if let Poll::Ready(substream) = self.muxing.poll_inbound(cx)? { + self.handler + .inject_substream(substream, SubstreamEndpoint::Listener); + continue; // Go back to the top, handler can potentially make progress again. + } + + if let Poll::Ready(address) = self.muxing.poll_address_change(cx)? { + self.handler.inject_address_change(&address); + return Poll::Ready(Ok(Event::AddressChange(address))); + } + + return Poll::Pending; // Nothing can make progress, return `Pending`. } } } diff --git a/swarm/src/connection/substream.rs b/swarm/src/connection/substream.rs deleted file mode 100644 index 47d5d315b20..00000000000 --- a/swarm/src/connection/substream.rs +++ /dev/null @@ -1,252 +0,0 @@ -// Copyright 2018 Parity Technologies (UK) Ltd. -// -// Permission is hereby granted, free of charge, to any person obtaining a -// copy of this software and associated documentation files (the "Software"), -// to deal in the Software without restriction, including without limitation -// the rights to use, copy, modify, merge, publish, distribute, sublicense, -// and/or sell copies of the Software, and to permit persons to whom the -// Software is furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS -// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING -// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER -// DEALINGS IN THE SOFTWARE. - -use futures::prelude::*; -use libp2p_core::multiaddr::Multiaddr; -use libp2p_core::muxing::{StreamMuxer, StreamMuxerEvent}; -use smallvec::SmallVec; -use std::sync::Arc; -use std::{fmt, pin::Pin, task::Context, task::Poll}; - -/// Endpoint for a received substream. -#[derive(Debug, Copy, Clone, PartialEq, Eq)] -pub enum SubstreamEndpoint { - Dialer(TDialInfo), - Listener, -} - -/// Implementation of `Stream` that handles substream multiplexing. -/// -/// The stream will receive substreams and can be used to open new outgoing substreams. Destroying -/// the `Muxing` will **not** close the existing substreams. -/// -/// The stream will close once both the inbound and outbound channels are closed, and no more -/// outbound substream attempt is pending. -pub struct Muxing -where - TMuxer: StreamMuxer, -{ - /// The muxer used to manage substreams. - inner: Arc, - /// List of substreams we are currently opening. - outbound_substreams: SmallVec<[(TUserData, TMuxer::OutboundSubstream); 8]>, -} - -/// Future that signals the remote that we have closed the connection. -pub struct Close { - /// Muxer to close. - muxer: Arc, -} - -/// Event that can happen on the `Muxing`. -pub enum SubstreamEvent -where - TMuxer: StreamMuxer, -{ - /// A new inbound substream arrived. - InboundSubstream { - /// The newly-opened substream. Will return EOF of an error if the `Muxing` is - /// destroyed or `close_graceful` is called. - substream: TMuxer::Substream, - }, - - /// An outbound substream has successfully been opened. - OutboundSubstream { - /// User data that has been passed to the `open_substream` method. - user_data: TUserData, - /// The newly-opened substream. Will return EOF of an error if the `Muxing` is - /// destroyed or `close_graceful` is called. - substream: TMuxer::Substream, - }, - - /// Address to the remote has changed. The previous one is now obsolete. - /// - /// > **Note**: This can for example happen when using the QUIC protocol, where the two nodes - /// > can change their IP address while retaining the same QUIC connection. - AddressChange(Multiaddr), -} - -/// Identifier for a substream being opened. -#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)] -pub struct OutboundSubstreamId(usize); - -impl Muxing -where - TMuxer: StreamMuxer, -{ - /// Creates a new node events stream. - pub fn new(muxer: TMuxer) -> Self { - Muxing { - inner: Arc::new(muxer), - outbound_substreams: SmallVec::new(), - } - } - - /// Starts the process of opening a new outbound substream. - /// - /// After calling this method, polling the stream should eventually produce either an - /// `OutboundSubstream` event or an `OutboundClosed` event containing the user data that has - /// been passed to this method. - pub fn open_substream(&mut self, user_data: TUserData) { - let raw = self.inner.open_outbound(); - self.outbound_substreams.push((user_data, raw)); - } - - /// Destroys the node stream and returns all the pending outbound substreams, plus an object - /// that signals the remote that we shut down the connection. - #[must_use] - pub fn close(mut self) -> (Close, Vec) { - let substreams = self.cancel_outgoing(); - let close = Close { - muxer: self.inner.clone(), - }; - (close, substreams) - } - - /// Destroys all outbound streams and returns the corresponding user data. - pub fn cancel_outgoing(&mut self) -> Vec { - let mut out = Vec::with_capacity(self.outbound_substreams.len()); - for (user_data, outbound) in self.outbound_substreams.drain(..) { - out.push(user_data); - self.inner.destroy_outbound(outbound); - } - out - } - - /// Provides an API similar to `Future`. - pub fn poll( - &mut self, - cx: &mut Context<'_>, - ) -> Poll, TMuxer::Error>> { - // Polling inbound substream. - match self.inner.poll_event(cx) { - Poll::Ready(Ok(StreamMuxerEvent::InboundSubstream(substream))) => { - return Poll::Ready(Ok(SubstreamEvent::InboundSubstream { substream })); - } - Poll::Ready(Ok(StreamMuxerEvent::AddressChange(addr))) => { - return Poll::Ready(Ok(SubstreamEvent::AddressChange(addr))) - } - Poll::Ready(Err(err)) => return Poll::Ready(Err(err)), - Poll::Pending => {} - } - - // Polling outbound substreams. - // We remove each element from `outbound_substreams` one by one and add them back. - for n in (0..self.outbound_substreams.len()).rev() { - let (user_data, mut outbound) = self.outbound_substreams.swap_remove(n); - match self.inner.poll_outbound(cx, &mut outbound) { - Poll::Ready(Ok(substream)) => { - self.inner.destroy_outbound(outbound); - return Poll::Ready(Ok(SubstreamEvent::OutboundSubstream { - user_data, - substream, - })); - } - Poll::Pending => { - self.outbound_substreams.push((user_data, outbound)); - } - Poll::Ready(Err(err)) => { - self.inner.destroy_outbound(outbound); - return Poll::Ready(Err(err)); - } - } - } - - // Nothing happened. Register our task to be notified and return. - Poll::Pending - } -} - -impl fmt::Debug for Muxing -where - TMuxer: StreamMuxer, -{ - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> { - f.debug_struct("Muxing") - .field("outbound_substreams", &self.outbound_substreams.len()) - .finish() - } -} - -impl Drop for Muxing -where - TMuxer: StreamMuxer, -{ - fn drop(&mut self) { - // The substreams that were produced will continue to work, as the muxer is held in an Arc. - // However we will no longer process any further inbound or outbound substream, and we - // therefore close everything. - for (_, outbound) in self.outbound_substreams.drain(..) { - self.inner.destroy_outbound(outbound); - } - } -} - -impl Future for Close -where - TMuxer: StreamMuxer, -{ - type Output = Result<(), TMuxer::Error>; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - match self.muxer.poll_close(cx) { - Poll::Pending => Poll::Pending, - Poll::Ready(Ok(())) => Poll::Ready(Ok(())), - Poll::Ready(Err(err)) => Poll::Ready(Err(err)), - } - } -} - -impl fmt::Debug for Close -where - TMuxer: StreamMuxer, -{ - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> { - f.debug_struct("Close").finish() - } -} - -impl fmt::Debug for SubstreamEvent -where - TMuxer: StreamMuxer, - TMuxer::Substream: fmt::Debug, - TUserData: fmt::Debug, -{ - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - SubstreamEvent::InboundSubstream { substream } => f - .debug_struct("SubstreamEvent::OutboundClosed") - .field("substream", substream) - .finish(), - SubstreamEvent::OutboundSubstream { - user_data, - substream, - } => f - .debug_struct("SubstreamEvent::OutboundSubstream") - .field("user_data", user_data) - .field("substream", substream) - .finish(), - SubstreamEvent::AddressChange(address) => f - .debug_struct("SubstreamEvent::AddressChange") - .field("address", address) - .finish(), - } - } -} diff --git a/swarm/src/handler.rs b/swarm/src/handler.rs index 2a301b70889..7060d33bc6a 100644 --- a/swarm/src/handler.rs +++ b/swarm/src/handler.rs @@ -116,6 +116,12 @@ pub trait ConnectionHandler: Send + 'static { fn listen_protocol(&self) -> SubstreamProtocol; /// Injects the output of a successful upgrade on a new inbound substream. + /// + /// Note that it is up to the [`ConnectionHandler`] implementation to manage the lifetime of the + /// negotiated inbound substreams. E.g. the implementation has to enforce a limit on the number + /// of simultaneously open negotiated inbound substreams. In other words it is up to the + /// [`ConnectionHandler`] implementation to stop a malicious remote node to open and keep alive + /// an excessive amount of inbound substreams. fn inject_fully_negotiated_inbound( &mut self, protocol: ::Output, diff --git a/transports/deflate/CHANGELOG.md b/transports/deflate/CHANGELOG.md index ebc2d811375..317a9ab78d3 100644 --- a/transports/deflate/CHANGELOG.md +++ b/transports/deflate/CHANGELOG.md @@ -1,3 +1,7 @@ +# 0.35.0 [unreleased] + +- Update to `libp2p-core` `v0.35.0`. + # 0.34.0 - Update to `libp2p-core` `v0.34.0`. diff --git a/transports/deflate/Cargo.toml b/transports/deflate/Cargo.toml index 82536d8acc3..4ac8661d0a3 100644 --- a/transports/deflate/Cargo.toml +++ b/transports/deflate/Cargo.toml @@ -3,7 +3,7 @@ name = "libp2p-deflate" edition = "2021" rust-version = "1.56.1" description = "Deflate encryption protocol for libp2p" -version = "0.34.0" +version = "0.35.0" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" @@ -12,7 +12,7 @@ categories = ["network-programming", "asynchronous"] [dependencies] futures = "0.3.1" -libp2p-core = { version = "0.34.0", path = "../../core", default-features = false } +libp2p-core = { version = "0.35.0", path = "../../core", default-features = false } flate2 = "1.0" [dev-dependencies] diff --git a/transports/dns/CHANGELOG.md b/transports/dns/CHANGELOG.md index a32d5a95eb5..c9b9083cb3a 100644 --- a/transports/dns/CHANGELOG.md +++ b/transports/dns/CHANGELOG.md @@ -1,3 +1,7 @@ +# 0.35.0 [unreleased] + +- Update to `libp2p-core` `v0.35.0`. + # 0.34.0 - Update to `libp2p-core` `v0.34.0`. diff --git a/transports/dns/Cargo.toml b/transports/dns/Cargo.toml index 1aaa7a15302..46ca3aca1af 100644 --- a/transports/dns/Cargo.toml +++ b/transports/dns/Cargo.toml @@ -3,7 +3,7 @@ name = "libp2p-dns" edition = "2021" rust-version = "1.56.1" description = "DNS transport implementation for libp2p" -version = "0.34.0" +version = "0.35.0" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" @@ -11,7 +11,7 @@ keywords = ["peer-to-peer", "libp2p", "networking"] categories = ["network-programming", "asynchronous"] [dependencies] -libp2p-core = { version = "0.34.0", path = "../../core", default-features = false } +libp2p-core = { version = "0.35.0", path = "../../core", default-features = false } log = "0.4.1" futures = "0.3.1" async-std-resolver = { version = "0.21", optional = true } diff --git a/transports/noise/CHANGELOG.md b/transports/noise/CHANGELOG.md index ca830796b59..6b323eabd27 100644 --- a/transports/noise/CHANGELOG.md +++ b/transports/noise/CHANGELOG.md @@ -1,3 +1,7 @@ +# 0.38.0 [unreleased] + +- Update to `libp2p-core` `v0.35.0`. + # 0.37.0 - Update to `libp2p-core` `v0.34.0`. diff --git a/transports/noise/Cargo.toml b/transports/noise/Cargo.toml index f0b8ef8996e..171a89a4715 100644 --- a/transports/noise/Cargo.toml +++ b/transports/noise/Cargo.toml @@ -3,7 +3,7 @@ name = "libp2p-noise" edition = "2021" rust-version = "1.56.1" description = "Cryptographic handshake protocol using the noise framework." -version = "0.37.0" +version = "0.38.0" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" @@ -13,7 +13,7 @@ bytes = "1" curve25519-dalek = "3.0.0" futures = "0.3.1" lazy_static = "1.2" -libp2p-core = { version = "0.34.0", path = "../../core", default-features = false } +libp2p-core = { version = "0.35.0", path = "../../core", default-features = false } log = "0.4" prost = "0.10" rand = "0.8.3" diff --git a/transports/plaintext/CHANGELOG.md b/transports/plaintext/CHANGELOG.md index 560075bc0a2..7c5c389a0cd 100644 --- a/transports/plaintext/CHANGELOG.md +++ b/transports/plaintext/CHANGELOG.md @@ -1,3 +1,7 @@ +# 0.35.0 [unreleased] + +- Update to `libp2p-core` `v0.35.0`. + # 0.34.0 - Update to `libp2p-core` `v0.34.0`. diff --git a/transports/plaintext/Cargo.toml b/transports/plaintext/Cargo.toml index e5534f93c1e..c8cd9395da0 100644 --- a/transports/plaintext/Cargo.toml +++ b/transports/plaintext/Cargo.toml @@ -3,7 +3,7 @@ name = "libp2p-plaintext" edition = "2021" rust-version = "1.56.1" description = "Plaintext encryption dummy protocol for libp2p" -version = "0.34.0" +version = "0.35.0" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" @@ -14,7 +14,7 @@ categories = ["network-programming", "asynchronous"] bytes = "1" futures = "0.3.1" asynchronous-codec = "0.6" -libp2p-core = { version = "0.34.0", path = "../../core", default-features = false } +libp2p-core = { version = "0.35.0", path = "../../core", default-features = false } log = "0.4.8" prost = "0.10" unsigned-varint = { version = "0.7", features = ["asynchronous_codec"] } diff --git a/transports/quic/Cargo.toml b/transports/quic/Cargo.toml index 9425209e68b..6ef5a3f019b 100644 --- a/transports/quic/Cargo.toml +++ b/transports/quic/Cargo.toml @@ -12,7 +12,7 @@ async-global-executor = "2.0.2" async-io = "1.6.0" futures = "0.3.15" if-watch = "1.0.0" -libp2p-core = { version = "0.34.0", path = "../../core" } +libp2p-core = { version = "0.35.0", path = "../../core" } parking_lot = "0.12.0" quinn-proto = { version = "0.8.2", default-features = false, features = ["tls-rustls"] } rand = "0.8.5" diff --git a/transports/tcp/CHANGELOG.md b/transports/tcp/CHANGELOG.md index 4a9d0245b2b..bf7a5e0daba 100644 --- a/transports/tcp/CHANGELOG.md +++ b/transports/tcp/CHANGELOG.md @@ -1,3 +1,9 @@ +# 0.35.0 [unreleased] + +- Update to `libp2p-core` `v0.35.0`. + +- Update to `if-watch` `v1.1.1`. + # 0.34.0 - Update to `libp2p-core` `v0.34.0`. diff --git a/transports/tcp/Cargo.toml b/transports/tcp/Cargo.toml index e72f3eaee56..7273db58c51 100644 --- a/transports/tcp/Cargo.toml +++ b/transports/tcp/Cargo.toml @@ -3,7 +3,7 @@ name = "libp2p-tcp" edition = "2021" rust-version = "1.56.1" description = "TCP/IP transport protocol for libp2p" -version = "0.34.0" +version = "0.35.0" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" @@ -14,11 +14,11 @@ categories = ["network-programming", "asynchronous"] async-io-crate = { package = "async-io", version = "1.2.0", optional = true } futures = "0.3.8" futures-timer = "3.0" -if-watch = { version = "1.0.0", optional = true } +if-watch = { version = "1.1.1", optional = true } if-addrs = { version = "0.7.0", optional = true } ipnet = "2.0.0" libc = "0.2.80" -libp2p-core = { version = "0.34.0", path = "../../core", default-features = false } +libp2p-core = { version = "0.35.0", path = "../../core", default-features = false } log = "0.4.11" socket2 = { version = "0.4.0", features = ["all"] } tokio-crate = { package = "tokio", version = "1.19.0", default-features = false, features = ["net"], optional = true } diff --git a/transports/tcp/src/provider/tokio.rs b/transports/tcp/src/provider/tokio.rs index fa9ebe3b3ff..564eebfa48b 100644 --- a/transports/tcp/src/provider/tokio.rs +++ b/transports/tcp/src/provider/tokio.rs @@ -155,9 +155,9 @@ impl Provider for Tcp { #[derive(Debug)] pub struct TcpStream(pub tokio_crate::net::TcpStream); -impl Into for TcpStream { - fn into(self: TcpStream) -> tokio_crate::net::TcpStream { - self.0 +impl From for tokio_crate::net::TcpStream { + fn from(t: TcpStream) -> tokio_crate::net::TcpStream { + t.0 } } diff --git a/transports/uds/CHANGELOG.md b/transports/uds/CHANGELOG.md index 65c5da0559a..c27c12b8c3e 100644 --- a/transports/uds/CHANGELOG.md +++ b/transports/uds/CHANGELOG.md @@ -1,3 +1,7 @@ +# 0.34.0 [unreleased] + +- Update to `libp2p-core` `v0.35.0`. + # 0.33.0 - Update dependencies. diff --git a/transports/uds/Cargo.toml b/transports/uds/Cargo.toml index 30d01c4f490..e00e6ae09f3 100644 --- a/transports/uds/Cargo.toml +++ b/transports/uds/Cargo.toml @@ -3,7 +3,7 @@ name = "libp2p-uds" edition = "2021" rust-version = "1.56.1" description = "Unix domain sockets transport for libp2p" -version = "0.33.0" +version = "0.34.0" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" @@ -12,7 +12,7 @@ categories = ["network-programming", "asynchronous"] [target.'cfg(all(unix, not(target_os = "emscripten")))'.dependencies] async-std = { version = "1.6.2", optional = true } -libp2p-core = { version = "0.34.0", path = "../../core", default-features = false } +libp2p-core = { version = "0.35.0", path = "../../core", default-features = false } log = "0.4.1" futures = "0.3.1" tokio = { version = "1.15", default-features = false, features = ["net"], optional = true } diff --git a/transports/wasm-ext/CHANGELOG.md b/transports/wasm-ext/CHANGELOG.md index 65ff72d10bf..c3438b68b20 100644 --- a/transports/wasm-ext/CHANGELOG.md +++ b/transports/wasm-ext/CHANGELOG.md @@ -1,3 +1,7 @@ +# 0.35.0 [unreleased] + +- Update to `libp2p-core` `v0.35.0`. + # 0.34.0 - Update to `libp2p-core` `v0.34.0`. diff --git a/transports/wasm-ext/Cargo.toml b/transports/wasm-ext/Cargo.toml index a0f67226513..34926d04b52 100644 --- a/transports/wasm-ext/Cargo.toml +++ b/transports/wasm-ext/Cargo.toml @@ -3,7 +3,7 @@ name = "libp2p-wasm-ext" edition = "2021" rust-version = "1.56.1" description = "Allows passing in an external transport in a WASM environment" -version = "0.34.0" +version = "0.35.0" authors = ["Pierre Krieger "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" @@ -13,7 +13,7 @@ categories = ["network-programming", "asynchronous"] [dependencies] futures = "0.3.1" js-sys = "0.3.50" -libp2p-core = { version = "0.34.0", path = "../../core", default-features = false } +libp2p-core = { version = "0.35.0", path = "../../core", default-features = false } parity-send-wrapper = "0.1.0" wasm-bindgen = "0.2.42" wasm-bindgen-futures = "0.4.4" diff --git a/transports/websocket/CHANGELOG.md b/transports/websocket/CHANGELOG.md index 3783fef6c44..1a46978cd3b 100644 --- a/transports/websocket/CHANGELOG.md +++ b/transports/websocket/CHANGELOG.md @@ -1,3 +1,7 @@ +# 0.37.0 [unreleased] + +- Update to `libp2p-core` `v0.35.0`. + # 0.36.0 - Update to `libp2p-core` `v0.34.0`. diff --git a/transports/websocket/Cargo.toml b/transports/websocket/Cargo.toml index 624fc0cbe4f..49121c4f22b 100644 --- a/transports/websocket/Cargo.toml +++ b/transports/websocket/Cargo.toml @@ -3,7 +3,7 @@ name = "libp2p-websocket" edition = "2021" rust-version = "1.56.1" description = "WebSocket transport for libp2p" -version = "0.36.0" +version = "0.37.0" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" @@ -14,7 +14,7 @@ categories = ["network-programming", "asynchronous"] futures-rustls = "0.22" either = "1.5.3" futures = "0.3.1" -libp2p-core = { version = "0.34.0", path = "../../core", default-features = false } +libp2p-core = { version = "0.35.0", path = "../../core", default-features = false } log = "0.4.8" parking_lot = "0.12.0" quicksink = "0.1"