Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

transports/quic: Adapt QuicMuxer to upstream StreamMuxer changes #6

Closed
wants to merge 97 commits into from
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
97 commits
Select commit Hold shift + click to select a range
7c8a977
swarm/src/handler: Document responsibility limiting inbound streams (…
mxinden Jul 14, 2022
d4f8ec2
misc/metrics: Track # connected nodes supporting specific protocol (…
mxinden Jul 15, 2022
1a553db
core/muxing: Flatten `StreamMuxer` interface to `poll_{inbound,outbou…
thomaseizinger Jul 18, 2022
e95232c
build(deps): Bump Swatinem/rust-cache from 1.4.0 to 2.0.0 (#2759)
dependabot[bot] Jul 19, 2022
66c2319
transports/tcp: Bump to v0.35.0 (#2760)
mxinden Jul 19, 2022
c8066df
*: Update to `if-watch` `1.1.1` (#2754)
tgmichel Jul 19, 2022
163c5c1
README.md: Add crates.io and docs.rs badges (#2766)
LesnyRumcajs Jul 21, 2022
51a8471
build(deps): Update prometheus-client requirement from 0.16.0 to 0.17…
dependabot[bot] Jul 22, 2022
f15a3dc
core/muxing: Drop `Unpin` requirement from `SubstreamBox` (#2762)
thomaseizinger Jul 22, 2022
2e2c117
core/tests: Remove unnecessary `Arc` (#2763)
thomaseizinger Jul 22, 2022
95713ab
core: fix PR number in changelog entry (#2769)
elenaf9 Jul 23, 2022
f85a990
core/tests: Remove unnecessary util module (#2764)
thomaseizinger Jul 25, 2022
c19a211
misc/metrics: fix clippy::assign-op-pattern (#2773)
elenaf9 Jul 25, 2022
0ec3bbc
core/muxing: Remove `Unpin` requirement from `StreamMuxer::Substream`…
thomaseizinger Jul 25, 2022
74f01e4
transports/tcp: fix clippy::from-over-into (#2774)
elenaf9 Jul 25, 2022
ce963df
core: fix clippy::op-ref, clippy::needless-borrow (#2770)
elenaf9 Jul 25, 2022
56c492c
core/muxing: Drop `Sync` requirement for `StreamMuxer` on `StreamMuxe…
thomaseizinger Jul 27, 2022
09c6908
protocols/dcutr: Fix clippy lints (#2772)
elenaf9 Jul 28, 2022
eaf3f3a
.cargo: Check all features in custom-clippy (#2771)
elenaf9 Jul 28, 2022
7019d49
Merge branch 'master' of github.com:libp2p/rust-libp2p into quic/mult…
elenaf9 Jul 31, 2022
2b9e212
examples/README.md: Fix tutorial link (#2790)
lukehinds Aug 2, 2022
028dece
core/muxing: Have functions on `StreamMuxer` take `Pin<&mut Self>` (#…
thomaseizinger Aug 3, 2022
07c0dba
Merge branch 'master' of github.com:libp2p/rust-libp2p into quic/muxer
elenaf9 Aug 3, 2022
57840a3
transports/quic: adapt QuicMuxer to libp2p#2724
elenaf9 Aug 3, 2022
579b1be
swarm-derive/: Generate OutEvent if not provided (#2792)
mxinden Aug 8, 2022
e2b83b7
SECURITY.md: Document supported releases and security mail addr (#2800)
mxinden Aug 8, 2022
3da8b42
README: Point to [email protected] (#2799)
mxinden Aug 8, 2022
1012579
protocols/: Remove passing default variant to `WithPeerId::condition`…
K0UR05H Aug 10, 2022
a4110a2
*: Remove `inject_connected` / `inject_disconnected` from docs (#2805)
K0UR05H Aug 10, 2022
0a01c81
misc/multistream-select: Replace msg.get(0) with msg.first() (#2816)
mxinden Aug 13, 2022
3ce0ef9
transports/quic: apply suggestions from review
elenaf9 Aug 13, 2022
3060d12
transports/quic: rename QuicMuxerInner -> Inner
elenaf9 Aug 13, 2022
63c6edc
transports/quic: improve poll_{inbound, outbound}
elenaf9 Aug 13, 2022
06aaea6
*: Fix `clippy::derive-partial-eq-without-eq` (#2818)
elenaf9 Aug 14, 2022
cef5056
core/muxing: Generalise `StreamMuxer::poll_address_change` to `poll` …
thomaseizinger Aug 16, 2022
0e5a25d
examples/file-sharing: Support binary files (#2786)
qidu Aug 16, 2022
878c49f
swarm/src/behaviour: Deprecate NetworkBehaviourEventProcess (#2784)
mxinden Aug 16, 2022
6a9fa3d
build(deps): Update prost requirement from 0.10 to 0.11 (#2788)
dependabot[bot] Aug 16, 2022
8dc0188
swarm/src/connection: Test max_negotiating_inbound_streams (#2785)
mxinden Aug 16, 2022
67266c6
swarm-derive/: Add where clause of behaviour to generated out event (…
mxinden Aug 17, 2022
d2c5053
build(deps): Update prometheus-client requirement from 0.17.0 to 0.18…
dependabot[bot] Aug 17, 2022
a2738fd
swarm-derive/: Derive Debug for generated OutEvent (#2821)
mxinden Aug 17, 2022
475289c
docs/coding-guidelines: Add document (#2780)
mxinden Aug 17, 2022
8931860
core/identity: Allow clippy::large-enum-variant on `Keypair` (#2827)
elenaf9 Aug 19, 2022
1aeaba3
Merge branch 'master' of github.com:libp2p/rust-libp2p into quic/muxer
elenaf9 Aug 19, 2022
95fc6da
transports/quic: drive connection in `QuicMuxer::poll`
elenaf9 Aug 19, 2022
3d3666e
*: Enforce no clippy warnings for examples (#2826)
thomaseizinger Aug 20, 2022
217dd2c
clippy.toml: Create config and disallow unbounded channels (#2823)
mxinden Aug 20, 2022
0d7c8a5
transports/quic: refactor `Connection::poll_event`
elenaf9 Aug 21, 2022
67b52aa
transports/quic: rm `Connection::is_handshaking`
elenaf9 Aug 21, 2022
66974fc
transports/quic: refactor connection closing
elenaf9 Aug 22, 2022
4253080
*: Prepare v0.47.0 (#2830)
mxinden Aug 22, 2022
c88efe8
transports/quic: rm mutex around to_endpoint tx
elenaf9 Aug 22, 2022
0a82be4
transports/quic/tests: drive peers concurrently
elenaf9 Aug 22, 2022
d610e4b
protocols/dcutr: Disable `libp2p-core` default features (#2836)
elenaf9 Aug 23, 2022
d92cab8
build(deps): Update p256 requirement from 0.10.0 to 0.11.0 (#2636)
dependabot[bot] Aug 23, 2022
ca07ce4
swarm/behaviour: Remove deprecated NetworkBehaviourEventProcess (#2840)
mxinden Aug 26, 2022
a3dec47
docs/coding-guidelines: Document limit on number of tasks (#2839)
mxinden Aug 26, 2022
247b553
swarm-derive/: Remove support for custom poll method (#2841)
mxinden Aug 28, 2022
6855ab9
swarm-derive/: Remove support for ignoring fields on struct (#2842)
mxinden Aug 29, 2022
e01f77b
transports/noise: Migrate away from deprecated `sodiumoxide` for test…
pinkforest Aug 30, 2022
f16561c
.github/workflows: Split advisory issues from PR workflows using `car…
pinkforest Aug 30, 2022
36a2773
*: Update changelogs for prost dep update (#2851)
divagant-martian Aug 30, 2022
89f898c
protocols/mdns: Allow users to choose between async-io and tokio runt…
gallegogt Sep 2, 2022
cee199a
protocols/kad: Support multiple protocol names (#2846)
dmitry-markin Sep 3, 2022
f04df29
.git-blame-ignore-revs/: Initialize and add rustfmt commit (#2864)
thomaseizinger Sep 4, 2022
b8c3b28
protocols/gossipsub: Allow publishing to anything that implements `In…
GamePad64 Sep 5, 2022
a40180c
.github/: Introduce interop tests (#2835)
laurentsenta Sep 7, 2022
8644c65
core/: Introduce `rsa` feature flag to avoid `ring` dependency (#2860)
GamePad64 Sep 7, 2022
2eca38c
core/upgrade/: Add `ReadyUpgrade` (#2855)
thomaseizinger Sep 7, 2022
d2eddf4
muxers/yamux: Remove `OpenSubstreamToken` (#2873)
thomaseizinger Sep 7, 2022
83c6795
*: Prepare v0.48.0 (#2869)
mxinden Sep 7, 2022
c650dc1
*: Replace _serde with dep:serde in Cargo.toml (#2868)
GamePad64 Sep 8, 2022
69caf98
Merge branch 'master' of github.com:libp2p/rust-libp2p into quic/muxer
elenaf9 Sep 9, 2022
fe3e09b
transports/quic: upgrade to if-watch v2.0.0
elenaf9 Sep 9, 2022
b6924db
transports/quic: fix clippy
elenaf9 Sep 9, 2022
689460f
transports/quic: fix smoke test
elenaf9 Sep 9, 2022
457fb51
transports/tcp: Simplify IfWatcher integration (#2813)
elenaf9 Sep 10, 2022
41d39fb
transports/quic: add `Endpoint::try_send`
elenaf9 Sep 10, 2022
66c2755
swarm/: Fix rare test failure of `multiple_addresses_err` (#2882)
thomaseizinger Sep 11, 2022
72bade1
build(deps): Update env_logger to 0.9 and criterion to 0.4 (#2896)
kpp Sep 14, 2022
5906140
protocols/kad: Remove deprecated `set_protocol_name()` (#2866)
dmitry-markin Sep 15, 2022
2c739e9
protocols/noise: Introduce `NoiseAuthenticated::xx` constructor with …
thomaseizinger Sep 16, 2022
c81b06a
*: Fix various clippy warnings (#2900)
umgefahren Sep 16, 2022
2025de3
swarm-derive/: Allow for templated behaviours (#2907)
thomaseizinger Sep 16, 2022
4c617a0
subscribe
elenaf9 Sep 17, 2022
4e027b1
transports/quic: handle substream being dropped
elenaf9 Sep 19, 2022
bdba780
transports/quic: return err on read after reset
elenaf9 Sep 19, 2022
40cb4f3
transports/quic: apply comments from code review
elenaf9 Sep 19, 2022
f8d1430
transports/quic: better naming, fix docs
elenaf9 Sep 20, 2022
4c3229b
transports/quic: add doc for `Endpoint:try_send`
elenaf9 Sep 20, 2022
e393fe5
transports/quic: add `ip_to_listenaddr`
elenaf9 Sep 20, 2022
d28db18
transports/quic: disable connection migration
elenaf9 Sep 20, 2022
42db0ed
transports/quic: minor fix
elenaf9 Sep 20, 2022
d46b72e
transports/quic: minor fixes
elenaf9 Sep 20, 2022
ec3c74a
transports/quic: rework forwarding of new connections
elenaf9 Sep 20, 2022
b7103aa
transports/quic: fix broken intra-doc link
elenaf9 Sep 20, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 15 additions & 12 deletions transports/quic/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ pub enum Error {
/// Endpoint has force-killed this connection because it was too busy.
#[error("Endpoint has force-killed our connection")]
ClosedChannel,
/// The background task driving the endpoint has crashed.
#[error("Background task crashed.")]
elenaf9 marked this conversation as resolved.
Show resolved Hide resolved
TaskCrashed,
/// Error in the inner state machine.
#[error("{0}")]
Quinn(#[from] quinn_proto::ConnectionError),
Expand Down Expand Up @@ -109,15 +112,15 @@ impl Connection {
/// Works for server connections only.
pub fn local_addr(&self) -> SocketAddr {
debug_assert_eq!(self.connection.side(), quinn_proto::Side::Server);
let endpoint_addr = self.endpoint.socket_addr;
let endpoint_addr = self.endpoint.socket_addr();
self.connection
.local_ip()
.map(|ip| SocketAddr::new(ip, endpoint_addr.port()))
.unwrap_or_else(|| {
// In a normal case scenario this should not happen, because
// we get want to get a local addr for a server connection only.
tracing::error!("trying to get quinn::local_ip for a client");
endpoint_addr
*endpoint_addr
})
}

Expand Down Expand Up @@ -214,17 +217,17 @@ impl Connection {
// However we don't deliver substream-related events to the user as long as
// `to_endpoint` is full. This should propagate the back-pressure of `to_endpoint`
// being full to the user.
if self.pending_to_endpoint.is_some() {
match self.endpoint.to_endpoint.poll_ready_unpin(cx) {
Poll::Ready(Ok(())) => {
let to_endpoint = self.pending_to_endpoint.take().expect("is some");
self.endpoint
.to_endpoint
.start_send(to_endpoint)
.expect("Channel is ready.");
if let Some(to_endpoint) = self.pending_to_endpoint.take() {
match self.endpoint.try_send(to_endpoint, cx) {
Ok(Ok(())) => {}
Ok(Err(to_endpoint)) => {
self.pending_to_endpoint = Some(to_endpoint);
return Poll::Pending;
}
Err(_) => {
tracing::error!("Background task crashed.");
return Poll::Ready(ConnectionEvent::ConnectionLost(Error::TaskCrashed));
elenaf9 marked this conversation as resolved.
Show resolved Hide resolved
}
Poll::Ready(Err(_)) => panic!("Background task crashed"),
Poll::Pending => return Poll::Pending,
}
}

Expand Down
28 changes: 24 additions & 4 deletions transports/quic/src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,10 @@
use crate::{connection::Connection, tls, transport};

use futures::{
channel::{mpsc, oneshot},
channel::{
mpsc::{self, SendError},
oneshot,
},
prelude::*,
};
use quinn_proto::{ClientConfig as QuinnClientConfig, ServerConfig as QuinnServerConfig};
Expand All @@ -40,7 +43,7 @@ use std::{
fmt,
net::{Ipv4Addr, Ipv6Addr, SocketAddr, UdpSocket},
sync::Arc,
task::{Poll, Waker},
task::{Context, Poll, Waker},
time::{Duration, Instant},
};

Expand Down Expand Up @@ -87,9 +90,9 @@ impl Config {
#[derive(Clone)]
pub struct Endpoint {
/// Channel to the background of the endpoint.
pub to_endpoint: mpsc::Sender<ToEndpoint>,
to_endpoint: mpsc::Sender<ToEndpoint>,

pub socket_addr: SocketAddr,
socket_addr: SocketAddr,
elenaf9 marked this conversation as resolved.
Show resolved Hide resolved
}

impl Endpoint {
Expand Down Expand Up @@ -142,6 +145,23 @@ impl Endpoint {

Ok(endpoint)
}

pub fn socket_addr(&self) -> &SocketAddr {
&self.socket_addr
}

pub fn try_send(
&mut self,
to_endpoint: ToEndpoint,
cx: &mut Context<'_>,
) -> Result<Result<(), ToEndpoint>, SendError> {
match self.to_endpoint.poll_ready_unpin(cx) {
Poll::Ready(Ok(())) => {}
Poll::Ready(Err(err)) => return Err(err),
Poll::Pending => return Ok(Err(to_endpoint)),
};
self.to_endpoint.start_send(to_endpoint).map(Ok)
elenaf9 marked this conversation as resolved.
Show resolved Hide resolved
}
}

/// Message sent to the endpoint background task.
Expand Down
66 changes: 32 additions & 34 deletions transports/quic/src/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use crate::endpoint::ToEndpoint;
use crate::Config;
use crate::{endpoint::Endpoint, muxer::QuicMuxer, upgrade::Upgrade};

use futures::channel::mpsc::SendError;
elenaf9 marked this conversation as resolved.
Show resolved Hide resolved
use futures::channel::oneshot;
use futures::ready;
use futures::stream::StreamExt;
Expand Down Expand Up @@ -140,7 +141,7 @@ impl Transport for QuicTransport {
.listeners
.iter_mut()
.filter(|l| {
let listen_addr = l.endpoint.socket_addr;
let listen_addr = l.endpoint.socket_addr();
listen_addr.is_ipv4() == socket_addr.is_ipv4()
&& listen_addr.ip().is_loopback() == socket_addr.ip().is_loopback()
})
Expand Down Expand Up @@ -177,7 +178,7 @@ impl Transport for QuicTransport {
Ok(async move {
let connection = rx
.await
.expect("background task has crashed")
.map_err(|_| Error::TaskCrashed)?
.map_err(Error::Reach)?;
let final_connec = Upgrade::from_connection(connection).await?;
Ok(final_connec)
Expand All @@ -201,10 +202,18 @@ impl Transport for QuicTransport {
cx: &mut Context<'_>,
) -> Poll<TransportEvent<Self::ListenerUpgrade, Self::Error>> {
if let Some(dialer) = self.ipv4_dialer.as_mut() {
dialer.drive_dials(cx)
if dialer.drive_dials(cx).is_err() {
// Background task of dialer crashed.
// Drop dialer and all pending dials so that the connection receiver is notified.
self.ipv4_dialer = None;
}
}
if let Some(dialer) = self.ipv6_dialer.as_mut() {
dialer.drive_dials(cx)
if dialer.drive_dials(cx).is_err() {
// Background task of dialer crashed.
// Drop dialer and all pending dials so that the connection receiver is notified.
self.ipv4_dialer = None;
}
}
match self.listeners.poll_next_unpin(cx) {
Poll::Ready(Some(ev)) => Poll::Ready(ev),
Expand All @@ -228,20 +237,18 @@ impl Dialer {
})
}

fn drive_dials(&mut self, cx: &mut Context<'_>) {
if !self.pending_dials.is_empty() {
match self.endpoint.to_endpoint.poll_ready_unpin(cx) {
Poll::Ready(Ok(())) => {
let to_endpoint = self.pending_dials.pop_front().expect("!is_empty");
self.endpoint
.to_endpoint
.start_send(to_endpoint)
.expect("Channel is ready.");
fn drive_dials(&mut self, cx: &mut Context<'_>) -> Result<(), SendError> {
if let Some(to_endpoint) = self.pending_dials.pop_front() {
match self.endpoint.try_send(to_endpoint, cx) {
Ok(Ok(())) => {}
Ok(Err(to_endpoint)) => self.pending_dials.push_front(to_endpoint),
Err(err) => {
tracing::error!("Background task of dialing endpoint crashed.");
return Err(err);
}
Poll::Ready(Err(_)) => panic!("Background task crashed."),
Poll::Pending => {}
}
}
Ok(())
}
}

Expand Down Expand Up @@ -282,7 +289,7 @@ impl Listener {
pending_event = None;
} else {
if_watcher = None;
let ma = socketaddr_to_multiaddr(&endpoint.socket_addr);
let ma = socketaddr_to_multiaddr(endpoint.socket_addr());
pending_event = Some(TransportEvent::NewAddress {
listener_id,
listen_addr: ma,
Expand Down Expand Up @@ -324,8 +331,8 @@ impl Listener {
match ready!(if_watcher.poll_if_event(cx)) {
Ok(IfEvent::Up(inet)) => {
let ip = inet.addr();
if self.endpoint.socket_addr.is_ipv4() == ip.is_ipv4() {
let socket_addr = SocketAddr::new(ip, self.endpoint.socket_addr.port());
if self.endpoint.socket_addr().is_ipv4() == ip.is_ipv4() {
elenaf9 marked this conversation as resolved.
Show resolved Hide resolved
let socket_addr = SocketAddr::new(ip, self.endpoint.socket_addr().port());
elenaf9 marked this conversation as resolved.
Show resolved Hide resolved
let ma = socketaddr_to_multiaddr(&socket_addr);
tracing::debug!("New listen address: {}", ma);
return Poll::Ready(TransportEvent::NewAddress {
Expand All @@ -336,8 +343,8 @@ impl Listener {
}
Ok(IfEvent::Down(inet)) => {
let ip = inet.addr();
if self.endpoint.socket_addr.is_ipv4() == ip.is_ipv4() {
let socket_addr = SocketAddr::new(ip, self.endpoint.socket_addr.port());
if self.endpoint.socket_addr().is_ipv4() == ip.is_ipv4() {
let socket_addr = SocketAddr::new(ip, self.endpoint.socket_addr().port());
let ma = socketaddr_to_multiaddr(&socket_addr);
tracing::debug!("Expired listen address: {}", ma);
return Poll::Ready(TransportEvent::AddressExpired {
Expand Down Expand Up @@ -371,23 +378,14 @@ impl Stream for Listener {
Poll::Ready(event) => return Poll::Ready(Some(event)),
Poll::Pending => {}
}
if !self.pending_dials.is_empty() {
match self.endpoint.to_endpoint.poll_ready_unpin(cx) {
Poll::Ready(Ok(_)) => {
let to_endpoint = self
.pending_dials
.pop_front()
.expect("Pending dials is not empty.");
self.endpoint
.to_endpoint
.start_send(to_endpoint)
.expect("Channel is ready");
}
Poll::Ready(Err(_)) => {
if let Some(to_endpoint) = self.pending_dials.pop_front() {
match self.endpoint.try_send(to_endpoint, cx) {
Ok(Ok(())) => {}
Ok(Err(to_endpoint)) => self.pending_dials.push_front(to_endpoint),
Err(_) => {
self.close(Err(Error::TaskCrashed));
continue;
}
Poll::Pending => {}
}
}
match self.new_connections_rx.poll_next_unpin(cx) {
Expand Down
7 changes: 7 additions & 0 deletions transports/quic/tests/smoke.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use anyhow::Result;
use async_trait::async_trait;
use futures::channel::oneshot;
use futures::future::{join, FutureExt};
use futures::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
use futures::select;
Expand Down Expand Up @@ -83,6 +84,8 @@ async fn smoke() -> Result<()> {

let b_id = *b.local_peer_id();

let (sync_tx, sync_rx) = oneshot::channel();

let fut_a = async move {
match a.next().await {
Some(SwarmEvent::IncomingConnection { .. }) => {}
Expand Down Expand Up @@ -133,6 +136,8 @@ async fn smoke() -> Result<()> {
e => panic!("{:?}", e),
}

sync_rx.await.unwrap();

a.disconnect_peer_id(b_id).unwrap();

match a.next().await {
Expand Down Expand Up @@ -188,6 +193,8 @@ async fn smoke() -> Result<()> {
e => panic!("{:?}", e),
}

sync_tx.send(()).unwrap();

match b.next().await {
Some(SwarmEvent::ConnectionClosed {
cause: Some(ConnectionError::IO(_)),
Expand Down