Skip to content

Commit

Permalink
Merge branch 'master' of github.com:libp2p/rust-libp2p into quic/mult…
Browse files Browse the repository at this point in the history
…iple-endpoints-2
  • Loading branch information
elenaf9 committed Jul 31, 2022
2 parents bf06d96 + eaf3f3a commit 7019d49
Show file tree
Hide file tree
Showing 89 changed files with 641 additions and 915 deletions.
2 changes: 1 addition & 1 deletion .cargo/config.toml
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
[alias]
# Temporary solution to have clippy config in a single place until https://github.com/rust-lang/rust-clippy/blob/master/doc/roadmap-2021.md#lintstoml-configuration is shipped.
custom-clippy = "clippy -- -A clippy::type_complexity -A clippy::pedantic -D warnings"
custom-clippy = "clippy --all-features -- -A clippy::type_complexity -A clippy::pedantic -D warnings"
10 changes: 5 additions & 5 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ jobs:

- uses: actions/checkout@v3

- uses: Swatinem/rust-cache@cb2cf0cc7c5198d3364b9630e2c3d457f160790c # v1.4.0
- uses: Swatinem/rust-cache@6720f05bc48b77f96918929a9019fb2203ff71f8 # v2.0.0
with:
key: ${{ matrix.args }}

Expand Down Expand Up @@ -72,7 +72,7 @@ jobs:
- name: Install CMake
run: sudo apt-get install -y cmake

- uses: Swatinem/rust-cache@cb2cf0cc7c5198d3364b9630e2c3d457f160790c # v1.4.0
- uses: Swatinem/rust-cache@6720f05bc48b77f96918929a9019fb2203ff71f8 # v2.0.0
with:
key: ${{ matrix.toolchain }}

Expand All @@ -99,7 +99,7 @@ jobs:
toolchain: stable
override: true

- uses: Swatinem/rust-cache@cb2cf0cc7c5198d3364b9630e2c3d457f160790c # v1.4.0
- uses: Swatinem/rust-cache@6720f05bc48b77f96918929a9019fb2203ff71f8 # v2.0.0

- name: Check rustdoc links
run: RUSTDOCFLAGS="--deny broken_intra_doc_links" cargo doc --verbose --workspace --no-deps --document-private-items --all-features
Expand All @@ -122,7 +122,7 @@ jobs:
override: true
components: clippy

- uses: Swatinem/rust-cache@cb2cf0cc7c5198d3364b9630e2c3d457f160790c # v1.4.0
- uses: Swatinem/rust-cache@6720f05bc48b77f96918929a9019fb2203ff71f8 # v2.0.0

- name: Run cargo clippy
uses: actions-rs/cargo@844f36862e911db73fe0815f00a4a2602c279505 # v1.0.3
Expand All @@ -147,7 +147,7 @@ jobs:
toolchain: stable
override: true

- uses: Swatinem/rust-cache@cb2cf0cc7c5198d3364b9630e2c3d457f160790c # v1.4.0
- uses: Swatinem/rust-cache@6720f05bc48b77f96918929a9019fb2203ff71f8 # v2.0.0

- name: Run ipfs-kad example
run: RUST_LOG=libp2p_swarm=debug,libp2p_kad=trace,libp2p_tcp=debug cargo run --example ipfs-kad
Expand Down
22 changes: 11 additions & 11 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -80,38 +80,38 @@ instant = "0.1.11" # Explicit dependency to be used in `wasm-bindgen` feature
lazy_static = "1.2"

libp2p-autonat = { version = "0.6.0", path = "protocols/autonat", optional = true }
libp2p-core = { version = "0.34.0", path = "core", default-features = false }
libp2p-core = { version = "0.35.0", path = "core", default-features = false }
libp2p-dcutr = { version = "0.5.0", path = "protocols/dcutr", optional = true }
libp2p-floodsub = { version = "0.38.0", path = "protocols/floodsub", optional = true }
libp2p-identify = { version = "0.38.0", path = "protocols/identify", optional = true }
libp2p-kad = { version = "0.39.0", path = "protocols/kad", optional = true }
libp2p-metrics = { version = "0.8.0", path = "misc/metrics", optional = true }
libp2p-mplex = { version = "0.34.0", path = "muxers/mplex", optional = true }
libp2p-noise = { version = "0.37.0", path = "transports/noise", optional = true }
libp2p-mplex = { version = "0.35.0", path = "muxers/mplex", optional = true }
libp2p-noise = { version = "0.38.0", path = "transports/noise", optional = true }
libp2p-ping = { version = "0.38.0", path = "protocols/ping", optional = true }
libp2p-plaintext = { version = "0.34.0", path = "transports/plaintext", optional = true }
libp2p-plaintext = { version = "0.35.0", path = "transports/plaintext", optional = true }
libp2p-pnet = { version = "0.22.0", path = "transports/pnet", optional = true }
libp2p-relay = { version = "0.11.0", path = "protocols/relay", optional = true }
libp2p-rendezvous = { version = "0.8.0", path = "protocols/rendezvous", optional = true }
libp2p-request-response = { version = "0.20.0", path = "protocols/request-response", optional = true }
libp2p-swarm = { version = "0.38.0", path = "swarm" }
libp2p-swarm-derive = { version = "0.28.0", path = "swarm-derive" }
libp2p-uds = { version = "0.33.0", path = "transports/uds", optional = true }
libp2p-wasm-ext = { version = "0.34.0", path = "transports/wasm-ext", default-features = false, optional = true }
libp2p-yamux = { version = "0.38.0", path = "muxers/yamux", optional = true }
libp2p-uds = { version = "0.34.0", path = "transports/uds", optional = true }
libp2p-wasm-ext = { version = "0.35.0", path = "transports/wasm-ext", default-features = false, optional = true }
libp2p-yamux = { version = "0.39.0", path = "muxers/yamux", optional = true }
multiaddr = { version = "0.14.0" }
parking_lot = "0.12.0"
pin-project = "1.0.0"
rand = "0.7.3" # Explicit dependency to be used in `wasm-bindgen` feature
smallvec = "1.6.1"

[target.'cfg(not(any(target_os = "emscripten", target_os = "wasi", target_os = "unknown")))'.dependencies]
libp2p-deflate = { version = "0.34.0", path = "transports/deflate", optional = true }
libp2p-dns = { version = "0.34.0", path = "transports/dns", optional = true, default-features = false }
libp2p-deflate = { version = "0.35.0", path = "transports/deflate", optional = true }
libp2p-dns = { version = "0.35.0", path = "transports/dns", optional = true, default-features = false }
libp2p-mdns = { version = "0.39.0", path = "protocols/mdns", optional = true }
libp2p-quic = { version = "0.7.0", path = "transports/quic", optional = true }
libp2p-tcp = { version = "0.34.0", path = "transports/tcp", default-features = false, optional = true }
libp2p-websocket = { version = "0.36.0", path = "transports/websocket", optional = true }
libp2p-tcp = { version = "0.35.0", path = "transports/tcp", default-features = false, optional = true }
libp2p-websocket = { version = "0.37.0", path = "transports/websocket", optional = true }

[target.'cfg(not(target_os = "unknown"))'.dependencies]
libp2p-gossipsub = { version = "0.40.0", path = "protocols/gossipsub", optional = true }
Expand Down
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

<a href="http://libp2p.io/"><img src="https://img.shields.io/badge/project-libp2p-yellow.svg?style=flat-square" /></a>
[![dependency status](https://deps.rs/repo/github/libp2p/rust-libp2p/status.svg?style=flat-square)](https://deps.rs/repo/github/libp2p/rust-libp2p)
[![Crates.io](https://img.shields.io/crates/v/libp2p.svg)](https://crates.io/crates/libp2p)
[![docs.rs](https://img.shields.io/badge/api-rustdoc-blue.svg)](https://docs.rs/libp2p)

This repository is the central place for Rust development of the [libp2p](https://libp2p.io) spec.

Expand Down
13 changes: 12 additions & 1 deletion core/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,17 @@
# 0.35.0 [unreleased]

- Remove `StreamMuxer::poll_event` in favor of individual functions: `poll_inbound`, `poll_outbound`
and `poll_address_change`. Consequently, `StreamMuxerEvent` is also removed. See [PR 2724].
- Drop `Unpin` requirement from `SubstreamBox`. See [PR 2762] and [PR 2776].
- Drop `Sync` requirement on `StreamMuxer` for constructing `StreamMuxerBox`. See [PR 2775].

[PR 2724]: https://github.com/libp2p/rust-libp2p/pull/2724
[PR 2762]: https://github.com/libp2p/rust-libp2p/pull/2762
[PR 2775]: https://github.com/libp2p/rust-libp2p/pull/2775
[PR 2776]: https://github.com/libp2p/rust-libp2p/pull/2776

# 0.34.0

- Introduce `StreamMuxerEvent::map_inbound_stream`. See [PR 2691].
- Remove `{read,write,flush,shutdown,destroy}_substream` functions from `StreamMuxer` trait
in favor of forcing `StreamMuxer::Substream` to implement `AsyncRead + AsyncWrite`. See [PR 2707].
- Replace `Into<std::io::Error>` bound on `StreamMuxer::Error` with `std::error::Error`. See [PR 2710].
Expand Down
2 changes: 1 addition & 1 deletion core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ name = "libp2p-core"
edition = "2021"
rust-version = "1.56.1"
description = "Core traits and structs of libp2p"
version = "0.34.0"
version = "0.35.0"
authors = ["Parity Technologies <[email protected]>"]
license = "MIT"
repository = "https://github.com/libp2p/rust-libp2p"
Expand Down
65 changes: 18 additions & 47 deletions core/src/either.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
// DEALINGS IN THE SOFTWARE.

use crate::{
muxing::{StreamMuxer, StreamMuxerEvent},
muxing::StreamMuxer,
transport::{ListenerId, Transport, TransportError, TransportEvent},
Multiaddr, ProtocolName,
};
Expand Down Expand Up @@ -202,60 +202,38 @@ where
B: StreamMuxer,
{
type Substream = EitherOutput<A::Substream, B::Substream>;
type OutboundSubstream = EitherOutbound<A, B>;
type Error = EitherError<A::Error, B::Error>;

fn poll_event(
&self,
cx: &mut Context<'_>,
) -> Poll<Result<StreamMuxerEvent<Self::Substream>, Self::Error>> {
fn poll_inbound(&self, cx: &mut Context<'_>) -> Poll<Result<Self::Substream, Self::Error>> {
match self {
EitherOutput::First(inner) => inner
.poll_event(cx)
.map_err(EitherError::A)
.map_ok(|event| event.map_inbound_stream(EitherOutput::First)),
.poll_inbound(cx)
.map_ok(EitherOutput::First)
.map_err(EitherError::A),
EitherOutput::Second(inner) => inner
.poll_event(cx)
.map_err(EitherError::B)
.map_ok(|event| event.map_inbound_stream(EitherOutput::Second)),
.poll_inbound(cx)
.map_ok(EitherOutput::Second)
.map_err(EitherError::B),
}
}

fn open_outbound(&self) -> Self::OutboundSubstream {
fn poll_outbound(&self, cx: &mut Context<'_>) -> Poll<Result<Self::Substream, Self::Error>> {
match self {
EitherOutput::First(inner) => EitherOutbound::A(inner.open_outbound()),
EitherOutput::Second(inner) => EitherOutbound::B(inner.open_outbound()),
}
}

fn poll_outbound(
&self,
cx: &mut Context<'_>,
substream: &mut Self::OutboundSubstream,
) -> Poll<Result<Self::Substream, Self::Error>> {
match (self, substream) {
(EitherOutput::First(ref inner), EitherOutbound::A(ref mut substream)) => inner
.poll_outbound(cx, substream)
.map(|p| p.map(EitherOutput::First))
EitherOutput::First(inner) => inner
.poll_outbound(cx)
.map_ok(EitherOutput::First)
.map_err(EitherError::A),
(EitherOutput::Second(ref inner), EitherOutbound::B(ref mut substream)) => inner
.poll_outbound(cx, substream)
.map(|p| p.map(EitherOutput::Second))
EitherOutput::Second(inner) => inner
.poll_outbound(cx)
.map_ok(EitherOutput::Second)
.map_err(EitherError::B),
_ => panic!("Wrong API usage"),
}
}

fn destroy_outbound(&self, substream: Self::OutboundSubstream) {
fn poll_address_change(&self, cx: &mut Context<'_>) -> Poll<Result<Multiaddr, Self::Error>> {
match self {
EitherOutput::First(inner) => match substream {
EitherOutbound::A(substream) => inner.destroy_outbound(substream),
_ => panic!("Wrong API usage"),
},
EitherOutput::Second(inner) => match substream {
EitherOutbound::B(substream) => inner.destroy_outbound(substream),
_ => panic!("Wrong API usage"),
},
EitherOutput::First(inner) => inner.poll_address_change(cx).map_err(EitherError::A),
EitherOutput::Second(inner) => inner.poll_address_change(cx).map_err(EitherError::B),
}
}

Expand All @@ -267,13 +245,6 @@ where
}
}

#[derive(Debug, Copy, Clone)]
#[must_use = "futures do nothing unless polled"]
pub enum EitherOutbound<A: StreamMuxer, B: StreamMuxer> {
A(A::OutboundSubstream),
B(B::OutboundSubstream),
}

/// Implements `Future` and dispatches all method calls to either `First` or `Second`.
#[pin_project(project = EitherFutureProj)]
#[derive(Debug, Copy, Clone)]
Expand Down
6 changes: 3 additions & 3 deletions core/src/identity/ecdsa.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ impl PublicKey {
let buf = Self::del_asn1_header(k).ok_or_else(|| {
DecodingError::new("failed to parse asn.1 encoded ecdsa p256 public key")
})?;
Self::from_bytes(&buf)
Self::from_bytes(buf)
}

// ecPublicKey (ANSI X9.62 public key type) OID: 1.2.840.10045.2.1
Expand Down Expand Up @@ -198,8 +198,8 @@ impl PublicKey {
if asn1_head[0] != 0x30
|| asn1_head[2] != 0x30
|| asn1_head[3] as usize != oids_len
|| &oids_buf[..Self::EC_PUBLIC_KEY_OID.len()] != &Self::EC_PUBLIC_KEY_OID
|| &oids_buf[Self::EC_PUBLIC_KEY_OID.len()..] != &Self::SECP_256_R1_OID
|| oids_buf[..Self::EC_PUBLIC_KEY_OID.len()] != Self::EC_PUBLIC_KEY_OID
|| oids_buf[Self::EC_PUBLIC_KEY_OID.len()..] != Self::SECP_256_R1_OID
|| bitstr_head[0] != 0x03
|| bitstr_head[2] != 0x00
{
Expand Down
90 changes: 9 additions & 81 deletions core/src/muxing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,62 +63,25 @@ mod singleton;
/// Provides multiplexing for a connection by allowing users to open substreams.
///
/// A substream created by a [`StreamMuxer`] is a type that implements [`AsyncRead`] and [`AsyncWrite`].
///
/// Inbound substreams are reported via [`StreamMuxer::poll_event`].
/// Outbound substreams can be opened via [`StreamMuxer::open_outbound`] and subsequent polling via
/// [`StreamMuxer::poll_outbound`].
/// The [`StreamMuxer`] itself is modelled closely after [`AsyncWrite`]. It features `poll`-style
/// functions that allow the implementation to make progress on various tasks.
pub trait StreamMuxer {
/// Type of the object that represents the raw substream where data can be read and written.
type Substream: AsyncRead + AsyncWrite;

/// Future that will be resolved when the outgoing substream is open.
type OutboundSubstream;

/// Error type of the muxer
type Error: std::error::Error;

/// Polls for a connection-wide event.
///
/// This function behaves the same as a `Stream`.
///
/// If `Pending` is returned, then the current task will be notified once the muxer
/// is ready to be polled, similar to the API of `Stream::poll()`.
/// Only the latest task that was used to call this method may be notified.
///
/// It is permissible and common to use this method to perform background
/// work, such as processing incoming packets and polling timers.
///
/// An error can be generated if the connection has been closed.
fn poll_event(
&self,
cx: &mut Context<'_>,
) -> Poll<Result<StreamMuxerEvent<Self::Substream>, Self::Error>>;
/// Poll for new inbound substreams.
fn poll_inbound(&self, cx: &mut Context<'_>) -> Poll<Result<Self::Substream, Self::Error>>;

/// Opens a new outgoing substream, and produces the equivalent to a future that will be
/// resolved when it becomes available.
///
/// The API of `OutboundSubstream` is totally opaque, and the object can only be interfaced
/// through the methods on the `StreamMuxer` trait.
fn open_outbound(&self) -> Self::OutboundSubstream;
/// Poll for a new, outbound substream.
fn poll_outbound(&self, cx: &mut Context<'_>) -> Poll<Result<Self::Substream, Self::Error>>;

/// Polls the outbound substream.
///
/// If `Pending` is returned, then the current task will be notified once the substream
/// is ready to be polled, similar to the API of `Future::poll()`.
/// However, for each individual outbound substream, only the latest task that was used to
/// call this method may be notified.
/// Poll for an address change of the underlying connection.
///
/// May panic or produce an undefined result if an earlier polling of the same substream
/// returned `Ready` or `Err`.
fn poll_outbound(
&self,
cx: &mut Context<'_>,
s: &mut Self::OutboundSubstream,
) -> Poll<Result<Self::Substream, Self::Error>>;

/// Destroys an outbound substream future. Use this after the outbound substream has finished,
/// or if you want to interrupt it.
fn destroy_outbound(&self, s: Self::OutboundSubstream);
/// Not all implementations may support this feature.
fn poll_address_change(&self, cx: &mut Context<'_>) -> Poll<Result<Multiaddr, Self::Error>>;

/// Closes this `StreamMuxer`.
///
Expand All @@ -132,38 +95,3 @@ pub trait StreamMuxer {
/// > immediately dropping the muxer.
fn poll_close(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;
}

/// Event about a connection, reported by an implementation of [`StreamMuxer`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum StreamMuxerEvent<T> {
/// Remote has opened a new substream. Contains the substream in question.
InboundSubstream(T),

/// Address to the remote has changed. The previous one is now obsolete.
///
/// > **Note**: This can for example happen when using the QUIC protocol, where the two nodes
/// > can change their IP address while retaining the same QUIC connection.
AddressChange(Multiaddr),
}

impl<T> StreamMuxerEvent<T> {
/// If `self` is a [`StreamMuxerEvent::InboundSubstream`], returns the content. Otherwise
/// returns `None`.
pub fn into_inbound_substream(self) -> Option<T> {
if let StreamMuxerEvent::InboundSubstream(s) = self {
Some(s)
} else {
None
}
}

/// Map the stream within [`StreamMuxerEvent::InboundSubstream`] to a new type.
pub fn map_inbound_stream<O>(self, map: impl FnOnce(T) -> O) -> StreamMuxerEvent<O> {
match self {
StreamMuxerEvent::InboundSubstream(stream) => {
StreamMuxerEvent::InboundSubstream(map(stream))
}
StreamMuxerEvent::AddressChange(addr) => StreamMuxerEvent::AddressChange(addr),
}
}
}
Loading

0 comments on commit 7019d49

Please sign in to comment.