From d9c00fb161bd22c8ff61ffa09e8937abc5c9ccf9 Mon Sep 17 00:00:00 2001 From: Diva M Date: Thu, 21 Nov 2024 12:23:28 -0500 Subject: [PATCH] Add transmission and retransmission logic --- quinn-proto/src/connection/mod.rs | 122 ++++++++++++++++++++++++++- quinn-proto/src/connection/paths.rs | 48 ++++++++++- quinn-proto/src/connection/spaces.rs | 3 + quinn/Cargo.toml | 1 + quinn/src/connection.rs | 17 +++- 5 files changed, 185 insertions(+), 6 deletions(-) diff --git a/quinn-proto/src/connection/mod.rs b/quinn-proto/src/connection/mod.rs index 4a8e6b2f4..a29d25932 100644 --- a/quinn-proto/src/connection/mod.rs +++ b/quinn-proto/src/connection/mod.rs @@ -226,6 +226,12 @@ pub struct Connection { /// no outgoing application data. app_limited: bool, + // + // ObservedAddr + // + /// Sequence number for the next observed address frame sent to the peer. + next_observed_addr_seq_no: VarInt, + streams: StreamsState, /// Surplus remote CIDs for future use on new paths rem_cids: CidQueue, @@ -345,6 +351,8 @@ impl Connection { receiving_ecn: false, total_authed_packets: 0, + next_observed_addr_seq_no: 0u32.into(), + streams: StreamsState::new( side, config.max_concurrent_uni_streams, @@ -2638,6 +2646,9 @@ impl Connection { let mut close = None; let payload_len = payload.len(); let mut ack_eliciting = false; + // if this packet triggers a path migration and includes a observed address frame, it's + // stored here + let mut migration_observed_addr = None; for result in frame::Iter::new(payload)? { let frame = result?; let span = match frame { @@ -2910,7 +2921,33 @@ impl Connection { self.discard_space(now, SpaceId::Handshake); } } - Frame::ObservedAddr(_observed) => {} + Frame::ObservedAddr(observed) => { + // check if params allows the peer to send report and this node to receive it + if !self + .peer_params + .address_discovery_role + .should_report(&self.config.address_discovery_role) + { + return Err(TransportError::PROTOCOL_VIOLATION( + "received OBSERVED_ADDRESS frame when not negotiated", + )); + } + // must only be sent in data space + if packet.header.space() != SpaceId::Data { + return Err(TransportError::PROTOCOL_VIOLATION( + "OBSERVED_ADDRESS frame outside data space", + )); + } + + if remote == self.path.remote { + if let Some(updated) = self.path.update_observed_addr_report(observed) { + self.events.push_back(Event::ObservedAddr(updated)); + } + } else { + // include in migration + migration_observed_addr = Some(observed) + } + } } } @@ -2947,7 +2984,7 @@ impl Connection { .migration, "migration-initiating packets should have been dropped immediately" ); - self.migrate(now, remote); + self.migrate(now, remote, migration_observed_addr); // Break linkability, if possible self.update_rem_cid(); self.spin = false; @@ -2956,7 +2993,7 @@ impl Connection { Ok(()) } - fn migrate(&mut self, now: Instant, remote: SocketAddr) { + fn migrate(&mut self, now: Instant, remote: SocketAddr, observed_addr: Option) { trace!(%remote, "migration initiated"); // Reset rtt/congestion state for new path unless it looks like a NAT rebinding. // Note that the congestion window will not grow until validation terminates. Helps mitigate @@ -2976,6 +3013,12 @@ impl Connection { &self.config, ) }; + new_path.last_observed_addr_report = self.path.last_observed_addr_report.clone(); + if let Some(report) = observed_addr { + if let Some(updated) = new_path.update_observed_addr_report(report) { + self.events.push_back(Event::ObservedAddr(updated)); + } + } new_path.challenge = Some(self.rng.gen()); new_path.challenge_pending = true; let prev_pto = self.pto(SpaceId::Data); @@ -3060,6 +3103,53 @@ impl Connection { self.stats.frame_tx.handshake_done.saturating_add(1); } + // OBSERVED_ADDR + let mut send_observed_address = + |space_id: SpaceId, + buf: &mut Vec, + max_size: usize, + space: &mut PacketSpace, + sent: &mut SentFrames, + stats: &mut ConnectionStats, + skip_sent_check: bool| { + // should only be sent within Data space and only if allowed by extension + // negotiation + // send is also skipped if the path has already sent an observed address + let send_allowed = self + .config + .address_discovery_role + .should_report(&self.peer_params.address_discovery_role); + let send_required = + space.pending.observed_addr || !self.path.observed_addr_sent || skip_sent_check; + if space_id != SpaceId::Data || !send_allowed || !send_required { + return; + } + + let observed = + frame::ObservedAddr::new(self.path.remote, self.next_observed_addr_seq_no); + + if buf.len() + observed.size() < max_size { + observed.write(buf); + + self.next_observed_addr_seq_no = + self.next_observed_addr_seq_no.saturating_add(1u8); + self.path.observed_addr_sent = true; + + stats.frame_tx.observed_addr += 1; + sent.retransmits.get_or_create().observed_addr = true; + space.pending.observed_addr = false; + } + }; + send_observed_address( + space_id, + buf, + max_size, + space, + &mut sent, + &mut self.stats, + false, + ); + // PING if mem::replace(&mut space.ping_pending, false) { trace!("PING"); @@ -3129,7 +3219,16 @@ impl Connection { trace!("PATH_CHALLENGE {:08x}", token); buf.write(frame::FrameType::PATH_CHALLENGE); buf.write(token); - self.stats.frame_tx.path_challenge += 1; + + send_observed_address( + space_id, + buf, + max_size, + space, + &mut sent, + &mut self.stats, + true, + ); } } @@ -3142,6 +3241,19 @@ impl Connection { buf.write(frame::FrameType::PATH_RESPONSE); buf.write(token); self.stats.frame_tx.path_response += 1; + + // NOTE: this is technically not required but might be useful to ride the + // request/response nature of path challenges to refresh an observation + // Since PATH_RESPONSE is a probing frame, this is allowed by the spec. + send_observed_address( + space_id, + buf, + max_size, + space, + &mut sent, + &mut self.stats, + true, + ); } } @@ -3762,6 +3874,8 @@ pub enum Event { DatagramReceived, /// One or more application datagrams have been sent after blocking DatagramsUnblocked, + /// Received an observation of our external address from the peer. + ObservedAddr(SocketAddr), } fn instant_saturating_sub(x: Instant, y: Instant) -> Duration { diff --git a/quinn-proto/src/connection/paths.rs b/quinn-proto/src/connection/paths.rs index 2c0476c06..9f7a4e0b9 100644 --- a/quinn-proto/src/connection/paths.rs +++ b/quinn-proto/src/connection/paths.rs @@ -7,7 +7,10 @@ use super::{ pacing::Pacer, spaces::{PacketSpace, SentPacket}, }; -use crate::{congestion, packet::SpaceId, Duration, Instant, TransportConfig, TIMER_GRANULARITY}; +use crate::{ + congestion, frame::ObservedAddr, packet::SpaceId, Duration, Instant, TransportConfig, + TIMER_GRANULARITY, +}; /// Description of a particular network path pub(super) struct PathData { @@ -37,6 +40,11 @@ pub(super) struct PathData { /// Used in persistent congestion determination. pub(super) first_packet_after_rtt_sample: Option<(SpaceId, u64)>, pub(super) in_flight: InFlight, + /// Whether this path has had it's remote address reported back to the peer. This only happens + /// if both peers agree to so based on their transport parameters. + pub(super) observed_addr_sent: bool, + /// Observed address frame with the largest sequence number received from the peer on this path. + pub(super) last_observed_addr_report: Option, /// Number of the first packet sent on this path /// /// Used to determine whether a packet was sent on an earlier path. Insufficient to determine if @@ -90,10 +98,15 @@ impl PathData { ), first_packet_after_rtt_sample: None, in_flight: InFlight::new(), + observed_addr_sent: false, + last_observed_addr_report: None, first_packet: None, } } + /// Create a new path from a previous one. + /// + /// This should only be called when migrating paths. pub(super) fn from_previous(remote: SocketAddr, prev: &Self, now: Instant) -> Self { let congestion = prev.congestion.clone_box(); let smoothed_rtt = prev.rtt.get(); @@ -111,6 +124,8 @@ impl PathData { mtud: prev.mtud.clone(), first_packet_after_rtt_sample: prev.first_packet_after_rtt_sample, in_flight: InFlight::new(), + observed_addr_sent: false, + last_observed_addr_report: None, first_packet: None, } } @@ -156,6 +171,37 @@ impl PathData { self.in_flight.remove(packet); true } + + /// Updates the last observed address report received on this path. + /// + /// If the address was updated, it's returned to be informed to the application. + #[must_use = "updated observed address must be reported to the application"] + pub(super) fn update_observed_addr_report( + &mut self, + observed: ObservedAddr, + ) -> Option { + match self.last_observed_addr_report.as_mut() { + Some(prev) => { + if prev.seq_no >= observed.seq_no { + // frames that do not increase the sequence number on this path are ignored + None + } else if prev.ip == observed.ip && prev.port == observed.port { + // keep track of the last seq_no but do not report the address as updated + prev.seq_no = observed.seq_no; + None + } else { + let addr = observed.socket_addr(); + self.last_observed_addr_report = Some(observed); + Some(addr) + } + } + None => { + let addr = observed.socket_addr(); + self.last_observed_addr_report = Some(observed); + Some(addr) + } + } + } } /// RTT estimation for a particular network path diff --git a/quinn-proto/src/connection/spaces.rs b/quinn-proto/src/connection/spaces.rs index ed58b51c1..0d0edad68 100644 --- a/quinn-proto/src/connection/spaces.rs +++ b/quinn-proto/src/connection/spaces.rs @@ -309,6 +309,7 @@ pub struct Retransmits { pub(super) retire_cids: Vec, pub(super) ack_frequency: bool, pub(super) handshake_done: bool, + pub(super) observed_addr: bool, } impl Retransmits { @@ -326,6 +327,7 @@ impl Retransmits { && self.retire_cids.is_empty() && !self.ack_frequency && !self.handshake_done + && !self.observed_addr } } @@ -347,6 +349,7 @@ impl ::std::ops::BitOrAssign for Retransmits { self.retire_cids.extend(rhs.retire_cids); self.ack_frequency |= rhs.ack_frequency; self.handshake_done |= rhs.handshake_done; + self.observed_addr |= rhs.observed_addr; } } diff --git a/quinn/Cargo.toml b/quinn/Cargo.toml index a061520d7..d60989abc 100644 --- a/quinn/Cargo.toml +++ b/quinn/Cargo.toml @@ -68,6 +68,7 @@ tokio = { workspace = true, features = ["rt", "rt-multi-thread", "time", "macros tracing-subscriber = { workspace = true } tracing-futures = { workspace = true } url = { workspace = true } +tokio-stream = "0.1.15" [[example]] name = "server" diff --git a/quinn/src/connection.rs b/quinn/src/connection.rs index cf09c6e67..0969f514f 100644 --- a/quinn/src/connection.rs +++ b/quinn/src/connection.rs @@ -14,7 +14,7 @@ use bytes::Bytes; use pin_project_lite::pin_project; use rustc_hash::FxHashMap; use thiserror::Error; -use tokio::sync::{futures::Notified, mpsc, oneshot, Notify}; +use tokio::sync::{futures::Notified, mpsc, oneshot, watch, Notify}; use tracing::{debug_span, Instrument, Span}; use crate::{ @@ -636,6 +636,12 @@ impl Connection { // May need to send MAX_STREAMS to make progress conn.wake(); } + + /// Track changed on our external address as reported by the peer. + pub fn observed_external_addr(&self) -> watch::Receiver> { + let conn = self.0.state.lock("external_addr"); + conn.observed_external_addr.subscribe() + } } pin_project! { @@ -892,6 +898,7 @@ impl ConnectionRef { runtime, send_buffer: Vec::new(), buffered_transmit: None, + observed_external_addr: watch::Sender::new(None), }), shared: Shared::default(), })) @@ -974,6 +981,8 @@ pub(crate) struct State { send_buffer: Vec, /// We buffer a transmit when the underlying I/O would block buffered_transmit: Option, + /// Our last external address reported by the peer. + pub(crate) observed_external_addr: watch::Sender>, } impl State { @@ -1131,6 +1140,12 @@ impl State { wake_stream(id, &mut self.stopped); wake_stream(id, &mut self.blocked_writers); } + ObservedAddr(observed) => { + self.observed_external_addr.send_if_modified(|addr| { + let old = addr.replace(observed); + old != *addr + }); + } } } }