Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

transports/quic: Refactor listener handling #17

Closed
wants to merge 44 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
f134c8b
core/transport: remove Transport::Listener
elenaf9 May 15, 2022
aac9e1c
swarm: remove ListenerStream, poll Transport
elenaf9 May 15, 2022
68663fd
transports/tcp: handle transport changes
elenaf9 May 15, 2022
72c76f0
*: adapt majority of other transports
elenaf9 May 16, 2022
945c4a0
transports/tcp: rename *TcpConfig to *TcpTransport
elenaf9 May 21, 2022
d6f0e75
core/transport: adapt remaining transports
elenaf9 May 22, 2022
858590f
core/transports: unify imports, clean code
elenaf9 May 22, 2022
45f9c96
core/transport: split TransportEvent generics
elenaf9 May 22, 2022
bfd5fb0
core/transport: impl Stream for transport::Boxed
elenaf9 May 22, 2022
90721b9
transports/tcp: impl Stream for GenTcpTransport
elenaf9 May 22, 2022
80c3da1
*: format
elenaf9 May 26, 2022
bcb71a1
transports/dns: adapt dns transport
elenaf9 May 26, 2022
38a2b78
transports/dns: adapt websocket transport
elenaf9 May 26, 2022
607412d
Merge branch 'master' of github.com:libp2p/rust-libp2p into refactor-…
elenaf9 May 29, 2022
d7f5019
Remove various `Sync` bounds
thomaseizinger May 19, 2022
b4164e8
transports/tcp: revert Stream impl for GenTcpTransport
elenaf9 May 29, 2022
a7766cd
core/transport/memory: fix listener polling
elenaf9 May 29, 2022
9824acd
transports/uds: adapt uds transport
elenaf9 May 29, 2022
a2988b5
transports/tcp: impl Default for GenTcpTransport
elenaf9 May 29, 2022
2edb0cd
core/transport/memory: add MemoryTransport::new
elenaf9 May 29, 2022
907a5ab
protocols/*: adapt network behaviour protocols
elenaf9 May 29, 2022
1924c12
muxers/mplex: adapt tests and benches
elenaf9 May 29, 2022
8ba3c20
transports/*: adapt tests in upgrade transports
elenaf9 May 29, 2022
96ae97b
src/lib: adapt development transports
elenaf9 May 29, 2022
c686b5a
examples: adapt exmaples to tcp transport changes
elenaf9 May 29, 2022
354d2f0
*: add Transport::remove_listener
elenaf9 May 29, 2022
2226092
core/transport: create ListenerId within Transport
elenaf9 May 29, 2022
8f608cd
Merge branch 'master' of github.com:libp2p/rust-libp2p into refactor-…
elenaf9 May 29, 2022
5f9ebb7
*: fix CI
elenaf9 May 29, 2022
199ce12
Merge branch 'master' of github.com:libp2p/rust-libp2p into refactor-…
elenaf9 Jun 11, 2022
27ee9ca
transports/tcp: fix port-reuse tests
elenaf9 Jun 11, 2022
08f4f80
*: fix intra-doc links
elenaf9 Jun 11, 2022
f92c2a4
transports/tcp: rm unneeded trait-bounds in tests
elenaf9 Jun 11, 2022
a357d71
*: use random ListenerIds instead of namespaced
elenaf9 Jun 20, 2022
a4a745e
core/transport: remove (Partial)Ord for ListenerId
elenaf9 Jun 20, 2022
81c945c
Merge branch 'master' of github.com:libp2p/rust-libp2p into refactor-…
elenaf9 Jun 20, 2022
1c2b9e5
transports/relay: adapt ClientTransport
elenaf9 Jun 26, 2022
b19e11a
*: apply comments from review
elenaf9 Jun 26, 2022
c95c97c
transport/wasm-ext: adapt wasm-ext transport
elenaf9 Jun 26, 2022
58a85e5
Merge branch 'refactor-transport-trait' of github.com:elenaf9/rust-li…
elenaf9 Jun 27, 2022
b239de6
transports/quic: adapt to transport trait changes
elenaf9 Jun 27, 2022
3dfb453
transports/quic: support multiple listening endpoints
elenaf9 Jul 4, 2022
b278d31
transports/quic: re-use endpoints for dialing
elenaf9 Jul 4, 2022
4f7aed8
transports/quic: test endpoint re-use
elenaf9 Jul 4, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ default = [
"websocket",
"yamux",
]

autonat = ["dep:libp2p-autonat"]
dcutr = ["dep:libp2p-dcutr", "libp2p-metrics?/dcutr"]
deflate = ["dep:libp2p-deflate"]
Expand Down
19 changes: 0 additions & 19 deletions core/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,25 +43,6 @@ impl std::ops::Add<usize> for ConnectionId {
}
}

/// The ID of a single listener.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct ListenerId(u64);

impl ListenerId {
/// Creates a `ListenerId` from a non-negative integer.
pub fn new(id: u64) -> Self {
Self(id)
}
}

impl std::ops::Add<u64> for ListenerId {
type Output = Self;

fn add(self, other: u64) -> Self {
Self(self.0 + other)
}
}

/// The endpoint roles associated with a peer-to-peer communication channel.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub enum Endpoint {
Expand Down
106 changes: 45 additions & 61 deletions core/src/either.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

use crate::{
muxing::{StreamMuxer, StreamMuxerEvent},
transport::{ListenerEvent, Transport, TransportError},
transport::{ListenerId, Transport, TransportError, TransportEvent},
Multiaddr, ProtocolName,
};
use futures::{
Expand Down Expand Up @@ -353,48 +353,6 @@ pub enum EitherOutbound<A: StreamMuxer, B: StreamMuxer> {
B(B::OutboundSubstream),
}

/// Implements `Stream` and dispatches all method calls to either `First` or `Second`.
#[pin_project(project = EitherListenStreamProj)]
#[derive(Debug, Copy, Clone)]
#[must_use = "futures do nothing unless polled"]
pub enum EitherListenStream<A, B> {
First(#[pin] A),
Second(#[pin] B),
}

impl<AStream, BStream, AInner, BInner, AError, BError> Stream
for EitherListenStream<AStream, BStream>
where
AStream: TryStream<Ok = ListenerEvent<AInner, AError>, Error = AError>,
BStream: TryStream<Ok = ListenerEvent<BInner, BError>, Error = BError>,
{
type Item = Result<
ListenerEvent<EitherFuture<AInner, BInner>, EitherError<AError, BError>>,
EitherError<AError, BError>,
>;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match self.project() {
EitherListenStreamProj::First(a) => match TryStream::try_poll_next(a, cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(None) => Poll::Ready(None),
Poll::Ready(Some(Ok(le))) => Poll::Ready(Some(Ok(le
.map(EitherFuture::First)
.map_err(EitherError::A)))),
Poll::Ready(Some(Err(err))) => Poll::Ready(Some(Err(EitherError::A(err)))),
},
EitherListenStreamProj::Second(a) => match TryStream::try_poll_next(a, cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(None) => Poll::Ready(None),
Poll::Ready(Some(Ok(le))) => Poll::Ready(Some(Ok(le
.map(EitherFuture::Second)
.map_err(EitherError::B)))),
Poll::Ready(Some(Err(err))) => Poll::Ready(Some(Err(EitherError::B(err)))),
},
}
}
}

/// Implements `Future` and dispatches all method calls to either `First` or `Second`.
#[pin_project(project = EitherFutureProj)]
#[derive(Debug, Copy, Clone)]
Expand Down Expand Up @@ -464,11 +422,12 @@ impl<A: ProtocolName, B: ProtocolName> ProtocolName for EitherName<A, B> {
}
}
}

#[pin_project(project = EitherTransportProj)]
#[derive(Debug, Copy, Clone)]
#[must_use = "futures do nothing unless polled"]
pub enum EitherTransport<A, B> {
Left(A),
Right(B),
Left(#[pin] A),
Right(#[pin] B),
}

impl<A, B> Transport for EitherTransport<A, B>
Expand All @@ -478,29 +437,54 @@ where
{
type Output = EitherOutput<A::Output, B::Output>;
type Error = EitherError<A::Error, B::Error>;
type Listener = EitherListenStream<A::Listener, B::Listener>;
type ListenerUpgrade = EitherFuture<A::ListenerUpgrade, B::ListenerUpgrade>;
type Dial = EitherFuture<A::Dial, B::Dial>;

fn listen_on(
&mut self,
addr: Multiaddr,
) -> Result<Self::Listener, TransportError<Self::Error>> {
use TransportError::*;
match self {
EitherTransport::Left(a) => match a.listen_on(addr) {
Ok(listener) => Ok(EitherListenStream::First(listener)),
Err(MultiaddrNotSupported(addr)) => Err(MultiaddrNotSupported(addr)),
Err(Other(err)) => Err(Other(EitherError::A(err))),
fn poll(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<TransportEvent<Self::ListenerUpgrade, Self::Error>> {
match self.project() {
EitherTransportProj::Left(a) => match a.poll(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(event) => Poll::Ready(
event
.map_upgrade(EitherFuture::First)
.map_err(EitherError::A),
),
},
EitherTransport::Right(b) => match b.listen_on(addr) {
Ok(listener) => Ok(EitherListenStream::Second(listener)),
Err(MultiaddrNotSupported(addr)) => Err(MultiaddrNotSupported(addr)),
Err(Other(err)) => Err(Other(EitherError::B(err))),
EitherTransportProj::Right(b) => match b.poll(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(event) => Poll::Ready(
event
.map_upgrade(EitherFuture::Second)
.map_err(EitherError::B),
),
},
}
}

fn remove_listener(&mut self, id: ListenerId) -> bool {
match self {
EitherTransport::Left(t) => t.remove_listener(id),
EitherTransport::Right(t) => t.remove_listener(id),
}
}

fn listen_on(&mut self, addr: Multiaddr) -> Result<ListenerId, TransportError<Self::Error>> {
use TransportError::*;
match self {
EitherTransport::Left(a) => a.listen_on(addr).map_err(|e| match e {
MultiaddrNotSupported(addr) => MultiaddrNotSupported(addr),
Other(err) => Other(EitherError::A(err)),
}),
EitherTransport::Right(b) => b.listen_on(addr).map_err(|e| match e {
MultiaddrNotSupported(addr) => MultiaddrNotSupported(addr),
Other(err) => Other(EitherError::B(err)),
}),
}
}

fn dial(&mut self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
use TransportError::*;
match self {
Expand Down
Loading