From 8830d95461146cea038113f03a3c11bd01a268af Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adolfo=20Ochagav=C3=ADa?= Date: Wed, 19 Apr 2023 11:59:38 +0200 Subject: [PATCH] WIP: implement acknowledgement frequency Sponsored by Stormshield --- quinn-proto/src/connection/mod.rs | 164 ++++++++++-- quinn-proto/src/connection/spaces.rs | 329 ++++++++++++++++++++++-- quinn-proto/src/connection/stats.rs | 4 + quinn-proto/src/connection/timer.rs | 7 +- quinn-proto/src/frame.rs | 78 +++++- quinn-proto/src/shared.rs | 5 + quinn-proto/src/tests/mod.rs | 290 ++++++++++++++++++--- quinn-proto/src/tests/util.rs | 21 +- quinn-proto/src/transport_parameters.rs | 56 +++- 9 files changed, 867 insertions(+), 87 deletions(-) diff --git a/quinn-proto/src/connection/mod.rs b/quinn-proto/src/connection/mod.rs index 40e00f278d..e91e0221fe 100644 --- a/quinn-proto/src/connection/mod.rs +++ b/quinn-proto/src/connection/mod.rs @@ -190,6 +190,13 @@ pub struct Connection { path_response: Option, close: bool, + // + // ACK frequency + // + last_ack_frequency_frame: Option, + max_ack_delay: Duration, + peer_max_ack_delay: Duration, + // // Loss Detection // @@ -301,6 +308,10 @@ impl Connection { path_response: None, close: false, + last_ack_frequency_frame: None, + max_ack_delay: get_max_ack_delay(&TransportParameters::default()), + peer_max_ack_delay: get_max_ack_delay(&TransportParameters::default()), + pto_count: 0, app_limited: false, @@ -471,8 +482,10 @@ impl Connection { } // If we need to send a probe, make sure we have something to send. 
- for space in SpaceId::iter() { - self.spaces[space].maybe_queue_probe(&self.streams); + for (space, allow_immediate_ack) in + SpaceId::iter().zip([false, false, self.peer_supports_immediate_ack()]) + { + self.spaces[space].maybe_queue_probe(allow_immediate_ack, &self.streams); } // Check whether we need to send a close message @@ -528,7 +541,8 @@ impl Connection { } let mut ack_eliciting = !self.spaces[space_id].pending.is_empty(&self.streams) - || self.spaces[space_id].ping_pending; + || self.spaces[space_id].ping_pending + || self.spaces[space_id].immediate_ack_pending; if space_id == SpaceId::Data { ack_eliciting |= self.can_send_1rtt(); } @@ -687,6 +701,7 @@ impl Connection { // have gotten any other ACK for the data earlier on. if !self.spaces[space_id].pending_acks.ranges().is_empty() { Self::populate_acks( + now, self.receiving_ecn, &mut SentFrames::default(), &mut self.spaces[space_id], @@ -733,7 +748,8 @@ impl Connection { break; } - let sent = self.populate_packet(space_id, &mut buf, buf_capacity - builder.tag_len); + let sent = + self.populate_packet(now, space_id, &mut buf, buf_capacity - builder.tag_len); // ACK-only packets should only be sent when explicitly allowed. 
If we write them due // to any other reason, there is a bug which leads to one component announcing write @@ -751,6 +767,7 @@ impl Connection { if sent.largest_acked.is_some() { self.spaces[space_id].pending_acks.acks_sent(); + self.timers.stop(Timer::MaxAckDelay); } // Keep information about the packet around until it gets finalized @@ -802,6 +819,14 @@ impl Connection { // We implement MTU probes as ping packets padded up to the probe size buf.write(frame::Type::PING); + self.stats.frame_tx.ping += 1; + + // If supported by the peer, we want no delays to the probe's ACK + if self.peer_supports_immediate_ack() { + buf.write(frame::Type::IMMEDIATE_ACK); + self.stats.frame_tx.immediate_ack += 1; + } + builder.pad_to(probe_size); let sent_frames = SentFrames { non_retransmits: true, @@ -809,7 +834,6 @@ impl Connection { }; builder.finish_and_track(now, self, Some(sent_frames), &mut buf); - self.stats.frame_tx.ping += 1; self.stats.path.sent_plpmtud_probes += 1; num_datagrams = 1; @@ -997,6 +1021,13 @@ impl Connection { .push_back(EndpointEventInner::NeedIdentifiers(now, num_new_cid)); } } + Timer::MaxAckDelay => { + trace!("max ack delay reached"); + // This timer is only armed in the Data space + self.spaces[SpaceId::Data] + .pending_acks + .on_max_ack_delay_timeout() + } } } } @@ -1237,7 +1268,7 @@ impl Connection { Duration::from_micros(0) } else { cmp::min( - self.max_ack_delay(), + self.peer_max_ack_delay, Duration::from_micros(ack.delay << self.peer_params.ack_delay_exponent.0), ) }; @@ -1535,7 +1566,7 @@ impl Connection { return result; } // Include max_ack_delay and backoff for ApplicationData. 
- duration += self.max_ack_delay() * backoff; + duration += self.peer_max_ack_delay * backoff; } let last_ack_eliciting = match self.spaces[space].time_of_last_ack_eliciting_packet { Some(time) => time, @@ -1597,7 +1628,7 @@ impl Connection { fn pto(&self, space: SpaceId) -> Duration { let max_ack_delay = match space { SpaceId::Initial | SpaceId::Handshake => Duration::new(0, 0), - SpaceId::Data => self.max_ack_delay(), + SpaceId::Data => self.peer_max_ack_delay, }; self.path.rtt.pto_base() + max_ack_delay } @@ -1617,7 +1648,12 @@ impl Connection { self.permit_idle_reset = true; self.receiving_ecn |= ecn.is_some(); if let Some(x) = ecn { - self.spaces[space_id].ecn_counters += x; + let space = &mut self.spaces[space_id]; + space.ecn_counters += x; + + if x.is_ce() { + space.pending_acks.set_immediate_ack_required(); + } } let packet = match packet { @@ -1731,6 +1767,7 @@ impl Connection { preferred_address: None, retry_src_cid: None, stateless_reset_token: None, + min_ack_delay: None, ack_delay_exponent: TransportParameters::default().ack_delay_exponent, max_ack_delay: TransportParameters::default().max_ack_delay, ..params @@ -2402,9 +2439,13 @@ impl Connection { } } } - self.spaces[packet.header.space()] - .pending_acks - .packet_received(ack_eliciting); + + if ack_eliciting { + // In the initial and handshake spaces, ACKs must be sent immediately + self.spaces[packet.header.space()] + .pending_acks + .set_immediate_ack_required(); + } self.write_crypto(); Ok(()) @@ -2504,7 +2545,11 @@ impl Connection { if remote == self.path.remote { // PATH_CHALLENGE on active path, possible off-path packet forwarding // attack. Send a non-probing packet to recover the active path. 
- self.ping(); + if self.peer_supports_immediate_ack() { + self.immediate_ack(); + } else { + self.ping(); + } } } Frame::PathResponse(token) => { @@ -2654,6 +2699,48 @@ impl Connection { self.events.push_back(Event::DatagramReceived); } } + Frame::AckFrequency(ack_frequency) => { + if self + .last_ack_frequency_frame + .map_or(true, |sequence| ack_frequency.sequence > sequence) + { + // This frame can only be sent in the Data space + let space = &mut self.spaces[SpaceId::Data]; + + // Update max_ack_delay + let max_ack_delay = + Duration::from_micros(ack_frequency.request_max_ack_delay); + if max_ack_delay < TIMER_GRANULARITY { + return Err(TransportError::PROTOCOL_VIOLATION( + "Request Max Ack Delay in ACK_FREQUENCY frame is less than min_ack_delay", + )); + } + self.max_ack_delay = max_ack_delay; + if let Some(timeout) = + space.pending_acks.max_ack_delay_timeout(max_ack_delay) + { + // Update the timeout if necessary + self.timers.set(Timer::MaxAckDelay, timeout); + } + + // Update ack frequency params + space + .pending_acks + .set_reordering_threshold(ack_frequency.reordering_threshold); + space + .pending_acks + .set_ack_eliciting_threshold(ack_frequency.ack_eliciting_threshold); + + // Keep track of the frame number so we can ignore older out-of-order frames + self.last_ack_frequency_frame = Some(ack_frequency.sequence); + } + } + Frame::ImmediateAck => { + // This frame can only be sent in the Data space + self.spaces[SpaceId::Data] + .pending_acks + .set_immediate_ack_required(); + } Frame::HandshakeDone => { if self.side.is_server() { return Err(TransportError::PROTOCOL_VIOLATION( @@ -2667,9 +2754,14 @@ impl Connection { } } - self.spaces[SpaceId::Data] + let space = &mut self.spaces[SpaceId::Data]; + if space .pending_acks - .packet_received(ack_eliciting); + .packet_received(now, number, ack_eliciting, &space.dedup) + { + self.timers + .set(Timer::MaxAckDelay, now + self.max_ack_delay); + } // Issue stream ID credit due to ACKs of outgoing 
finish/resets and incoming finish/resets // on stopped streams. Incoming finishes/resets on open streams are not handled here as they @@ -2786,6 +2878,7 @@ impl Connection { fn populate_packet( &mut self, + now: Instant, space_id: SpaceId, buf: &mut Vec, max_size: usize, @@ -2811,10 +2904,25 @@ impl Connection { self.stats.frame_tx.ping += 1; } + // IMMEDIATE_ACK + if mem::replace(&mut space.immediate_ack_pending, false) { + trace!("IMMEDIATE_ACK"); + buf.write(frame::Type::IMMEDIATE_ACK); + sent.non_retransmits = true; + self.stats.frame_tx.immediate_ack += 1; + } + // ACK if space.pending_acks.can_send() { debug_assert!(!space.pending_acks.ranges().is_empty()); - Self::populate_acks(self.receiving_ecn, &mut sent, space, buf, &mut self.stats); + Self::populate_acks( + now, + self.receiving_ecn, + &mut sent, + space, + buf, + &mut self.stats, + ); } // PATH_CHALLENGE @@ -2956,6 +3064,7 @@ impl Connection { /// This method assumes ACKs are pending, and should only be called if /// `!PendingAcks::ranges().is_empty()` returns `true`. 
fn populate_acks( + now: Instant, receiving_ecn: bool, sent: &mut SentFrames, space: &mut PacketSpace, @@ -2973,7 +3082,7 @@ impl Connection { }; sent.largest_acked = space.pending_acks.ranges().max(); - let delay_micros = space.pending_acks.ack_delay().as_micros() as u64; + let delay_micros = space.pending_acks.ack_delay(now).as_micros() as u64; // TODO: This should come frome `TransportConfig` if that gets configurable let ack_delay_exp = TransportParameters::default().ack_delay_exponent; @@ -3029,6 +3138,7 @@ impl Connection { retire_prior_to: 0, }).expect("preferred address CID is the first received, and hence is guaranteed to be legal"); } + self.peer_max_ack_delay = get_max_ack_delay(¶ms); self.peer_params = params; self.path.mtud.on_peer_max_udp_payload_size_received( u16::try_from(self.peer_params.max_udp_payload_size.into_inner()).unwrap_or(u16::MAX), @@ -3139,6 +3249,10 @@ impl Connection { self.key_phase = !self.key_phase; } + fn peer_supports_immediate_ack(&self) -> bool { + self.peer_params.min_ack_delay.is_some() + } + /// The number of bytes of packets containing retransmittable frames that have not been /// acknowledged or declared lost. 
#[cfg(test)] @@ -3198,6 +3312,14 @@ impl Connection { .push_back(EndpointEventInner::NeedIdentifiers(now, n)); } + /// Send an IMMEDIATE_ACK frame to the remote endpoint + /// + /// According to the spec, this will result in an error if the remote endpoint does not support + /// the Acknowledgement Frequency extension + pub fn immediate_ack(&mut self) { + self.spaces[self.highest_space].immediate_ack_pending = true; + } + /// Check the current active remote CID sequence #[cfg(test)] pub(crate) fn active_rem_cid_seq(&self) -> u64 { @@ -3210,10 +3332,6 @@ impl Connection { self.path.current_mtu() } - fn max_ack_delay(&self) -> Duration { - Duration::from_micros(self.peer_params.max_ack_delay.0 * 1000) - } - /// Whether we have 1-RTT data to send /// /// See also `self.space(SpaceId::Data).can_send()` @@ -3444,6 +3562,10 @@ fn instant_saturating_sub(x: Instant, y: Instant) -> Duration { } } +fn get_max_ack_delay(params: &TransportParameters) -> Duration { + Duration::from_micros(params.max_ack_delay.0 * 1000) +} + // Prevents overflow and improves behavior in extreme circumstances const MAX_BACKOFF_EXPONENT: u32 = 16; // Minimal remaining size to allow packet coalescing diff --git a/quinn-proto/src/connection/spaces.rs b/quinn-proto/src/connection/spaces.rs index 4a3a986ff4..cec261079d 100644 --- a/quinn-proto/src/connection/spaces.rs +++ b/quinn-proto/src/connection/spaces.rs @@ -57,6 +57,7 @@ pub(super) struct PacketSpace { /// Number of tail loss probes to send pub(super) loss_probes: u32, pub(super) ping_pending: bool, + pub(super) immediate_ack_pending: bool, /// Number of congestion control "in flight" bytes pub(super) in_flight: u64, /// Number of packets sent in the current key phase @@ -71,7 +72,7 @@ impl PacketSpace { rx_packet: 0, pending: Retransmits::default(), - pending_acks: PendingAcks::default(), + pending_acks: PendingAcks::new(), next_packet_number: 0, largest_acked_packet: None, @@ -87,6 +88,7 @@ impl PacketSpace { loss_time: None, loss_probes: 0, 
ping_pending: false, + immediate_ack_pending: false, in_flight: 0, sent_with_keys: 0, } @@ -104,11 +106,17 @@ impl PacketSpace { /// waiting to be sent, then we retransmit in-flight data to reduce odds of loss. If there's no /// in-flight data either, we're probably a client guarding against a handshake /// anti-amplification deadlock and we just make something up. - pub(super) fn maybe_queue_probe(&mut self, streams: &StreamsState) { + pub(super) fn maybe_queue_probe(&mut self, allow_immediate_ack: bool, streams: &StreamsState) { if self.loss_probes == 0 { return; } + if allow_immediate_ack { + // The probe should be ACKed without delay, but that is only allowed in the Data space + // and when the peer supports the acknowledgement frequency extension + self.immediate_ack_pending = true; + } + // Retransmit the data of the oldest in-flight packet if !self.pending.is_empty(streams) { // There's real data to send here, no need to make something up @@ -141,7 +149,8 @@ impl PacketSpace { pub(super) fn can_send(&self, streams: &StreamsState) -> SendableFrames { let acks = self.pending_acks.can_send(); - let other = !self.pending.is_empty(streams) || self.ping_pending; + let other = + !self.pending.is_empty(streams) || self.ping_pending || self.immediate_ack_pending; SendableFrames { acks, other } } @@ -390,6 +399,45 @@ impl Dedup { true } } + + /// Counts the packets strictly between `left_packet` and `right_packet` that have not been received + /// + /// The provided packet numbers must have been received before calling this function. 
+ fn count_missing_packets(&self, left_packet: u64, right_packet: u64) -> u32 { + debug_assert!(left_packet <= right_packet); + debug_assert!(right_packet <= self.highest()); + const BITFIELD_MAX_OFFSET: u64 = 127; + + // Since we already know the packets have been received, we only need to check those in + // between them (this removes the necessity of extra logic to deal with the highest packet) + let left_packet = left_packet + 1; + let right_packet = right_packet.saturating_sub(1); + + // Note: the offsets are counted from the right + // The highest packet is not included in the bitfield, so we subtract 1 to account for that + let start_offset = (self.highest() - right_packet).max(1) - 1; + if start_offset > BITFIELD_MAX_OFFSET { + // The start offset is outside of the window. All packets outside of the window are + // considered to be received, so no gaps there. + return 0; + } + + let end_offset_exclusive = self.highest().saturating_sub(left_packet); + + // The range is clamped at the edge of the window, because any earlier packets are + // considered to be received (so we know there are no gaps there) + let range_len = end_offset_exclusive + .saturating_sub(start_offset) + .min(BITFIELD_MAX_OFFSET); + if range_len == 0 { + // An empty range has no gaps + return 0; + } + + let mask = ((1u128 << range_len) - 1) << start_offset; + let gaps = !self.window & mask; + gaps.count_ones() + } } /// Indicates which data is available for sending @@ -414,51 +462,183 @@ impl SendableFrames { } } -#[derive(Debug, Default)] +#[derive(Debug)] pub(super) struct PendingAcks { - permit_ack_only: bool, - ranges: ArrayRangeSet, - /// This value will be used for calculating ACK delay once it is implemented + /// Whether we should send an ACK immediately, even if that means sending an ACK-only packet + /// + /// When `immediate_ack_required` is false, the normal behavior is to send ACK frames only when + /// there is other data to send. 
+ immediate_ack_required: bool, + /// The number of ack-eliciting packets received since the last ACK frame was sent + /// + /// Once the count _exceeds_ `ack_eliciting_threshold`, an immediate ACK is required + ack_eliciting_since_last_ack_sent: u64, + ack_eliciting_threshold: u64, + /// The reordering threshold, controlling how we respond to out-of-order ack-eliciting packets /// - /// ACK delay will be the delay between when a packet arrived (`latest_incoming`) - /// and between it will be allowed to be acknowledged (`can_send() == true`). - latest_incoming: Option, - ack_delay: Duration, + /// Different values enable different behavior: + /// + /// * `0`: no special action is taken + /// * `1`: an ACK is immediately sent if the received packet is out-of-order according to RFC 9000 + /// * `>1`: an ACK is immediately sent if the gap between the largest acked and the largest + /// unacked packet is greater than or equal to `reordering_threshold` + reordering_threshold: u64, + /// The earliest ack-eliciting packet since the last ACK was sent, used to calculate the moment + /// upon which `max_ack_delay` elapses + earliest_ack_eliciting_since_last_ack_sent: Option, + /// The packet number ranges of ack-eliciting packets yet to be ACKed + ranges: ArrayRangeSet, + /// The packet with the largest packet number, and the time upon which it was received (used to + /// calculate ACK delay in [`PendingAcks::ack_delay`]) + largest_packet: Option<(u64, Instant)>, + /// The ack-eliciting packet we have received with the largest packet number + largest_ack_eliciting_packet: Option, + /// The largest acknowledged value sent in an ACK frame + largest_acked: Option, } impl PendingAcks { + fn new() -> Self { + Self { + immediate_ack_required: false, + ack_eliciting_since_last_ack_sent: 0, + ack_eliciting_threshold: 1, + reordering_threshold: 1, + earliest_ack_eliciting_since_last_ack_sent: None, + ranges: ArrayRangeSet::default(), + largest_packet: None, + largest_ack_eliciting_packet: None, + 
largest_acked: None, + } + } + + pub(super) fn set_ack_eliciting_threshold(&mut self, threshold: u64) { + self.ack_eliciting_threshold = threshold; + } + + pub(super) fn set_reordering_threshold(&mut self, threshold: u64) { + self.reordering_threshold = threshold; + } + + pub(super) fn set_immediate_ack_required(&mut self) { + self.immediate_ack_required = true; + } + + pub(super) fn on_max_ack_delay_timeout(&mut self) { + self.immediate_ack_required = self.ack_eliciting_since_last_ack_sent > 0; + } + + pub(super) fn max_ack_delay_timeout(&self, max_ack_delay: Duration) -> Option { + self.earliest_ack_eliciting_since_last_ack_sent + .map(|earliest_unacked| earliest_unacked + max_ack_delay) + } + /// Whether any ACK frames can be sent pub(super) fn can_send(&self) -> bool { - self.permit_ack_only && !self.ranges.is_empty() + self.immediate_ack_required && !self.ranges.is_empty() } - /// Returns the duration the acknowledgement of the latest incoming packet has been delayed - pub(super) fn ack_delay(&self) -> Duration { - self.ack_delay + /// Returns the delay since the packet with the largest packet number was received + pub(super) fn ack_delay(&self, now: Instant) -> Duration { + self.largest_packet + .map_or(Duration::default(), |(_, received)| now - received) } /// Handle receipt of a new packet - pub(super) fn packet_received(&mut self, ack_eliciting: bool) { - self.permit_ack_only |= ack_eliciting; + /// + /// Returns true if the max ack delay timer should be armed + pub(super) fn packet_received( + &mut self, + now: Instant, + packet_number: u64, + ack_eliciting: bool, + dedup: &Dedup, + ) -> bool { + if !ack_eliciting { + return false; + } + + let prev_largest_ack_eliciting = self.largest_ack_eliciting_packet.unwrap_or(0); + + // Track largest ack-eliciting packet + self.largest_ack_eliciting_packet = self + .largest_ack_eliciting_packet + .map(|pn| pn.max(packet_number)) + .or(Some(packet_number)); + + // Handle ack_eliciting_threshold + 
self.ack_eliciting_since_last_ack_sent += 1; + self.immediate_ack_required |= + self.ack_eliciting_since_last_ack_sent > self.ack_eliciting_threshold; + + // Handle out-of-order packets + self.immediate_ack_required |= + self.is_out_of_order(packet_number, prev_largest_ack_eliciting, dedup); + + // Arm max_ack_delay timer if necessary + if self.earliest_ack_eliciting_since_last_ack_sent.is_none() && !self.can_send() + { + self.earliest_ack_eliciting_since_last_ack_sent = Some(now); + return true; + } + + false + } + + fn is_out_of_order( + &self, + packet_number: u64, + prev_largest_ack_eliciting: u64, + dedup: &Dedup, + ) -> bool { + match self.reordering_threshold { + 0 => false, + 1 => { + // From https://www.rfc-editor.org/rfc/rfc9000#section-13.2.1-7 + packet_number < prev_largest_ack_eliciting + || dedup.count_missing_packets(prev_largest_ack_eliciting, packet_number) >= 1 + } + _ => { + // From acknowledgement frequency draft, section 6.1 + if let Some((largest_acked, largest_unacked)) = + self.largest_acked.zip(self.largest_ack_eliciting_packet) + { + let unreported_missing = + dedup.count_missing_packets(largest_acked, largest_unacked); + unreported_missing as u64 >= self.reordering_threshold + } else { + false + } + } + } } /// Should be called whenever ACKs have been sent /// /// This will suppress sending further ACKs until additional ACK eliciting frames arrive pub(super) fn acks_sent(&mut self) { - // If we sent any acks, don't immediately resend them. Setting this even if ack_only is - // false needlessly prevents us from ACKing the next packet if it's ACK-only, but saves - // the need for subtler logic to avoid double-transmitting acks all the time. 
- // This reset needs to happen before we check whether more data - // is available in this space - because otherwise it would return - // `true` purely due to the ACKs - self.permit_ack_only = false; + // It is possible (though unlikely) that the ACKs we just sent do not cover all the + // ACK-eliciting packets we have received (e.g. if there is not enough room in the packet to + // fit all the ranges). To keep things simple, however, we assume they do. If there are + // indeed some ACKs that weren't covered, the packets might be ACKed later anyway, because + // they are still contained in `self.ranges`. If we somehow fail to send the ACKs at a later + // moment, the peer will assume the packets got lost and will retransmit their frames in a + // new packet, which is suboptimal, because we already received them. Our assumption here is + // that simplicity results in code that is more performant, even in the presence of + // occasional redundant retransmits. + self.immediate_ack_required = false; + self.ack_eliciting_since_last_ack_sent = 0; + self.earliest_ack_eliciting_since_last_ack_sent = None; + self.largest_acked = self.largest_ack_eliciting_packet; } /// Insert one packet that needs to be acknowledged pub(super) fn insert_one(&mut self, packet: u64, now: Instant) { self.ranges.insert_one(packet); - self.latest_incoming = Some(now); + + if self.largest_packet.map_or(true, |(pn, _)| packet > pn) { + self.largest_packet = Some((packet, now)); + } if self.ranges.len() > MAX_ACK_BLOCKS { self.ranges.pop_min(); @@ -539,6 +719,105 @@ mod test { assert_eq!(dedup.window, 1 << (WINDOW_SIZE - 2)); } + #[test] + fn dedup_has_gap() { + let mut dedup = Dedup::new(); + + dedup.insert(0); + dedup.insert(1); + + assert_eq!(dedup.count_missing_packets(0, 1), 0); + dedup.insert(3); + + assert_eq!(dedup.count_missing_packets(1, 3), 1); + + dedup.insert(4); + assert_eq!(dedup.count_missing_packets(3, 4), 0); + assert_eq!(dedup.count_missing_packets(0, 4), 1); + + 
dedup.insert(2); + assert_eq!(dedup.count_missing_packets(0, 4), 0); + } + + #[test] + fn dedup_outside_of_window_has_gap() { + let mut dedup = Dedup::new(); + + for i in 0..140 { + dedup.insert(i); + } + + // 0 and 4 are outside of the window + assert_eq!(dedup.count_missing_packets(0, 4), 0); + dedup.insert(160); + assert_eq!(dedup.count_missing_packets(0, 4), 0); + assert_eq!(dedup.count_missing_packets(0, 140), 0); + assert_eq!(dedup.count_missing_packets(0, 160), 20); + } + + #[test] + fn pending_acks_first_packet_is_not_considered_reordered() { + let mut acks = PendingAcks::new(); + let mut dedup = Dedup::new(); + dedup.insert(0); + acks.packet_received(Instant::now(), 0, true, &dedup); + assert!(!acks.immediate_ack_required); + } + + #[test] + fn pending_acks_after_immediate_ack_set() { + let mut acks = PendingAcks::new(); + let mut dedup = Dedup::new(); + + // Receive ack-eliciting packet + dedup.insert(0); + let now = Instant::now(); + acks.insert_one(0, now); + acks.packet_received(now, 0, true, &dedup); + + // Sanity check + assert!(!acks.ranges.is_empty()); + assert!(!acks.can_send()); + + // Can send ACK after max_ack_delay exceeded + acks.set_immediate_ack_required(); + assert!(acks.can_send()); + } + + #[test] + fn pending_acks_ack_delay() { + let mut acks = PendingAcks::new(); + let mut dedup = Dedup::new(); + + let t1 = Instant::now(); + let t2 = t1 + Duration::from_millis(2); + let t3 = t2 + Duration::from_millis(5); + assert_eq!(acks.ack_delay(t1), Duration::from_millis(0)); + assert_eq!(acks.ack_delay(t2), Duration::from_millis(0)); + assert_eq!(acks.ack_delay(t3), Duration::from_millis(0)); + + // In-order packet + dedup.insert(0); + acks.insert_one(0, t1); + acks.packet_received(t1, 0, true, &dedup); + assert_eq!(acks.ack_delay(t1), Duration::from_millis(0)); + assert_eq!(acks.ack_delay(t2), Duration::from_millis(2)); + assert_eq!(acks.ack_delay(t3), Duration::from_millis(7)); + + // Out of order (higher than expected) + dedup.insert(3); + 
acks.insert_one(3, t2); + acks.packet_received(t2, 3, true, &dedup); + assert_eq!(acks.ack_delay(t2), Duration::from_millis(0)); + assert_eq!(acks.ack_delay(t3), Duration::from_millis(5)); + + // Out of order (lower than expected, so previous instant is kept) + dedup.insert(2); + acks.insert_one(2, t3); + acks.packet_received(t3, 2, true, &dedup); + assert_eq!(acks.ack_delay(t3), Duration::from_millis(5)); + } + #[test] fn sent_packet_size() { // The tracking state of sent packets should be minimal, and not grow diff --git a/quinn-proto/src/connection/stats.rs b/quinn-proto/src/connection/stats.rs index 2224383798..47454223b6 100644 --- a/quinn-proto/src/connection/stats.rs +++ b/quinn-proto/src/connection/stats.rs @@ -23,11 +23,13 @@ pub struct UdpStats { #[non_exhaustive] pub struct FrameStats { pub acks: u64, + pub ack_frequency: u64, pub crypto: u64, pub connection_close: u64, pub data_blocked: u64, pub datagram: u64, pub handshake_done: u8, + pub immediate_ack: u64, pub max_data: u64, pub max_stream_data: u64, pub max_streams_bidi: u64, @@ -81,6 +83,8 @@ impl FrameStats { Frame::PathChallenge(_) => self.path_challenge += 1, Frame::PathResponse(_) => self.path_response += 1, Frame::Close(_) => self.connection_close += 1, + Frame::AckFrequency(_) => self.ack_frequency += 1, + Frame::ImmediateAck => self.immediate_ack += 1, Frame::HandshakeDone => self.handshake_done += 1, Frame::Invalid { .. 
} => {} } diff --git a/quinn-proto/src/connection/timer.rs b/quinn-proto/src/connection/timer.rs index d4f9e34879..02b1483dc7 100644 --- a/quinn-proto/src/connection/timer.rs +++ b/quinn-proto/src/connection/timer.rs @@ -18,10 +18,12 @@ pub(crate) enum Timer { Pacing = 6, /// When to invalidate old CID and proactively push new one via NEW_CONNECTION_ID frame PushNewCid = 7, + /// When to send an immediate ACK if there are unacked ack-elicited packets waiting + MaxAckDelay = 8, } impl Timer { - pub(crate) const VALUES: [Self; 8] = [ + pub(crate) const VALUES: [Self; 9] = [ Self::LossDetection, Self::Idle, Self::Close, @@ -30,13 +32,14 @@ impl Timer { Self::KeepAlive, Self::Pacing, Self::PushNewCid, + Self::MaxAckDelay, ]; } /// A table of data associated with each distinct kind of `Timer` #[derive(Debug, Copy, Clone, Default)] pub(crate) struct TimerTable { - data: [Option; 8], + data: [Option; 9], } impl TimerTable { diff --git a/quinn-proto/src/frame.rs b/quinn-proto/src/frame.rs index 5ac9a79937..2d0d996577 100644 --- a/quinn-proto/src/frame.rs +++ b/quinn-proto/src/frame.rs @@ -129,6 +129,9 @@ frame_types! { CONNECTION_CLOSE = 0x1c, APPLICATION_CLOSE = 0x1d, HANDSHAKE_DONE = 0x1e, + // ACK Frequency + ACK_FREQUENCY = 0xaf, + IMMEDIATE_ACK = 0xac, // DATAGRAM } @@ -157,6 +160,8 @@ pub(crate) enum Frame { PathResponse(u64), Close(Close), Datagram(Datagram), + AckFrequency(AckFrequency), + ImmediateAck, Invalid { ty: Type, reason: &'static str }, HandshakeDone, } @@ -197,6 +202,8 @@ impl Frame { Crypto(_) => Type::CRYPTO, NewToken { .. } => Type::NEW_TOKEN, Datagram(_) => Type(*DATAGRAM_TYS.start()), + AckFrequency(_) => Type::ACK_FREQUENCY, + ImmediateAck => Type::IMMEDIATE_ACK, Invalid { ty, .. 
} => ty, HandshakeDone => Type::HANDSHAKE_DONE, } @@ -684,6 +691,19 @@ impl Iter { token: self.take_len()?, }, Type::HANDSHAKE_DONE => Frame::HandshakeDone, + Type::ACK_FREQUENCY => { + let sequence = self.bytes.get_var()?; + let ack_eliciting_threshold = self.bytes.get_var()?; + let request_max_ack_delay = self.bytes.get_var()?; + let reordering_threshold = self.bytes.get_var()?; + Frame::AckFrequency(AckFrequency { + sequence, + ack_eliciting_threshold, + request_max_ack_delay, + reordering_threshold, + }) + } + Type::IMMEDIATE_ACK => Frame::ImmediateAck, _ => { if let Some(s) = ty.stream() { Frame::Stream(Stream { @@ -871,9 +891,39 @@ impl Datagram { } } +#[derive(Debug, Copy, Clone)] +pub(crate) struct AckFrequency { + pub(crate) sequence: u64, + pub(crate) ack_eliciting_threshold: u64, + pub(crate) request_max_ack_delay: u64, + pub(crate) reordering_threshold: u64, +} + +impl AckFrequency { + pub(crate) fn encode( + sequence: u64, + ack_eliciting_threshold: u64, + request_max_ack_delay: u64, + reordering_threshold: u64, + buf: &mut W, + ) { + buf.write(Type::ACK_FREQUENCY); + buf.write_var(sequence); + buf.write_var(ack_eliciting_threshold); + buf.write_var(request_max_ack_delay); + buf.write_var(reordering_threshold); + } +} + #[cfg(test)] mod test { use super::*; + use crate::coding::Codec; + use assert_matches::assert_matches; + + fn frames(buf: Vec) -> Vec { + Iter::new(Bytes::from(buf)).collect::>() + } #[test] #[allow(clippy::range_plus_one)] @@ -890,7 +940,7 @@ mod test { ce: 12, }; Ack::encode(42, &ranges, Some(&ECN), &mut buf); - let frames = Iter::new(Bytes::from(buf)).collect::>(); + let frames = frames(buf); assert_eq!(frames.len(), 1); match frames[0] { Frame::Ack(ref ack) => { @@ -902,4 +952,30 @@ mod test { ref x => panic!("incorrect frame {x:?}"), } } + + #[test] + fn ack_frequency_coding() { + let mut buf = Vec::new(); + AckFrequency::encode(42, 20, 50_000, 1, &mut buf); + let frames = frames(buf); + assert_eq!(frames.len(), 1); + match 
&frames[0] { + Frame::AckFrequency(ack_frequency) => { + assert_eq!(ack_frequency.sequence, 42); + assert_eq!(ack_frequency.ack_eliciting_threshold, 20); + assert_eq!(ack_frequency.request_max_ack_delay, 50_000); + assert_eq!(ack_frequency.reordering_threshold, 1); + } + x => panic!("incorrect frame {x:?}"), + } + } + + #[test] + fn immediate_ack_coding() { + let mut buf = Vec::new(); + Type::IMMEDIATE_ACK.encode(&mut buf); + let frames = frames(buf); + assert_eq!(frames.len(), 1); + assert_matches!(&frames[0], Frame::ImmediateAck); + } } diff --git a/quinn-proto/src/shared.rs b/quinn-proto/src/shared.rs index e670f1d6f5..a8bd274b28 100644 --- a/quinn-proto/src/shared.rs +++ b/quinn-proto/src/shared.rs @@ -161,6 +161,11 @@ impl EcnCodepoint { } }) } + + /// Returns whether the codepoint is a CE, signalling that congestion was experienced + pub fn is_ce(self) -> bool { + matches!(self, Self::Ce) + } } #[derive(Debug, Copy, Clone)] diff --git a/quinn-proto/src/tests/mod.rs b/quinn-proto/src/tests/mod.rs index 0cdc6ed451..5ffd6d227c 100644 --- a/quinn-proto/src/tests/mod.rs +++ b/quinn-proto/src/tests/mod.rs @@ -1,5 +1,6 @@ use std::{ convert::TryInto, + mem, net::{Ipv4Addr, Ipv6Addr, SocketAddr}, sync::Arc, time::{Duration, Instant}, @@ -1048,6 +1049,10 @@ fn migration() { let _guard = subscribe(); let mut pair = Pair::default(); let (client_ch, server_ch) = pair.connect(); + pair.drive(); + + let client_stats_after_connect = pair.client_conn_mut(client_ch).stats(); + pair.client.addr = SocketAddr::new( Ipv4Addr::new(127, 0, 0, 1).into(), CLIENT_PORTS.lock().unwrap().next().unwrap(), @@ -1066,6 +1071,19 @@ fn migration() { pair.server_conn_mut(server_ch).remote_address(), pair.client.addr ); + + // Assert that the client's response to the PATH_CHALLENGE was an IMMEDIATE_ACK, instead of a + // second ping + let client_stats_after_migrate = pair.client_conn_mut(client_ch).stats(); + assert_eq!( + client_stats_after_migrate.frame_tx.ping - 
client_stats_after_connect.frame_tx.ping, + 1 + ); + assert_eq!( + client_stats_after_migrate.frame_tx.immediate_ack + - client_stats_after_connect.frame_tx.immediate_ack, + 1 + ); } fn test_flow_control(config: TransportConfig, window_size: usize) { @@ -1912,6 +1930,32 @@ fn malformed_token_len() { ); } +#[test] +fn loss_probe_requests_immediate_ack() { + let _guard = subscribe(); + let mut pair = Pair::default(); + let (client_ch, _) = pair.connect(); + pair.drive(); + + let stats_after_connect = pair.client_conn_mut(client_ch).stats(); + + // Lose a ping + let default_mtu = mem::replace(&mut pair.max_udp_payload_size, 0); + pair.client_conn_mut(client_ch).ping(); + pair.drive_client(); + pair.max_udp_payload_size = default_mtu; + + // Drive the connection further so a loss probe is sent and the ping is recovered + pair.drive(); + + // Assert that two IMMEDIATE_ACKs were sent (two loss probes) + let stats_after_recovery = pair.client_conn_mut(client_ch).stats(); + assert_eq!( + stats_after_recovery.frame_tx.immediate_ack - stats_after_connect.frame_tx.immediate_ack, + 2 + ); +} + #[test] /// This is mostly a sanity check to ensure our testing code is correctly dropping packets above the /// pmtu @@ -1927,10 +1971,46 @@ fn connect_too_low_mtu() { pair.server.assert_no_accept() } +#[test] +/// This is mostly a sanity check to ensure our testing code is correctly dropping packets above the +/// pmtu, and recovery works properly +fn packet_loss_and_retry_too_low_mtu() { + let _guard = subscribe(); + let mut pair = Pair::default(); + let (client_ch, server_ch) = pair.connect(); + + let s = pair.client_streams(client_ch).open(Dir::Uni).unwrap(); + + pair.client_send(client_ch, s).write(b"hello").unwrap(); + pair.drive(); + + // Nothing will get past this mtu + pair.max_udp_payload_size = 10; + pair.client_send(client_ch, s).write(b" world").unwrap(); + pair.drive_client(); + + // The packet was dropped + assert!(pair.client.outbound.is_empty()); + 
assert!(pair.server.inbound.is_empty()); + + // Restore the default mtu, so future packets are properly transmitted + pair.max_udp_payload_size = DEFAULT_MAX_UDP_PAYLOAD_SIZE; + + // The lost packet is resent + pair.drive(); + assert!(pair.client.outbound.is_empty()); + + let recv = pair.server_recv(server_ch, s); + let buf = stream_chunks(recv); + + assert_eq!(buf, b"hello world".as_slice()); +} + #[test] fn connect_lost_mtu_probes_do_not_trigger_congestion_control() { let _guard = subscribe(); let mut pair = Pair::default(); + pair.max_udp_payload_size = 1200; let (client_ch, server_ch) = pair.connect(); pair.drive(); @@ -1955,7 +2035,6 @@ fn connect_detects_mtu() { let max_udp_payload_and_expected_mtu = &[(1200, 1200), (1400, 1389), (1500, 1452)]; for &(pair_max_udp, expected_mtu) in max_udp_payload_and_expected_mtu { - println!("Trying {pair_max_udp}"); let mut pair = Pair::default(); pair.max_udp_payload_size = pair_max_udp; let (client_ch, server_ch) = pair.connect(); @@ -2067,39 +2146,6 @@ fn connect_runs_mtud_again_after_600_seconds() { assert_eq!(pair.server_conn_mut(server_ch).path_mtu(), 1452); } -#[test] -fn packet_loss_and_retry_too_low_mtu() { - let _guard = subscribe(); - let mut pair = Pair::default(); - let (client_ch, server_ch) = pair.connect(); - - let s = pair.client_streams(client_ch).open(Dir::Uni).unwrap(); - - pair.client_send(client_ch, s).write(b"hello").unwrap(); - pair.drive(); - - // Nothing will get past this mtu - pair.max_udp_payload_size = 10; - pair.client_send(client_ch, s).write(b" world").unwrap(); - pair.drive_client(); - - // The packet was dropped - assert!(pair.client.outbound.is_empty()); - assert!(pair.server.inbound.is_empty()); - - // Restore the default mtu, so future packets are properly transmitted - pair.max_udp_payload_size = DEFAULT_MAX_UDP_PAYLOAD_SIZE; - - // The lost packet is resent - pair.drive(); - assert!(pair.client.outbound.is_empty()); - - let recv = pair.server_recv(server_ch, s); - let buf = 
stream_chunks(recv); - - assert_eq!(buf, b"hello world".as_slice()); -} - #[test] fn blackhole_after_mtu_change_repairs_itself() { let _guard = subscribe(); @@ -2138,6 +2184,21 @@ fn blackhole_after_mtu_change_repairs_itself() { assert!(client_stats.path.congestion_events >= 3); } +#[test] +fn mtud_probes_include_immediate_ack() { + let _guard = subscribe(); + let mut pair = Pair::default(); + let (client_ch, _) = pair.connect(); + pair.drive(); + + let stats = pair.client_conn_mut(client_ch).stats(); + assert_eq!(stats.path.sent_plpmtud_probes, 4); + + // Each probe contains a ping and an immediate ack + assert_eq!(stats.frame_tx.ping, 4); + assert_eq!(stats.frame_tx.immediate_ack, 4); +} + #[test] fn packet_splitting_with_default_mtu() { let _guard = subscribe(); @@ -2146,6 +2207,7 @@ fn packet_splitting_with_default_mtu() { let payload = vec![42; 1300]; let mut pair = Pair::default(); + pair.max_udp_payload_size = 1200; let (client_ch, _) = pair.connect(); pair.drive(); @@ -2180,6 +2242,166 @@ fn packet_splitting_not_necessary_after_higher_mtu_discovered() { assert_eq!(pair.server.inbound.len(), 1); } +#[test] +fn single_ack_eliciting_packet_triggers_ack_after_delay() { + let _guard = subscribe(); + let mut pair = Pair::default(); + let (client_ch, _) = pair.connect(); + pair.drive(); + + let stats_after_connect = pair.client_conn_mut(client_ch).stats(); + + let start = pair.time; + pair.client_conn_mut(client_ch).ping(); + pair.drive_client(); // Send ping + pair.drive_server(); // Process ping + pair.drive_client(); // Give the client a chance to process an ack, so our assertion can fail + + // Sanity check: the time hasn't advanced in the meantime) + assert_eq!(pair.time, start); + + let stats_after_ping = pair.client_conn_mut(client_ch).stats(); + assert_eq!( + stats_after_ping.frame_tx.ping - stats_after_connect.frame_tx.ping, + 1 + ); + assert_eq!( + stats_after_ping.frame_rx.acks - stats_after_connect.frame_rx.acks, + 0 + ); + + pair.drive(); + let 
stats_after_drive = pair.client_conn_mut(client_ch).stats(); + assert_eq!( + stats_after_drive.frame_rx.acks - stats_after_ping.frame_rx.acks, + 1 + ); + + // The time is start + max_ack_delay + assert_eq!(pair.time, start + Duration::from_millis(25)); + + // Sanity check: no loss probe was sent, because the delayed ACK was received on time + assert_eq!( + stats_after_drive.frame_tx.ping - stats_after_connect.frame_tx.ping, + 1 + ); +} + +#[test] +fn immediate_ack_triggers_ack() { + let _guard = subscribe(); + let mut pair = Pair::default(); + let (client_ch, _) = pair.connect(); + pair.drive(); + + let acks_after_connect = pair.client_conn_mut(client_ch).stats().frame_rx.acks; + + pair.client_conn_mut(client_ch).immediate_ack(); + pair.drive_client(); // Send ping + pair.drive_server(); // Process ping + pair.drive_client(); // Give the client a chance to process the ack + + let acks_after_ping = pair.client_conn_mut(client_ch).stats().frame_rx.acks; + + assert_eq!(acks_after_ping - acks_after_connect, 1); +} + +#[test] +fn out_of_order_ack_eliciting_packet_triggers_ack() { + let _guard = subscribe(); + let mut pair = Pair::default(); + let (client_ch, server_ch) = pair.connect(); + pair.drive(); + + let default_mtu = pair.max_udp_payload_size; + + let client_stats_after_connect = pair.client_conn_mut(client_ch).stats(); + let server_stats_after_connect = pair.server_conn_mut(server_ch).stats(); + + // Send a packet that won't arrive right away (it will be dropped and be re-sent later) + pair.max_udp_payload_size = 0; + pair.client_conn_mut(client_ch).ping(); + pair.drive_client(); + + // Sanity check (ping sent, no ACK received) + let client_stats_after_first_ping = pair.client_conn_mut(client_ch).stats(); + assert_eq!( + client_stats_after_first_ping.frame_tx.ping - client_stats_after_connect.frame_tx.ping, + 1 + ); + assert_eq!( + client_stats_after_first_ping.frame_rx.acks - client_stats_after_connect.frame_rx.acks, + 0 + ); + + // Restore the default MTU and 
send another ping, which will arrive earlier than the dropped one + pair.max_udp_payload_size = default_mtu; + pair.client_conn_mut(client_ch).ping(); + pair.drive_client(); + pair.drive_server(); + pair.drive_client(); + + // Client sanity check (ping sent, one ACK received) + let client_stats_after_second_ping = pair.client_conn_mut(client_ch).stats(); + assert_eq!( + client_stats_after_second_ping.frame_tx.ping - client_stats_after_connect.frame_tx.ping, + 2 + ); + assert_eq!( + client_stats_after_second_ping.frame_rx.acks - client_stats_after_connect.frame_rx.acks, + 1 + ); + + // Server checks (single ping received, ACK sent) + let server_stats_after_second_ping = pair.server_conn_mut(server_ch).stats(); + assert_eq!( + server_stats_after_second_ping.frame_rx.ping - server_stats_after_connect.frame_rx.ping, + 1 + ); + assert_eq!( + server_stats_after_second_ping.frame_tx.acks - server_stats_after_connect.frame_tx.acks, + 1 + ); +} + +#[test] +fn single_ack_eliciting_packet_with_ce_bit_triggers_immediate_ack() { + let _guard = subscribe(); + let mut pair = Pair::default(); + let (client_ch, _) = pair.connect(); + pair.drive(); + + let stats_after_connect = pair.client_conn_mut(client_ch).stats(); + + let start = pair.time; + + pair.client_conn_mut(client_ch).ping(); + + pair.congestion_experienced = true; + pair.drive_client(); // Send ping + pair.congestion_experienced = false; + + pair.drive_server(); // Process ping, send ACK in response to congestion + pair.drive_client(); // Process ACK + + // Sanity check: the time hasn't advanced in the meantime) + assert_eq!(pair.time, start); + + let stats_after_ping = pair.client_conn_mut(client_ch).stats(); + assert_eq!( + stats_after_ping.frame_tx.ping - stats_after_connect.frame_tx.ping, + 1 + ); + assert_eq!( + stats_after_ping.frame_rx.acks - stats_after_connect.frame_rx.acks, + 1 + ); + assert_eq!( + stats_after_ping.path.congestion_events - stats_after_connect.path.congestion_events, + 1 + ); +} + fn 
stream_chunks(mut recv: RecvStream) -> Vec { let mut buf = Vec::new(); diff --git a/quinn-proto/src/tests/util.rs b/quinn-proto/src/tests/util.rs index 529ebb1f80..35c0100e68 100644 --- a/quinn-proto/src/tests/util.rs +++ b/quinn-proto/src/tests/util.rs @@ -18,7 +18,7 @@ use tracing::{info_span, trace}; use super::*; -pub(super) const DEFAULT_MAX_UDP_PAYLOAD_SIZE: usize = 1200; +pub(super) const DEFAULT_MAX_UDP_PAYLOAD_SIZE: usize = 1452; pub(super) struct Pair { pub(super) server: TestEndpoint, @@ -26,6 +26,8 @@ pub(super) struct Pair { pub(super) time: Instant, /// Simulates the maximum size allowed for UDP payloads by the link (packets exceeding this size will be dropped) pub(super) max_udp_payload_size: usize, + /// Simulates explicit congestion notification + pub(super) congestion_experienced: bool, // One-way pub(super) latency: Duration, /// Number of spin bit flips @@ -58,6 +60,7 @@ impl Pair { latency: Duration::new(0, 0), spins: 0, last_spin: false, + congestion_experienced: false, } } @@ -131,9 +134,10 @@ impl Pair { socket.send_to(&x.contents, x.destination).unwrap(); } if self.server.addr == x.destination { + let ecn = ecn(x.ecn, self.congestion_experienced); self.server .inbound - .push_back((self.time + self.latency, x.ecn, x.contents)); + .push_back((self.time + self.latency, ecn, x.contents)); } } } @@ -154,9 +158,10 @@ impl Pair { socket.send_to(&x.contents, x.destination).unwrap(); } if self.client.addr == x.destination { + let ecn = ecn(x.ecn, self.congestion_experienced); self.client .inbound - .push_back((self.time + self.latency, x.ecn, x.contents)); + .push_back((self.time + self.latency, ecn, x.contents)); } } } @@ -524,6 +529,16 @@ fn packet_size(transmit: &Transmit) -> usize { transmit.contents.len() } +fn ecn(x: Option, congestion_experienced: bool) -> Option { + x.map(|codepoint| { + if congestion_experienced { + EcnCodepoint::from_bits(0b11).unwrap() + } else { + codepoint + } + }) +} + lazy_static! 
{ pub static ref SERVER_PORTS: Mutex> = Mutex::new(4433..); pub static ref CLIENT_PORTS: Mutex> = Mutex::new(44433..); diff --git a/quinn-proto/src/transport_parameters.rs b/quinn-proto/src/transport_parameters.rs index 9fe376a74e..e17c824c59 100644 --- a/quinn-proto/src/transport_parameters.rs +++ b/quinn-proto/src/transport_parameters.rs @@ -21,7 +21,7 @@ use crate::{ config::{EndpointConfig, ServerConfig, TransportConfig}, shared::ConnectionId, ResetToken, Side, TransportError, VarInt, LOC_CID_COUNT, MAX_CID_SIZE, MAX_STREAM_COUNT, - RESET_TOKEN_SIZE, + RESET_TOKEN_SIZE, TIMER_GRANULARITY, }; // Apply a given macro to a list of all the transport parameters having integer types, along with @@ -81,6 +81,11 @@ macro_rules! make_struct { /// bit pub(crate) grease_quic_bit: bool, + /// Minimum amount of time in microseconds by which the endpoint is able to delay + /// sending acknowledgments (implies that the endpoint supports QUIC Acknowledgement + /// Frequency) + pub(crate) min_ack_delay: Option, + // Server-only /// The value of the Destination Connection ID field from the first Initial packet sent /// by the client @@ -104,6 +109,7 @@ macro_rules! 
make_struct { max_datagram_frame_size: None, initial_src_cid: None, grease_quic_bit: false, + min_ack_delay: None, original_dst_cid: None, retry_src_cid: None, @@ -146,6 +152,9 @@ impl TransportParameters { .datagram_receive_buffer_size .map(|x| (x.min(u16::max_value().into()) as u16).into()), grease_quic_bit: endpoint_config.grease_quic_bit, + min_ack_delay: Some( + VarInt::from_u64(u64::try_from(TIMER_GRANULARITY.as_micros()).unwrap()).unwrap(), + ), ..Self::default() } } @@ -329,6 +338,12 @@ impl TransportParameters { w.write_var(0x2ab2); w.write_var(0); } + + if let Some(x) = self.min_ack_delay { + w.write_var(0xff04de1a); + w.write_var(x.size() as u64); + w.write(x); + } } /// Decode `TransportParameters` from buffer @@ -392,6 +407,7 @@ impl TransportParameters { 0 => params.grease_quic_bit = true, _ => return Err(Error::Malformed), }, + 0xff04de1a => params.min_ack_delay = Some(r.get().map_err(|_| Error::Malformed)?), _ => { macro_rules! parse { {$($(#[$doc:meta])* $name:ident ($code:expr) = $default:expr,)*} => { @@ -418,6 +434,10 @@ impl TransportParameters { || params.max_udp_payload_size.0 < 1200 || params.initial_max_streams_bidi.0 > MAX_STREAM_COUNT || params.initial_max_streams_uni.0 > MAX_STREAM_COUNT + || params.min_ack_delay.map_or(false, |min_ack_delay| { + // min_ack_delay uses microseconds, whereas max_ack_delay uses milliseconds + min_ack_delay.0 > params.max_ack_delay.0 * 1_000 + }) || (side.is_server() && (params.stateless_reset_token.is_some() || params.preferred_address.is_some())) { @@ -458,6 +478,7 @@ mod test { stateless_reset_token: [0xab; RESET_TOKEN_SIZE].into(), }), grease_quic_bit: true, + min_ack_delay: Some(2_000u32.into()), ..TransportParameters::default() }; params.write(&mut buf); @@ -467,6 +488,39 @@ mod test { ); } + #[test] + fn read_semantic_validation() { + let illegal_params_builders: Vec> = vec![ + Box::new(|t| { + // This min_ack_delay is bigger than max_ack_delay!
+ let min_ack_delay = t.max_ack_delay.0 * 1_000_000 + 1; + t.min_ack_delay = Some(VarInt::from_u64(min_ack_delay).unwrap()) + }), + Box::new(|t| { + // Preferred address can only be sent by senders (and we are reading the transport + // params as a client) + t.preferred_address = Some(PreferredAddress { + address_v4: Some(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 42)), + address_v6: None, + connection_id: ConnectionId::new(&[]), + stateless_reset_token: [0xab; RESET_TOKEN_SIZE].into(), + }) + }), + ]; + + for mut builder in illegal_params_builders { + let mut buf = Vec::new(); + let mut params = TransportParameters::default(); + builder(&mut params); + params.write(&mut buf); + + assert_eq!( + TransportParameters::read(Side::Server, &mut buf.as_slice()), + Err(Error::IllegalValue) + ); + } + } + #[test] fn resumption_params_validation() { let high_limit = TransportParameters {