Skip to content

Commit

Permalink
Add transmission and retransmission logic
Browse files Browse the repository at this point in the history
  • Loading branch information
divagant-martian committed Nov 21, 2024
1 parent 2071a7c commit d9c00fb
Show file tree
Hide file tree
Showing 5 changed files with 185 additions and 6 deletions.
122 changes: 118 additions & 4 deletions quinn-proto/src/connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,12 @@ pub struct Connection {
/// no outgoing application data.
app_limited: bool,

//
// ObservedAddr
//
/// Sequence number for the next observed address frame sent to the peer.
next_observed_addr_seq_no: VarInt,

streams: StreamsState,
/// Surplus remote CIDs for future use on new paths
rem_cids: CidQueue,
Expand Down Expand Up @@ -345,6 +351,8 @@ impl Connection {
receiving_ecn: false,
total_authed_packets: 0,

next_observed_addr_seq_no: 0u32.into(),

streams: StreamsState::new(
side,
config.max_concurrent_uni_streams,
Expand Down Expand Up @@ -2638,6 +2646,9 @@ impl Connection {
let mut close = None;
let payload_len = payload.len();
let mut ack_eliciting = false;
// if this packet triggers a path migration and includes a observed address frame, it's
// stored here
let mut migration_observed_addr = None;
for result in frame::Iter::new(payload)? {
let frame = result?;
let span = match frame {
Expand Down Expand Up @@ -2910,7 +2921,33 @@ impl Connection {
self.discard_space(now, SpaceId::Handshake);
}
}
Frame::ObservedAddr(_observed) => {}
Frame::ObservedAddr(observed) => {
// check if params allows the peer to send report and this node to receive it
if !self
.peer_params
.address_discovery_role
.should_report(&self.config.address_discovery_role)
{
return Err(TransportError::PROTOCOL_VIOLATION(
"received OBSERVED_ADDRESS frame when not negotiated",
));
}
// must only be sent in data space
if packet.header.space() != SpaceId::Data {
return Err(TransportError::PROTOCOL_VIOLATION(
"OBSERVED_ADDRESS frame outside data space",
));
}

if remote == self.path.remote {
if let Some(updated) = self.path.update_observed_addr_report(observed) {
self.events.push_back(Event::ObservedAddr(updated));
}
} else {
// include in migration
migration_observed_addr = Some(observed)
}
}
}
}

Expand Down Expand Up @@ -2947,7 +2984,7 @@ impl Connection {
.migration,
"migration-initiating packets should have been dropped immediately"
);
self.migrate(now, remote);
self.migrate(now, remote, migration_observed_addr);
// Break linkability, if possible
self.update_rem_cid();
self.spin = false;
Expand All @@ -2956,7 +2993,7 @@ impl Connection {
Ok(())
}

fn migrate(&mut self, now: Instant, remote: SocketAddr) {
fn migrate(&mut self, now: Instant, remote: SocketAddr, observed_addr: Option<ObservedAddr>) {
trace!(%remote, "migration initiated");
// Reset rtt/congestion state for new path unless it looks like a NAT rebinding.
// Note that the congestion window will not grow until validation terminates. Helps mitigate
Expand All @@ -2976,6 +3013,12 @@ impl Connection {
&self.config,
)
};
new_path.last_observed_addr_report = self.path.last_observed_addr_report.clone();
if let Some(report) = observed_addr {
if let Some(updated) = new_path.update_observed_addr_report(report) {
self.events.push_back(Event::ObservedAddr(updated));
}
}
new_path.challenge = Some(self.rng.gen());
new_path.challenge_pending = true;
let prev_pto = self.pto(SpaceId::Data);
Expand Down Expand Up @@ -3060,6 +3103,53 @@ impl Connection {
self.stats.frame_tx.handshake_done.saturating_add(1);
}

// OBSERVED_ADDR
let mut send_observed_address =
|space_id: SpaceId,
buf: &mut Vec<u8>,
max_size: usize,
space: &mut PacketSpace,
sent: &mut SentFrames,
stats: &mut ConnectionStats,
skip_sent_check: bool| {
// should only be sent within Data space and only if allowed by extension
// negotiation
// send is also skipped if the path has already sent an observed address
let send_allowed = self
.config
.address_discovery_role
.should_report(&self.peer_params.address_discovery_role);
let send_required =
space.pending.observed_addr || !self.path.observed_addr_sent || skip_sent_check;
if space_id != SpaceId::Data || !send_allowed || !send_required {
return;
}

let observed =
frame::ObservedAddr::new(self.path.remote, self.next_observed_addr_seq_no);

if buf.len() + observed.size() < max_size {
observed.write(buf);

self.next_observed_addr_seq_no =
self.next_observed_addr_seq_no.saturating_add(1u8);
self.path.observed_addr_sent = true;

stats.frame_tx.observed_addr += 1;
sent.retransmits.get_or_create().observed_addr = true;
space.pending.observed_addr = false;
}
};
send_observed_address(
space_id,
buf,
max_size,
space,
&mut sent,
&mut self.stats,
false,
);

// PING
if mem::replace(&mut space.ping_pending, false) {
trace!("PING");
Expand Down Expand Up @@ -3129,7 +3219,16 @@ impl Connection {
trace!("PATH_CHALLENGE {:08x}", token);
buf.write(frame::FrameType::PATH_CHALLENGE);
buf.write(token);
self.stats.frame_tx.path_challenge += 1;

send_observed_address(
space_id,
buf,
max_size,
space,
&mut sent,
&mut self.stats,
true,
);
}
}

Expand All @@ -3142,6 +3241,19 @@ impl Connection {
buf.write(frame::FrameType::PATH_RESPONSE);
buf.write(token);
self.stats.frame_tx.path_response += 1;

// NOTE: this is technically not required but might be useful to ride the
// request/response nature of path challenges to refresh an observation
// Since PATH_RESPONSE is a probing frame, this is allowed by the spec.
send_observed_address(
space_id,
buf,
max_size,
space,
&mut sent,
&mut self.stats,
true,
);
}
}

Expand Down Expand Up @@ -3762,6 +3874,8 @@ pub enum Event {
DatagramReceived,
/// One or more application datagrams have been sent after blocking
DatagramsUnblocked,
/// Received an observation of our external address from the peer.
ObservedAddr(SocketAddr),
}

fn instant_saturating_sub(x: Instant, y: Instant) -> Duration {
Expand Down
48 changes: 47 additions & 1 deletion quinn-proto/src/connection/paths.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@ use super::{
pacing::Pacer,
spaces::{PacketSpace, SentPacket},
};
use crate::{congestion, packet::SpaceId, Duration, Instant, TransportConfig, TIMER_GRANULARITY};
use crate::{
congestion, frame::ObservedAddr, packet::SpaceId, Duration, Instant, TransportConfig,
TIMER_GRANULARITY,
};

/// Description of a particular network path
pub(super) struct PathData {
Expand Down Expand Up @@ -37,6 +40,11 @@ pub(super) struct PathData {
/// Used in persistent congestion determination.
pub(super) first_packet_after_rtt_sample: Option<(SpaceId, u64)>,
pub(super) in_flight: InFlight,
/// Whether this path has had it's remote address reported back to the peer. This only happens
/// if both peers agree to so based on their transport parameters.
pub(super) observed_addr_sent: bool,
/// Observed address frame with the largest sequence number received from the peer on this path.
pub(super) last_observed_addr_report: Option<ObservedAddr>,
/// Number of the first packet sent on this path
///
/// Used to determine whether a packet was sent on an earlier path. Insufficient to determine if
Expand Down Expand Up @@ -90,10 +98,15 @@ impl PathData {
),
first_packet_after_rtt_sample: None,
in_flight: InFlight::new(),
observed_addr_sent: false,
last_observed_addr_report: None,
first_packet: None,
}
}

/// Create a new path from a previous one.
///
/// This should only be called when migrating paths.
pub(super) fn from_previous(remote: SocketAddr, prev: &Self, now: Instant) -> Self {
let congestion = prev.congestion.clone_box();
let smoothed_rtt = prev.rtt.get();
Expand All @@ -111,6 +124,8 @@ impl PathData {
mtud: prev.mtud.clone(),
first_packet_after_rtt_sample: prev.first_packet_after_rtt_sample,
in_flight: InFlight::new(),
observed_addr_sent: false,
last_observed_addr_report: None,
first_packet: None,
}
}
Expand Down Expand Up @@ -156,6 +171,37 @@ impl PathData {
self.in_flight.remove(packet);
true
}

/// Updates the last observed address report received on this path.
///
/// If the address was updated, it's returned to be informed to the application.
#[must_use = "updated observed address must be reported to the application"]
pub(super) fn update_observed_addr_report(
&mut self,
observed: ObservedAddr,
) -> Option<SocketAddr> {
match self.last_observed_addr_report.as_mut() {
Some(prev) => {
if prev.seq_no >= observed.seq_no {
// frames that do not increase the sequence number on this path are ignored
None
} else if prev.ip == observed.ip && prev.port == observed.port {
// keep track of the last seq_no but do not report the address as updated
prev.seq_no = observed.seq_no;
None
} else {
let addr = observed.socket_addr();
self.last_observed_addr_report = Some(observed);
Some(addr)
}
}
None => {
let addr = observed.socket_addr();
self.last_observed_addr_report = Some(observed);
Some(addr)
}
}
}
}

/// RTT estimation for a particular network path
Expand Down
3 changes: 3 additions & 0 deletions quinn-proto/src/connection/spaces.rs
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,7 @@ pub struct Retransmits {
pub(super) retire_cids: Vec<u64>,
pub(super) ack_frequency: bool,
pub(super) handshake_done: bool,
pub(super) observed_addr: bool,
}

impl Retransmits {
Expand All @@ -326,6 +327,7 @@ impl Retransmits {
&& self.retire_cids.is_empty()
&& !self.ack_frequency
&& !self.handshake_done
&& !self.observed_addr
}
}

Expand All @@ -347,6 +349,7 @@ impl ::std::ops::BitOrAssign for Retransmits {
self.retire_cids.extend(rhs.retire_cids);
self.ack_frequency |= rhs.ack_frequency;
self.handshake_done |= rhs.handshake_done;
self.observed_addr |= rhs.observed_addr;
}
}

Expand Down
1 change: 1 addition & 0 deletions quinn/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ tokio = { workspace = true, features = ["rt", "rt-multi-thread", "time", "macros
tracing-subscriber = { workspace = true }
tracing-futures = { workspace = true }
url = { workspace = true }
tokio-stream = "0.1.15"

[[example]]
name = "server"
Expand Down
17 changes: 16 additions & 1 deletion quinn/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use bytes::Bytes;
use pin_project_lite::pin_project;
use rustc_hash::FxHashMap;
use thiserror::Error;
use tokio::sync::{futures::Notified, mpsc, oneshot, Notify};
use tokio::sync::{futures::Notified, mpsc, oneshot, watch, Notify};
use tracing::{debug_span, Instrument, Span};

use crate::{
Expand Down Expand Up @@ -636,6 +636,12 @@ impl Connection {
// May need to send MAX_STREAMS to make progress
conn.wake();
}

/// Track changed on our external address as reported by the peer.
pub fn observed_external_addr(&self) -> watch::Receiver<Option<SocketAddr>> {
let conn = self.0.state.lock("external_addr");
conn.observed_external_addr.subscribe()
}
}

pin_project! {
Expand Down Expand Up @@ -892,6 +898,7 @@ impl ConnectionRef {
runtime,
send_buffer: Vec::new(),
buffered_transmit: None,
observed_external_addr: watch::Sender::new(None),
}),
shared: Shared::default(),
}))
Expand Down Expand Up @@ -974,6 +981,8 @@ pub(crate) struct State {
send_buffer: Vec<u8>,
/// We buffer a transmit when the underlying I/O would block
buffered_transmit: Option<proto::Transmit>,
/// Our last external address reported by the peer.
pub(crate) observed_external_addr: watch::Sender<Option<SocketAddr>>,
}

impl State {
Expand Down Expand Up @@ -1131,6 +1140,12 @@ impl State {
wake_stream(id, &mut self.stopped);
wake_stream(id, &mut self.blocked_writers);
}
ObservedAddr(observed) => {
self.observed_external_addr.send_if_modified(|addr| {
let old = addr.replace(observed);
old != *addr
});
}
}
}
}
Expand Down

0 comments on commit d9c00fb

Please sign in to comment.