diff --git a/quinn-proto/src/config.rs b/quinn-proto/src/config.rs index d4c8e6385..6894265a9 100644 --- a/quinn-proto/src/config.rs +++ b/quinn-proto/src/config.rs @@ -52,6 +52,7 @@ pub struct TransportConfig { pub(crate) min_mtu: u16, pub(crate) mtu_discovery_config: Option, pub(crate) ack_frequency_config: Option, + pub(crate) ack_timestamps_config: AckTimestampsConfig, pub(crate) persistent_congestion_threshold: u32, pub(crate) keep_alive_interval: Option, @@ -223,6 +224,21 @@ impl TransportConfig { self } + /// Specifies the ACK timestamp config. + /// Defaults to `None`, which disables receiving acknowledgement timestamps from the sender. + /// If `Some`, TransportParameters are sent to the peer to enable acknowledgement timestamps + /// if supported. + pub fn max_ack_timestamps(&mut self, value: VarInt) -> &mut Self { + self.ack_timestamps_config.max_timestamps_per_ack = Some(value); + self + } + + /// Specifies the exponent used when encoding the timestamps. + pub fn ack_timestamps_exponent(&mut self, value: VarInt) -> &mut Self { + self.ack_timestamps_config.exponent = value; + self + } + /// Number of consecutive PTOs after which network is considered to be experiencing persistent congestion. pub fn persistent_congestion_threshold(&mut self, value: u32) -> &mut Self { self.persistent_congestion_threshold = value; @@ -360,6 +376,8 @@ impl Default for TransportConfig { congestion_controller_factory: Arc::new(congestion::CubicConfig::default()), enable_segmentation_offload: true, + + ack_timestamps_config: AckTimestampsConfig::default(), } } } @@ -390,6 +408,7 @@ impl fmt::Debug for TransportConfig { deterministic_packet_numbers: _, congestion_controller_factory: _, enable_segmentation_offload, + ack_timestamps_config, } = self; fmt.debug_struct("TransportConfig") .field("max_concurrent_bidi_streams", max_concurrent_bidi_streams) @@ -416,10 +435,44 @@ impl fmt::Debug for TransportConfig { .field("datagram_send_buffer_size", datagram_send_buffer_size) .field("congestion_controller_factory", &"[ opaque ]") .field("enable_segmentation_offload", enable_segmentation_offload) + .field("ack_timestamps_config", ack_timestamps_config) .finish() } } +/// Parameters for controlling the peer's acknowledgements with receiver timestamps. +#[derive(Clone, Debug, PartialEq, Eq, Copy)] +pub struct AckTimestampsConfig { + /// If max_timestamp_per_ack is None, this feature is disabled. + pub(crate) max_timestamps_per_ack: Option, + pub(crate) exponent: VarInt, +} + +impl AckTimestampsConfig { + /// Sets the maximum number of timestamp entries per ACK frame. + pub fn max_timestamps_per_ack(&mut self, value: VarInt) -> &mut Self { + self.max_timestamps_per_ack = Some(value); + self + } + + /// Timestamp values are divided by the exponent value provided. This reduces the size of the + /// VARINT for loss in precision. A exponent of 0 represents microsecond precision. + pub fn exponent(&mut self, value: VarInt) -> &mut Self { + self.exponent = value; + self + } +} + +impl Default for AckTimestampsConfig { + fn default() -> Self { + Self { + max_timestamps_per_ack: None, + // Default to 0 as per draft. + exponent: 0u32.into(), + } + } +} + /// Parameters for controlling the peer's acknowledgement frequency /// /// The parameters provided in this config will be sent to the peer at the beginning of the diff --git a/quinn-proto/src/congestion.rs b/quinn-proto/src/congestion.rs index fac6e80e5..b41b7dc38 100644 --- a/quinn-proto/src/congestion.rs +++ b/quinn-proto/src/congestion.rs @@ -3,7 +3,7 @@ use crate::connection::RttEstimator; use std::any::Any; use std::sync::Arc; -use std::time::Instant; +use std::time::{Duration, Instant}; mod bbr; mod cubic; @@ -34,6 +34,21 @@ pub trait Controller: Send + Sync { ) { } + #[allow(unused_variables)] + /// Packet deliveries were confirmed with timestamps information. + fn on_ack_timestamped( + &mut self, + pn: u64, + now: Instant, + sent: Instant, + received: Option, + bytes: u64, + app_limited: bool, + rtt: &RttEstimator, + ) { + self.on_ack(now, sent, bytes, app_limited, rtt); + } + /// Packets are acked in batches, all with the same `now` argument. This indicates one of those batches has completed. #[allow(unused_variables)] fn on_end_acks( diff --git a/quinn-proto/src/connection/mod.rs b/quinn-proto/src/connection/mod.rs index 2c0f92b55..a5d171d33 100644 --- a/quinn-proto/src/connection/mod.rs +++ b/quinn-proto/src/connection/mod.rs @@ -9,7 +9,7 @@ use std::{ }; use bytes::{Bytes, BytesMut}; -use frame::StreamMetaVec; +use frame::{AckTimestampEncodeParams, StreamMetaVec}; use rand::{rngs::StdRng, Rng, SeedableRng}; use thiserror::Error; use tracing::{debug, error, trace, trace_span, warn}; @@ -18,10 +18,9 @@ use crate::{ cid_generator::ConnectionIdGenerator, cid_queue::CidQueue, coding::BufMutExt, - config::{ServerConfig, TransportConfig}, + config::{AckTimestampsConfig, ServerConfig, TransportConfig}, crypto::{self, KeyPair, Keys, PacketKey}, - frame, - frame::{Close, Datagram, FrameStruct}, + frame::{self, Close, Datagram, FrameStruct}, packet::{ FixedLengthConnectionIdParser, Header, InitialHeader, InitialPacket, LongType, Packet, PacketNumber, PartialDecode, SpaceId, @@ -65,7 +64,10 @@ use paths::{PathData, PathResponses}; mod send_buffer; -mod spaces; +mod receiver_timestamps; +pub(crate) use receiver_timestamps::{PacketTimestamp, ReceiverTimestamps}; + +pub(crate) mod spaces; #[cfg(fuzzing)] pub use spaces::Retransmits; #[cfg(not(fuzzing))] @@ -227,6 +229,10 @@ pub struct Connection { /// no outgoing application data. app_limited: bool, + // Ack Receive Timestamps + // The timestamp config of the peer. + ack_timestamps_cfg: AckTimestampsConfig, + streams: StreamsState, /// Surplus remote CIDs for future use on new paths rem_cids: CidQueue, @@ -238,6 +244,8 @@ pub struct Connection { stats: ConnectionStats, /// QUIC version used for the connection. version: u32, + /// Created at time instant. + epoch: Instant, } impl Connection { @@ -337,6 +345,8 @@ impl Connection { &TransportParameters::default(), )), + ack_timestamps_cfg: AckTimestampsConfig::default(), + pto_count: 0, app_limited: false, @@ -357,6 +367,7 @@ impl Connection { rng, stats: ConnectionStats::default(), version, + epoch: now, }; if side.is_client() { // Kick off the connection @@ -817,6 +828,7 @@ impl Connection { &mut self.spaces[space_id], buf, &mut self.stats, + self.ack_timestamps_cfg, ); } @@ -1343,6 +1355,7 @@ impl Connection { if ack.largest >= self.spaces[space].next_packet_number { return Err(TransportError::PROTOCOL_VIOLATION("unsent packet acked")); } + let new_largest = { let space = &mut self.spaces[space]; if space @@ -1371,6 +1384,23 @@ impl Connection { } } + let timestamp_iter = ack.timestamps_iter(self.config.ack_timestamps_config.exponent.0); + if let (Some(max), Some(iter)) = ( + self.config.ack_timestamps_config.max_timestamps_per_ack, + timestamp_iter, + ) { + let packet_space = &mut self.spaces[space]; + for (i, pkt) in iter.enumerate() { + if i > max.0 as usize { + warn!("peer is sending more timestamps than max requested"); + break; + } + if let Some(sent_packet) = packet_space.get_mut_sent_packet(pkt.packet_number) { + sent_packet.time_received = Some(pkt.timestamp); + } + } + } + if newly_acked.is_empty() { return Ok(()); } @@ -1490,9 +1520,11 @@ impl Connection { if info.ack_eliciting && self.path.challenge.is_none() { // Only pass ACKs to the congestion controller if we are not validating the current // path, so as to ignore any ACKs from older paths still coming in. - self.path.congestion.on_ack( + self.path.congestion.on_ack_timestamped( + pn, now, info.time_sent, + info.time_received, info.size.into(), self.app_limited, &self.path.rtt, @@ -3047,6 +3079,7 @@ impl Connection { space, buf, &mut self.stats, + self.ack_timestamps_cfg, ); } @@ -3231,6 +3264,7 @@ impl Connection { space: &mut PacketSpace, buf: &mut Vec, stats: &mut ConnectionStats, + ack_timestamps_config: AckTimestampsConfig, ) { debug_assert!(!space.pending_acks.ranges().is_empty()); @@ -3255,7 +3289,26 @@ impl Connection { delay_micros ); - frame::Ack::encode(delay as _, space.pending_acks.ranges(), ecn, buf); + frame::Ack::encode( + delay as _, + space.pending_acks.ranges(), + ecn, + ack_timestamps_config.max_timestamps_per_ack.map(|max| { + AckTimestampEncodeParams { + // Safety: If peer_timestamp_config is set, receiver_timestamps must be set. + receiver_timestamps: space.pending_acks.receiver_timestamps_as_ref().unwrap(), + exponent: ack_timestamps_config.exponent.0, + max_timestamps: max.0, + } + }), + buf, + ); + + if let Some(ts) = space.pending_acks.receiver_timestamps_as_mut() { + // Best effort / one try to send the timestamps to the peer. + ts.clear(); + } + stats.frame_tx.acks += 1; } @@ -3309,6 +3362,14 @@ impl Connection { self.path.mtud.on_peer_max_udp_payload_size_received( u16::try_from(self.peer_params.max_udp_payload_size.into_inner()).unwrap_or(u16::MAX), ); + self.ack_timestamps_cfg = params.ack_timestamps_cfg; + if let Some(max) = params.ack_timestamps_cfg.max_timestamps_per_ack { + for space in self.spaces.iter_mut() { + space + .pending_acks + .set_receiver_timestamp(max.0 as usize, self.epoch); + } + }; } fn decrypt_packet( diff --git a/quinn-proto/src/connection/packet_builder.rs b/quinn-proto/src/connection/packet_builder.rs index a29ff448e..b7c06854c 100644 --- a/quinn-proto/src/connection/packet_builder.rs +++ b/quinn-proto/src/connection/packet_builder.rs @@ -210,6 +210,8 @@ impl PacketBuilder { ack_eliciting, retransmits: sent.retransmits, stream_frames: sent.stream_frames, + + time_received: None, }; conn.path diff --git a/quinn-proto/src/connection/receiver_timestamps.rs b/quinn-proto/src/connection/receiver_timestamps.rs new file mode 100644 index 000000000..80114874e --- /dev/null +++ b/quinn-proto/src/connection/receiver_timestamps.rs @@ -0,0 +1,129 @@ +use std::{collections::VecDeque, fmt}; + +use std::time::{Duration, Instant}; + +use tracing::warn; + +#[derive(Debug, Clone, PartialEq, Eq, Copy)] +pub struct PacketTimestamp { + pub packet_number: u64, + pub timestamp: Duration, +} + +pub(crate) struct ReceiverTimestamps { + pub(crate) data: VecDeque, + max: usize, + epoch: Instant, +} + +impl fmt::Debug for ReceiverTimestamps { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let mut l = f.debug_list(); + let mut last: Option<(u64, Duration)> = None; + for curr in self.data.iter() { + if let Some(last) = last.take() { + let s = format!( + "{}..{} diff_micros: {}", + last.0, + curr.packet_number, + (curr.timestamp - last.1).as_micros(), + ); + l.entry(&s); + } + let _ = last.insert((curr.packet_number, curr.timestamp)); + } + l.finish() + } +} + +impl ReceiverTimestamps { + pub(crate) fn new(max: usize, epoch: Instant) -> Self { + Self { + data: VecDeque::with_capacity(max), + max, + epoch, + } + } + + pub(crate) fn add(&mut self, packet_number: u64, timestamp: Instant) { + if self.data.len() == self.max { + self.data.pop_front(); + } + if let Some(v) = self.data.back() { + if packet_number <= v.packet_number { + warn!("out of order packets are not supported"); + return; + } + } + self.data.push_back(PacketTimestamp { + packet_number, + timestamp: timestamp - self.epoch, + }); + } + + pub(crate) fn clear(&mut self) { + self.data.clear(); + } + + #[allow(dead_code)] + pub(crate) fn subtract_below(&mut self, packet_number: u64) { + if self.data.is_empty() { + return; + } + let idx = self + .data + .partition_point(|v| v.packet_number < packet_number); + if idx == self.data.len() { + self.data.clear(); + } else { + let _ = self.data.drain(0..=idx); + } + } + + pub(crate) fn iter(&self) -> impl DoubleEndedIterator + '_ { + self.data.iter().cloned() + } + + pub(crate) fn len(&self) -> usize { + self.data.len() + } +} + +#[cfg(test)] +mod receiver_timestamp_tests { + use super::*; + + #[test] + fn subtract_below() { + let t0 = Instant::now(); + let mut ts = ReceiverTimestamps::new(10, t0); + ts.add(1, t0); + ts.add(2, t0); + ts.add(3, t0); + ts.add(4, t0); + ts.subtract_below(3); + assert_eq!(1, ts.len()); + } + + #[test] + fn subtract_below_everything() { + let t0 = Instant::now(); + let mut ts = ReceiverTimestamps::new(10, t0); + ts.add(5, t0); + ts.subtract_below(10); + assert_eq!(0, ts.len()); + } + + #[test] + fn receiver_timestamp_max() { + let t0 = Instant::now(); + let mut ts = ReceiverTimestamps::new(2, t0); + ts.add(1, t0); + ts.add(2, t0); + ts.add(3, t0); + ts.add(4, t0); + assert_eq!(2, ts.len()); + assert_eq!(3, ts.data.front().unwrap().packet_number); + assert_eq!(4, ts.data.back().unwrap().packet_number); + } +} diff --git a/quinn-proto/src/connection/spaces.rs b/quinn-proto/src/connection/spaces.rs index 3f999c886..00019cd8f 100644 --- a/quinn-proto/src/connection/spaces.rs +++ b/quinn-proto/src/connection/spaces.rs @@ -12,8 +12,13 @@ use tracing::trace; use super::assembler::Assembler; use crate::{ - connection::StreamsState, crypto::Keys, frame, packet::SpaceId, range_set::ArrayRangeSet, - shared::IssuedCid, Dir, StreamId, TransportError, VarInt, + connection::{receiver_timestamps::ReceiverTimestamps, StreamsState}, + crypto::Keys, + frame, + packet::SpaceId, + range_set::ArrayRangeSet, + shared::IssuedCid, + Dir, StreamId, TransportError, VarInt, }; pub(super) struct PacketSpace { @@ -216,6 +221,10 @@ impl PacketSpace { Some(packet) } + pub(super) fn get_mut_sent_packet(&mut self, number: u64) -> Option<&mut SentPacket> { + self.sent_packets.get_mut(&number) + } + /// Returns the number of bytes to *remove* from the connection's in-flight count pub(super) fn sent(&mut self, number: u64, packet: SentPacket) -> u64 { // Retain state for at most this many non-ACK-eliciting packets sent after the most recently @@ -278,6 +287,11 @@ impl IndexMut for [PacketSpace; 3] { pub(super) struct SentPacket { /// The time the packet was sent. pub(super) time_sent: Instant, + + /// The time the packet was received by the receiver. The time Instant on this field is + /// relative to a basis negotiated by the two connections. Time arithmetic done using the + /// time_received field is only useful when compared to other time_received. + pub(super) time_received: Option, /// The number of bytes sent in the packet, not including UDP or IP overhead, but including QUIC /// framing overhead. Zero if this packet is not counted towards congestion control, i.e. not an /// "in flight" packet. @@ -587,6 +601,8 @@ pub(super) struct PendingAcks { largest_ack_eliciting_packet: Option, /// The largest acknowledged packet number sent in an ACK frame largest_acked: Option, + + receiver_timestamps: Option, } impl PendingAcks { @@ -602,9 +618,15 @@ impl PendingAcks { largest_packet: None, largest_ack_eliciting_packet: None, largest_acked: None, + + receiver_timestamps: None, } } + pub(super) fn set_receiver_timestamp(&mut self, max_timestamps: usize, epoch: Instant) { + self.receiver_timestamps = Some(ReceiverTimestamps::new(max_timestamps, epoch)); + } + pub(super) fn set_ack_frequency_params(&mut self, frame: &frame::AckFrequency) { self.ack_eliciting_threshold = frame.ack_eliciting_threshold.into_inner(); self.reordering_threshold = frame.reordering_threshold.into_inner(); @@ -644,6 +666,10 @@ impl PendingAcks { ack_eliciting: bool, dedup: &Dedup, ) -> bool { + if let Some(ts) = self.receiver_timestamps_as_mut() { + ts.add(packet_number, now); + } + if !ack_eliciting { self.non_ack_eliciting_since_last_ack_sent += 1; return false; @@ -755,6 +781,14 @@ impl PendingAcks { &self.ranges } + pub(super) fn receiver_timestamps_as_mut(&mut self) -> Option<&mut ReceiverTimestamps> { + self.receiver_timestamps.as_mut() + } + + pub(super) fn receiver_timestamps_as_ref(&self) -> Option<&ReceiverTimestamps> { + self.receiver_timestamps.as_ref() + } + /// Queue an ACK if a significant number of non-ACK-eliciting packets have not yet been /// acknowledged /// diff --git a/quinn-proto/src/frame.rs b/quinn-proto/src/frame.rs index d58cb36e5..d9dbfbbbf 100644 --- a/quinn-proto/src/frame.rs +++ b/quinn-proto/src/frame.rs @@ -2,6 +2,7 @@ use std::{ fmt::{self, Write}, io, mem, ops::{Range, RangeInclusive}, + time::Duration, }; use bytes::{Buf, BufMut, Bytes}; @@ -9,6 +10,7 @@ use tinyvec::TinyVec; use crate::{ coding::{self, BufExt, BufMutExt, UnexpectedEnd}, + connection::{PacketTimestamp, ReceiverTimestamps}, range_set::ArrayRangeSet, shared::{ConnectionId, EcnCodepoint}, Dir, ResetToken, StreamId, TransportError, TransportErrorCode, VarInt, MAX_CID_SIZE, @@ -133,6 +135,8 @@ frame_types! { ACK_FREQUENCY = 0xaf, IMMEDIATE_ACK = 0x1f, // DATAGRAM + // Custom frame for https://www.ietf.org/archive/id/draft-smith-quic-receive-ts-00.html + ACK_RECEIVE_TIMESTAMPS = 0x40, } const STREAM_TYS: RangeInclusive = RangeInclusive::new(0x08, 0x0f); @@ -343,6 +347,7 @@ pub struct Ack { pub delay: u64, pub additional: Bytes, pub ecn: Option, + pub timestamps: Option, } impl fmt::Debug for Ack { @@ -358,11 +363,14 @@ impl fmt::Debug for Ack { } ranges.push(']'); + let timestamp_count = self.timestamps_iter(0).map(|iter| iter.count()); + f.debug_struct("Ack") .field("largest", &self.largest) .field("delay", &self.delay) .field("ecn", &self.ecn) .field("ranges", &ranges) + .field("timestamp_count", ×tamp_count) .finish() } } @@ -376,18 +384,27 @@ impl<'a> IntoIterator for &'a Ack { } } +pub struct AckTimestampEncodeParams<'a> { + pub(crate) receiver_timestamps: &'a ReceiverTimestamps, + pub(crate) exponent: u64, + pub(crate) max_timestamps: u64, +} + impl Ack { pub fn encode( delay: u64, ranges: &ArrayRangeSet, ecn: Option<&EcnCounts>, + ts_params: Option, buf: &mut W, ) { let mut rest = ranges.iter().rev(); let first = rest.next().unwrap(); let largest = first.end - 1; let first_size = first.end - first.start; - buf.write(if ecn.is_some() { + buf.write(if ts_params.is_some() { + Type::ACK_RECEIVE_TIMESTAMPS + } else if ecn.is_some() { Type::ACK_ECN } else { Type::ACK @@ -403,7 +420,15 @@ impl Ack { buf.write_var(size - 1); prev = block.start; } - if let Some(x) = ecn { + if let Some(x) = ts_params { + Self::encode_timestamps( + x.receiver_timestamps, + largest, + buf, + x.exponent, + x.max_timestamps, + ) + } else if let Some(x) = ecn { x.encode(buf) } } @@ -411,6 +436,101 @@ impl Ack { pub fn iter(&self) -> AckIter<'_> { self.into_iter() } + + pub fn timestamps_iter(&self, exponent: u64) -> Option { + self.timestamps + .as_ref() + .map(|v| AckTimestampIter::new(self.largest, exponent, &v[..])) + } + + // https://www.ietf.org/archive/id/draft-smith-quic-receive-ts-00.html#ts-ranges + fn encode_timestamps( + timestamps: &ReceiverTimestamps, + mut largest: u64, + buf: &mut impl BufMut, + exponent: u64, + max_timestamps: u64, + ) { + if timestamps.len() == 0 { + buf.write_var(0); + return; + } + let mut prev = None; + + // segment_idx tracks the positions in `timestamps` in which a gap occurs. + let mut segment_idxs = Vec::::new(); + // iterates from largest number to smallest + for (i, pkt) in timestamps.iter().rev().enumerate() { + if let Some(prev) = prev { + if pkt.packet_number + 1 != prev { + segment_idxs.push(timestamps.len() - i); + } + } + prev = Some(pkt.packet_number); + } + segment_idxs.push(0); + // Timestamp Range Count + buf.write_var(segment_idxs.len() as u64); + + let mut right = timestamps.len(); + let mut first = true; + let mut counter = 0; + let mut basis = Duration::ZERO; + + for segment_idx in segment_idxs { + let Some(elt) = timestamps.data.get(right - 1) else { + debug_assert!( + false, + "an invalid indexing occurred on the ReceiverTimestamps vector" + ); + break; + }; + // *Gap + // For the first Timestamp Range: Gap is the difference between (a) the Largest Acknowledged packet number + // in the frame and (b) the largest packet in the current (first) Timestamp Range. + let gap = if first { + debug_assert!( + elt.packet_number <= largest, + "largest packet number is less than what was found in timestamp vec" + ); + largest - elt.packet_number + } else { + // For subsequent Timestamp Ranges: Gap is the difference between (a) the packet number two lower + // than the smallest packet number in the previous Timestamp Range + // and (b) the largest packet in the current Timestamp Range. + largest - 2 - elt.packet_number + }; + buf.write_var(gap); + // *Timestamp Delta Count + buf.write_var((right - segment_idx) as u64); + // *Timestamp Deltas + for pkt in timestamps.data.range(segment_idx..right).rev() { + let delta: u64 = if first { + first = false; + // For the first Timestamp Delta of the first Timestamp Range in the frame: the value + // is the difference between (a) the receive timestamp of the largest packet in the + // Timestamp Range (indicated by Gap) and (b) the session receive_timestamp_basis + pkt.timestamp.as_micros() as u64 + } else { + // For all other Timestamp Deltas: the value is the difference between + // (a) the receive timestamp specified by the previous Timestamp Delta and + // (b) the receive timestamp of the current packet in the Timestamp Range, decoded as described below. + (basis - pkt.timestamp).as_micros() as u64 + }; + buf.write_var(delta >> exponent); + basis = pkt.timestamp; + largest = pkt.packet_number; + counter += 1; + } + + right = segment_idx; + } + + debug_assert!( + counter <= max_timestamps, + "the number of timestamps in an ack frame exceeded the max allowed" + ); + } } #[derive(Debug, Copy, Clone, Eq, PartialEq)] @@ -619,7 +739,7 @@ impl Iter { Type::RETIRE_CONNECTION_ID => Frame::RetireConnectionId { sequence: self.bytes.get_var()?, }, - Type::ACK | Type::ACK_ECN => { + Type::ACK | Type::ACK_ECN | Type::ACK_RECEIVE_TIMESTAMPS => { let largest = self.bytes.get_var()?; let delay = self.bytes.get_var()?; let extra_blocks = self.bytes.get_var()? as usize; @@ -639,6 +759,15 @@ impl Iter { ce: self.bytes.get_var()?, }) }, + timestamps: if ty != Type::ACK_RECEIVE_TIMESTAMPS { + None + } else { + let ts_start = end; + let ts_range_count = self.bytes.get_var()?; + scan_ack_timestamps_blocks(&mut self.bytes, largest, ts_range_count)?; + let ts_end = self.bytes.position() as usize; + Some(self.bytes.get_ref().slice(ts_start..ts_end)) + }, }) } Type::PATH_CHALLENGE => Frame::PathChallenge(self.bytes.get()?), @@ -767,12 +896,45 @@ fn scan_ack_blocks(buf: &mut io::Cursor, largest: u64, n: usize) -> Resul Ok(()) } +fn scan_ack_timestamps_blocks( + buf: &mut io::Cursor, + largest: u64, + range_count: u64, +) -> Result<(), IterErr> { + let mut next = largest; + let mut first = true; + for _ in 0..range_count { + let gap = buf.get_var()?; + next = if first { + first = false; + next.checked_sub(gap).ok_or(IterErr::Malformed)? + } else { + next.checked_sub(gap + 2).ok_or(IterErr::Malformed)? + }; + let timestamp_delta_count = buf.get_var()?; + next = next + .checked_sub(timestamp_delta_count - 1) + .ok_or(IterErr::Malformed)?; + for _ in 0..timestamp_delta_count { + buf.get_var()?; + } + } + Ok(()) +} + +#[derive(PartialEq, Eq)] enum IterErr { UnexpectedEnd, InvalidFrameId, Malformed, } +impl fmt::Debug for IterErr { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str(self.reason()) + } +} + impl IterErr { fn reason(&self) -> &'static str { use self::IterErr::*; @@ -790,6 +952,71 @@ impl From for IterErr { } } +pub struct AckTimestampIter<'a> { + timestamp_basis: u64, + exponent: u64, + data: &'a [u8], + + deltas_remaining: usize, + first: bool, + next_pn: u64, +} + +impl<'a> AckTimestampIter<'a> { + fn new(largest: u64, exponent: u64, mut data: &'a [u8]) -> Self { + // We read and throw away the Timestamp Range Count value because + // it was already used to properly slice the data. + // Unwrap safety: this byte block was scanned prior using scan_ack_timestamps_blocks. + let _ = data.get_var().unwrap(); + AckTimestampIter { + timestamp_basis: 0, + exponent, + data, + deltas_remaining: 0, + first: true, + next_pn: largest, + } + } +} + +impl<'a> Iterator for AckTimestampIter<'a> { + type Item = PacketTimestamp; + fn next(&mut self) -> Option { + if !self.data.has_remaining() { + debug_assert_eq!( + self.deltas_remaining, 0, + "timestamp delta remaining should be 0" + ); + return None; + } + if self.deltas_remaining == 0 { + let gap = self.data.get_var().unwrap(); + self.deltas_remaining = self.data.get_var().unwrap() as usize; + self.next_pn -= match self.first { + true => gap, + false => gap + 2, + }; + } else { + self.next_pn -= 1; + } + + let delta = self.data.get_var().unwrap(); + self.deltas_remaining -= 1; + + if self.first { + self.timestamp_basis = delta << self.exponent; + self.first = false; + } else { + self.timestamp_basis -= delta << self.exponent; + } + + Some(PacketTimestamp { + packet_number: self.next_pn, + timestamp: Duration::from_micros(self.timestamp_basis), + }) + } +} + #[derive(Debug, Clone)] pub struct AckIter<'a> { largest: u64, @@ -797,7 +1024,7 @@ pub struct AckIter<'a> { } impl<'a> AckIter<'a> { - fn new(largest: u64, payload: &'a [u8]) -> Self { + pub(crate) fn new(largest: u64, payload: &'a [u8]) -> Self { let data = io::Cursor::new(payload); Self { largest, data } } @@ -955,7 +1182,7 @@ mod test { ect1: 24, ce: 12, }; - Ack::encode(42, &ranges, Some(&ECN), &mut buf); + Ack::encode(42, &ranges, Some(&ECN), None, &mut buf); let frames = frames(buf); assert_eq!(frames.len(), 1); match frames[0] { @@ -995,4 +1222,258 @@ mod test { assert_eq!(frames.len(), 1); assert_matches!(&frames[0], Frame::ImmediateAck); } + + #[cfg(test)] + mod ack_timestamps_tests { + use super::*; + use std::time::Instant; + + #[test] + fn test_scan_ack_timestamps_block() { + let mut buf = bytes::BytesMut::new(); + buf.write_var(0); // gap + buf.write_var(3); // delta count + buf.write_var(1); // delta + buf.write_var(2); // delta + buf.write_var(3); // delta + + let buf_len = buf.len() as u64; + let mut c = io::Cursor::new(buf.freeze()); + scan_ack_timestamps_blocks(&mut c, 3, 1).unwrap(); + assert_eq!(buf_len, c.position()); + } + + #[test] + fn test_scan_ack_timestamps_block_with_gap() { + let mut buf = bytes::BytesMut::new(); + buf.write_var(1); // gap + buf.write_var(3); // delta count + buf.write_var(1); // pn 9 + buf.write_var(2); // pn 8 + buf.write_var(3); // pn 7 + + buf.write_var(1); // gap + buf.write_var(5); // delta count + buf.write_var(1); // pn 4 + buf.write_var(2); // pn 3 + buf.write_var(3); // pn 2 + buf.write_var(3); // pn 1 + buf.write_var(3); // pn 0 + + let buf_len = buf.len() as u64; + let mut c = io::Cursor::new(buf.freeze()); + scan_ack_timestamps_blocks(&mut c, 10, 2).unwrap(); + assert_eq!(buf_len, c.position()); + } + + #[test] + fn test_scan_ack_timestamps_block_with_gap_malforned() { + let mut buf = bytes::BytesMut::new(); + buf.write_var(1); // gap + buf.write_var(3); // delta count + buf.write_var(1); // pn 9 + buf.write_var(2); // pn 8 + buf.write_var(3); // pn 7 + + buf.write_var(2); // gap + buf.write_var(6); // delta count + buf.write_var(1); // pn 3 + buf.write_var(2); // pn 2 + buf.write_var(3); // pn 1 + buf.write_var(3); // pn 0 + buf.write_var(3); // pn -1 this will cause an error + + let mut c = io::Cursor::new(buf.freeze()); + assert!(scan_ack_timestamps_blocks(&mut c, 10, 2).is_err()); + } + + #[test] + fn test_scan_ack_timestamps_block_malformed() { + let mut buf = bytes::BytesMut::new(); + buf.write_var(5); // gap + buf.write_var(1); // delta count + buf.write_var(1); // packet number 0 + + let mut c = io::Cursor::new(buf.freeze()); + assert_eq!( + IterErr::UnexpectedEnd, + scan_ack_timestamps_blocks(&mut c, 5, 2).unwrap_err(), + ); + } + + #[test] + fn timestamp_iter() { + let second = Duration::from_secs(1); + let t0 = Instant::now(); + + let mut timestamps = ReceiverTimestamps::new(100, t0); + + timestamps.add(1, t0 + second); + timestamps.add(2, t0 + second * 2); + timestamps.add(3, t0 + second * 3); + let mut buf = bytes::BytesMut::new(); + + Ack::encode_timestamps(×tamps, 12, &mut buf, 0, 10); + + // Manually decode and assert the values in the buffer. + assert_eq!(1, buf.get_var().unwrap()); // timestamp_range_count + assert_eq!(9, buf.get_var().unwrap()); // gap: 12-3 + assert_eq!(3, buf.get_var().unwrap()); // timestamp delta count + assert_eq!(3_000_000, buf.get_var().unwrap()); // timestamp delta: 3_000_000 μs = 3 seconds = diff between largest timestamp and basis + assert_eq!(1_000_000, buf.get_var().unwrap()); // timestamp delta: 1 second diff + assert_eq!(1_000_000, buf.get_var().unwrap()); // timestamp delta: 1 second diff + assert!(buf.get_var().is_err()); + } + + #[test] + fn timestamp_iter_with_gaps() { + let t0 = Instant::now(); + let mut timestamps = ReceiverTimestamps::new(100, t0); + let one_second = Duration::from_secs(1); + let t0 = Instant::now(); + vec![(1..=3), (5..=5), (10..=12)] + .into_iter() + .flatten() + .for_each(|i| timestamps.add(i, t0 + one_second * i as u32)); + + let mut buf = bytes::BytesMut::new(); + + Ack::encode_timestamps(×tamps, 12, &mut buf, 0, 10); + // Manually decode and assert the values in the buffer. + assert_eq!(3, buf.get_var().unwrap()); // timestamp_range_count + // + assert_eq!(0, buf.get_var().unwrap()); // gap: 12 - 12 = 0 + assert_eq!(3, buf.get_var().unwrap()); // timestamp_delta_count + assert_eq!(12_000_000, buf.get_var().unwrap()); // delta: 3_000_000 μs = 3 seconds = diff between largest timestamp and basis + assert_eq!(1_000_000, buf.get_var().unwrap()); // delta: 1 second diff + assert_eq!(1_000_000, buf.get_var().unwrap()); // delta: 1 second diff + // + assert_eq!(3, buf.get_var().unwrap()); // gap: 10 - 2 - 5 = 3 + assert_eq!(1, buf.get_var().unwrap()); // timestamp_delta_count + assert_eq!(5_000_000, buf.get_var().unwrap()); // delta: 1 second diff + + assert_eq!(0, buf.get_var().unwrap()); // gap + assert_eq!(3, buf.get_var().unwrap()); // timestamp_delta_count + assert_eq!(2_000_000, buf.get_var().unwrap()); // delta: 2 second diff + assert_eq!(1_000_000, buf.get_var().unwrap()); // delta: 1 second diff + assert_eq!(1_000_000, buf.get_var().unwrap()); // delta: 1 second diff + + // end + assert!(buf.get_var().is_err()); + } + + #[test] + fn timestamp_iter_with_exponent() { + let t0 = Instant::now(); + let mut timestamps = ReceiverTimestamps::new(100, t0); + let millisecond = Duration::from_millis(1); + timestamps.add(1, t0 + millisecond * 200); + timestamps.add(2, t0 + millisecond * 300); + let mut buf = bytes::BytesMut::new(); + + let exponent = 2; + Ack::encode_timestamps(×tamps, 12, &mut buf, exponent, 10); + + // values below are tested in another unit test + buf.get_var().unwrap(); // timestamp_range_count + buf.get_var().unwrap(); // gap + buf.get_var().unwrap(); // timestamp_delta_count + assert_eq!(300_000 >> exponent, buf.get_var().unwrap()); // 300ms diff + assert_eq!(100_000 >> exponent, buf.get_var().unwrap()); // 100ms diff + assert!(buf.get_var().is_err()); + } + + #[test] + fn timestamp_encode_decode() { + let t0 = Instant::now(); + let mut timestamps = ReceiverTimestamps::new(100, t0); + let one_second = Duration::from_secs(1); + timestamps.add(1, t0 + one_second); + timestamps.add(2, t0 + one_second * 2); + timestamps.add(3, t0 + one_second * 3); + + let mut buf = bytes::BytesMut::new(); + + Ack::encode_timestamps(×tamps, 12, &mut buf, 0, 10); + + let decoder = AckTimestampIter::new(12, 0, &buf); + + let got: Vec<_> = decoder.collect(); + // [(3, _), (2, _), (1, _)] + assert_eq!(3, got.len()); + assert_eq!(3, got[0].packet_number); + assert_eq!(3 * one_second, got[0].timestamp,); + + assert_eq!(2, got[1].packet_number); + assert_eq!(2 * one_second, got[1].timestamp,); + + assert_eq!(1, got[2].packet_number); + assert_eq!(1 * one_second, got[2].timestamp); + } + + #[test] + fn timestamp_encode_decode_with_gaps() { + let t0 = Instant::now(); + let mut timestamps = ReceiverTimestamps::new(100, t0); + let one_second = Duration::from_secs(1); + let expect: Vec<_> = vec![(1..=3), (5..=5), (10..=12)] + .into_iter() + .flatten() + .collect::>() + .into_iter() + .map(|i| { + let dt = one_second * i as u32; + timestamps.add(i, t0 + dt); + PacketTimestamp { + packet_number: i, + timestamp: dt, + } + }) + .collect(); + + let mut buf = bytes::BytesMut::new(); + + Ack::encode_timestamps(×tamps, 12, &mut buf, 0, 10); + + let decoder = AckTimestampIter::new(12, 0, &buf); + let got: Vec<_> = decoder.collect(); + + assert_eq!(7, got.len()); + assert_eq!(expect, got.into_iter().rev().collect::>()); + } + + #[test] + fn timestamp_encode_max_ack() { + // fix this + let t0 = Instant::now(); + let mut timestamps = ReceiverTimestamps::new(2, t0); + let one_second = Duration::from_secs(1); + let expect: Vec<_> = vec![(1..=3), (5..=5), (10..=12)] + .into_iter() + .flatten() + .collect::>() + .into_iter() + .map(|i| { + let dt = one_second * i as u32; + timestamps.add(i, t0 + dt); + PacketTimestamp { + packet_number: i, + timestamp: dt, + } + }) + .collect(); + + let mut buf = bytes::BytesMut::new(); + + Ack::encode_timestamps(×tamps, 12, &mut buf, 0, 2); + let decoder = AckTimestampIter::new(12, 0, &buf); + let got: Vec<_> = decoder.collect(); + + assert_eq!(2, got.len()); + assert_eq!( + expect[expect.len() - 2..expect.len()], // the last 2 values + got.into_iter().rev().collect::>() + ); + } + } } diff --git a/quinn-proto/src/lib.rs b/quinn-proto/src/lib.rs index bb39101b7..fd6f2e393 100644 --- a/quinn-proto/src/lib.rs +++ b/quinn-proto/src/lib.rs @@ -49,8 +49,8 @@ pub use crate::connection::{ mod config; pub use config::{ - AckFrequencyConfig, ClientConfig, ConfigError, EndpointConfig, IdleTimeout, MtuDiscoveryConfig, - ServerConfig, TransportConfig, + AckFrequencyConfig, AckTimestampsConfig, ClientConfig, ConfigError, EndpointConfig, + IdleTimeout, MtuDiscoveryConfig, ServerConfig, TransportConfig, }; pub mod crypto; diff --git a/quinn-proto/src/packet.rs b/quinn-proto/src/packet.rs index ee1babcc7..3c32e094e 100644 --- a/quinn-proto/src/packet.rs +++ b/quinn-proto/src/packet.rs @@ -969,7 +969,6 @@ mod tests { for byte in &buf { print!("{byte:02x}"); } - println!(); assert_eq!( buf[..], hex!( diff --git a/quinn-proto/src/transport_parameters.rs b/quinn-proto/src/transport_parameters.rs index c09c41f97..e6172c15f 100644 --- a/quinn-proto/src/transport_parameters.rs +++ b/quinn-proto/src/transport_parameters.rs @@ -18,7 +18,7 @@ use crate::{ cid_generator::ConnectionIdGenerator, cid_queue::CidQueue, coding::{BufExt, BufMutExt, UnexpectedEnd}, - config::{EndpointConfig, ServerConfig, TransportConfig}, + config::{AckTimestampsConfig, EndpointConfig, ServerConfig, TransportConfig}, shared::ConnectionId, ResetToken, Side, TransportError, VarInt, LOC_CID_COUNT, MAX_CID_SIZE, MAX_STREAM_COUNT, RESET_TOKEN_SIZE, TIMER_GRANULARITY, @@ -99,6 +99,9 @@ macro_rules! make_struct { pub(crate) stateless_reset_token: Option, /// The server's preferred address for communication after handshake completion pub(crate) preferred_address: Option, + + + pub(crate) ack_timestamps_cfg: AckTimestampsConfig, } // We deliberately don't implement the `Default` trait, since that would be public, and @@ -120,6 +123,7 @@ macro_rules! make_struct { retry_src_cid: None, stateless_reset_token: None, preferred_address: None, + ack_timestamps_cfg: AckTimestampsConfig::default(), } } } @@ -160,6 +164,9 @@ impl TransportParameters { min_ack_delay: Some( VarInt::from_u64(u64::try_from(TIMER_GRANULARITY.as_micros()).unwrap()).unwrap(), ), + + ack_timestamps_cfg: config.ack_timestamps_config, + ..Self::default() } } @@ -349,6 +356,20 @@ impl TransportParameters { w.write_var(x.size() as u64); w.write(x); } + + // The below 2 fields are for the implementation of + // https://www.ietf.org/archive/id/draft-smith-quic-receive-ts-00.html#name-extension-negotiation + // Values of 0x00f0 and 0x00f1 arbitrarily chosen. + if let Some(max) = self.ack_timestamps_cfg.max_timestamps_per_ack { + w.write_var(0x00f0); + w.write_var(max.size() as u64); + w.write(max); + + let exponent = self.ack_timestamps_cfg.exponent; + w.write_var(0x00f1); + w.write_var(exponent.size() as u64); + w.write(exponent); + } } /// Decode `TransportParameters` from buffer @@ -413,6 +434,22 @@ impl TransportParameters { _ => return Err(Error::Malformed), }, 0xff04de1b => params.min_ack_delay = Some(r.get().unwrap()), + + 0x00f0 => { + if len > 8 || params.ack_timestamps_cfg.max_timestamps_per_ack.is_some() { + return Err(Error::Malformed); + } + params + .ack_timestamps_cfg + .max_timestamps_per_ack(r.get().unwrap()); + } + 0x00f1 => { + if len > 8 { + return Err(Error::Malformed); + } + params.ack_timestamps_cfg.exponent(r.get().unwrap()); + } + _ => { macro_rules! parse { {$($(#[$doc:meta])* $name:ident ($code:expr) = $default:expr,)*} => { @@ -464,6 +501,11 @@ impl TransportParameters { return Err(Error::IllegalValue); } + // https://www.ietf.org/archive/id/draft-smith-quic-receive-ts-00.html#name-extension-negotiation + if params.ack_timestamps_cfg.exponent.0 > 20 { + return Err(Error::IllegalValue); + } + Ok(params) } } @@ -499,6 +541,11 @@ mod test { }), grease_quic_bit: true, min_ack_delay: Some(2_000u32.into()), + + ack_timestamps_cfg: AckTimestampsConfig { + max_timestamps_per_ack: Some(10u32.into()), + exponent: 2u32.into(), + }, ..TransportParameters::default() }; params.write(&mut buf);