Skip to content

Commit

Permalink
WIP: implement acknowledgement frequency
Browse files Browse the repository at this point in the history
Sponsored by Stormshield
  • Loading branch information
aochagavia committed Apr 19, 2023
1 parent aaa58fc commit 8830d95
Show file tree
Hide file tree
Showing 9 changed files with 867 additions and 87 deletions.
164 changes: 143 additions & 21 deletions quinn-proto/src/connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,13 @@ pub struct Connection {
path_response: Option<PathResponse>,
close: bool,

//
// ACK frequency
//
last_ack_frequency_frame: Option<u64>,
max_ack_delay: Duration,
peer_max_ack_delay: Duration,

//
// Loss Detection
//
Expand Down Expand Up @@ -301,6 +308,10 @@ impl Connection {
path_response: None,
close: false,

last_ack_frequency_frame: None,
max_ack_delay: get_max_ack_delay(&TransportParameters::default()),
peer_max_ack_delay: get_max_ack_delay(&TransportParameters::default()),

pto_count: 0,

app_limited: false,
Expand Down Expand Up @@ -471,8 +482,10 @@ impl Connection {
}

// If we need to send a probe, make sure we have something to send.
for space in SpaceId::iter() {
self.spaces[space].maybe_queue_probe(&self.streams);
for (space, allow_immediate_ack) in
SpaceId::iter().zip([false, false, self.peer_supports_immediate_ack()])
{
self.spaces[space].maybe_queue_probe(allow_immediate_ack, &self.streams);
}

// Check whether we need to send a close message
Expand Down Expand Up @@ -528,7 +541,8 @@ impl Connection {
}

let mut ack_eliciting = !self.spaces[space_id].pending.is_empty(&self.streams)
|| self.spaces[space_id].ping_pending;
|| self.spaces[space_id].ping_pending
|| self.spaces[space_id].immediate_ack_pending;
if space_id == SpaceId::Data {
ack_eliciting |= self.can_send_1rtt();
}
Expand Down Expand Up @@ -687,6 +701,7 @@ impl Connection {
// have gotten any other ACK for the data earlier on.
if !self.spaces[space_id].pending_acks.ranges().is_empty() {
Self::populate_acks(
now,
self.receiving_ecn,
&mut SentFrames::default(),
&mut self.spaces[space_id],
Expand Down Expand Up @@ -733,7 +748,8 @@ impl Connection {
break;
}

let sent = self.populate_packet(space_id, &mut buf, buf_capacity - builder.tag_len);
let sent =
self.populate_packet(now, space_id, &mut buf, buf_capacity - builder.tag_len);

// ACK-only packets should only be sent when explicitly allowed. If we write them due
// to any other reason, there is a bug which leads to one component announcing write
Expand All @@ -751,6 +767,7 @@ impl Connection {

if sent.largest_acked.is_some() {
self.spaces[space_id].pending_acks.acks_sent();
self.timers.stop(Timer::MaxAckDelay);
}

// Keep information about the packet around until it gets finalized
Expand Down Expand Up @@ -802,14 +819,21 @@ impl Connection {

// We implement MTU probes as ping packets padded up to the probe size
buf.write(frame::Type::PING);
self.stats.frame_tx.ping += 1;

// If supported by the peer, we want no delays to the probe's ACK
if self.peer_supports_immediate_ack() {
buf.write(frame::Type::IMMEDIATE_ACK);
self.stats.frame_tx.immediate_ack += 1;
}

builder.pad_to(probe_size);
let sent_frames = SentFrames {
non_retransmits: true,
..Default::default()
};
builder.finish_and_track(now, self, Some(sent_frames), &mut buf);

self.stats.frame_tx.ping += 1;
self.stats.path.sent_plpmtud_probes += 1;
num_datagrams = 1;

Expand Down Expand Up @@ -997,6 +1021,13 @@ impl Connection {
.push_back(EndpointEventInner::NeedIdentifiers(now, num_new_cid));
}
}
Timer::MaxAckDelay => {
trace!("max ack delay reached");
// This timer is only armed in the Data space
self.spaces[SpaceId::Data]
.pending_acks
.on_max_ack_delay_timeout()
}
}
}
}
Expand Down Expand Up @@ -1237,7 +1268,7 @@ impl Connection {
Duration::from_micros(0)
} else {
cmp::min(
self.max_ack_delay(),
self.peer_max_ack_delay,
Duration::from_micros(ack.delay << self.peer_params.ack_delay_exponent.0),
)
};
Expand Down Expand Up @@ -1535,7 +1566,7 @@ impl Connection {
return result;
}
// Include max_ack_delay and backoff for ApplicationData.
duration += self.max_ack_delay() * backoff;
duration += self.peer_max_ack_delay * backoff;
}
let last_ack_eliciting = match self.spaces[space].time_of_last_ack_eliciting_packet {
Some(time) => time,
Expand Down Expand Up @@ -1597,7 +1628,7 @@ impl Connection {
fn pto(&self, space: SpaceId) -> Duration {
let max_ack_delay = match space {
SpaceId::Initial | SpaceId::Handshake => Duration::new(0, 0),
SpaceId::Data => self.max_ack_delay(),
SpaceId::Data => self.peer_max_ack_delay,
};
self.path.rtt.pto_base() + max_ack_delay
}
Expand All @@ -1617,7 +1648,12 @@ impl Connection {
self.permit_idle_reset = true;
self.receiving_ecn |= ecn.is_some();
if let Some(x) = ecn {
self.spaces[space_id].ecn_counters += x;
let space = &mut self.spaces[space_id];
space.ecn_counters += x;

if x.is_ce() {
space.pending_acks.set_immediate_ack_required();
}
}

let packet = match packet {
Expand Down Expand Up @@ -1731,6 +1767,7 @@ impl Connection {
preferred_address: None,
retry_src_cid: None,
stateless_reset_token: None,
min_ack_delay: None,
ack_delay_exponent: TransportParameters::default().ack_delay_exponent,
max_ack_delay: TransportParameters::default().max_ack_delay,
..params
Expand Down Expand Up @@ -2402,9 +2439,13 @@ impl Connection {
}
}
}
self.spaces[packet.header.space()]
.pending_acks
.packet_received(ack_eliciting);

if ack_eliciting {
// In the initial and handshake spaces, ACKs must be sent immediately
self.spaces[packet.header.space()]
.pending_acks
.set_immediate_ack_required();
}

self.write_crypto();
Ok(())
Expand Down Expand Up @@ -2504,7 +2545,11 @@ impl Connection {
if remote == self.path.remote {
// PATH_CHALLENGE on active path, possible off-path packet forwarding
// attack. Send a non-probing packet to recover the active path.
self.ping();
if self.peer_supports_immediate_ack() {
self.immediate_ack();
} else {
self.ping();
}
}
}
Frame::PathResponse(token) => {
Expand Down Expand Up @@ -2654,6 +2699,48 @@ impl Connection {
self.events.push_back(Event::DatagramReceived);
}
}
Frame::AckFrequency(ack_frequency) => {
if self
.last_ack_frequency_frame
.map_or(true, |sequence| ack_frequency.sequence > sequence)
{
// This frame can only be sent in the Data space
let space = &mut self.spaces[SpaceId::Data];

// Update max_ack_delay
let max_ack_delay =
Duration::from_micros(ack_frequency.request_max_ack_delay);
if max_ack_delay < TIMER_GRANULARITY {
return Err(TransportError::PROTOCOL_VIOLATION(
"Request Max Ack Delay in ACK_FREQUENCY frame is less than min_ack_delay",
));
}
self.max_ack_delay = max_ack_delay;
if let Some(timeout) =
space.pending_acks.max_ack_delay_timeout(max_ack_delay)
{
// Update the timeout if necessary
self.timers.set(Timer::MaxAckDelay, timeout);
}

// Update ack frequency params
space
.pending_acks
.set_reordering_threshold(ack_frequency.reordering_threshold);
space
.pending_acks
.set_ack_eliciting_threshold(ack_frequency.ack_eliciting_threshold);

// Keep track of the frame number so we can ignore older out-of-order frames
self.last_ack_frequency_frame = Some(ack_frequency.sequence);
}
}
Frame::ImmediateAck => {
// This frame can only be sent in the Data space
self.spaces[SpaceId::Data]
.pending_acks
.set_immediate_ack_required();
}
Frame::HandshakeDone => {
if self.side.is_server() {
return Err(TransportError::PROTOCOL_VIOLATION(
Expand All @@ -2667,9 +2754,14 @@ impl Connection {
}
}

self.spaces[SpaceId::Data]
let space = &mut self.spaces[SpaceId::Data];
if space
.pending_acks
.packet_received(ack_eliciting);
.packet_received(now, number, ack_eliciting, &space.dedup)
{
self.timers
.set(Timer::MaxAckDelay, now + self.max_ack_delay);
}

// Issue stream ID credit due to ACKs of outgoing finish/resets and incoming finish/resets
// on stopped streams. Incoming finishes/resets on open streams are not handled here as they
Expand Down Expand Up @@ -2786,6 +2878,7 @@ impl Connection {

fn populate_packet(
&mut self,
now: Instant,
space_id: SpaceId,
buf: &mut Vec<u8>,
max_size: usize,
Expand All @@ -2811,10 +2904,25 @@ impl Connection {
self.stats.frame_tx.ping += 1;
}

// IMMEDIATE_ACK
if mem::replace(&mut space.immediate_ack_pending, false) {
trace!("IMMEDIATE_ACK");
buf.write(frame::Type::IMMEDIATE_ACK);
sent.non_retransmits = true;
self.stats.frame_tx.immediate_ack += 1;
}

// ACK
if space.pending_acks.can_send() {
debug_assert!(!space.pending_acks.ranges().is_empty());
Self::populate_acks(self.receiving_ecn, &mut sent, space, buf, &mut self.stats);
Self::populate_acks(
now,
self.receiving_ecn,
&mut sent,
space,
buf,
&mut self.stats,
);
}

// PATH_CHALLENGE
Expand Down Expand Up @@ -2956,6 +3064,7 @@ impl Connection {
/// This method assumes ACKs are pending, and should only be called if
/// `!PendingAcks::ranges().is_empty()` returns `true`.
fn populate_acks(
now: Instant,
receiving_ecn: bool,
sent: &mut SentFrames,
space: &mut PacketSpace,
Expand All @@ -2973,7 +3082,7 @@ impl Connection {
};
sent.largest_acked = space.pending_acks.ranges().max();

let delay_micros = space.pending_acks.ack_delay().as_micros() as u64;
let delay_micros = space.pending_acks.ack_delay(now).as_micros() as u64;

// TODO: This should come frome `TransportConfig` if that gets configurable
let ack_delay_exp = TransportParameters::default().ack_delay_exponent;
Expand Down Expand Up @@ -3029,6 +3138,7 @@ impl Connection {
retire_prior_to: 0,
}).expect("preferred address CID is the first received, and hence is guaranteed to be legal");
}
self.peer_max_ack_delay = get_max_ack_delay(&params);
self.peer_params = params;
self.path.mtud.on_peer_max_udp_payload_size_received(
u16::try_from(self.peer_params.max_udp_payload_size.into_inner()).unwrap_or(u16::MAX),
Expand Down Expand Up @@ -3139,6 +3249,10 @@ impl Connection {
self.key_phase = !self.key_phase;
}

fn peer_supports_immediate_ack(&self) -> bool {
self.peer_params.min_ack_delay.is_some()
}

/// The number of bytes of packets containing retransmittable frames that have not been
/// acknowledged or declared lost.
#[cfg(test)]
Expand Down Expand Up @@ -3198,6 +3312,14 @@ impl Connection {
.push_back(EndpointEventInner::NeedIdentifiers(now, n));
}

/// Send an IMMEDIATE_ACK frame to the remote endpoint
///
/// According to the spec, this will result in an error if the remote endpoint does not support
/// the Acknowledgement Frequency extension
pub fn immediate_ack(&mut self) {
self.spaces[self.highest_space].immediate_ack_pending = true;
}

/// Check the current active remote CID sequence
#[cfg(test)]
pub(crate) fn active_rem_cid_seq(&self) -> u64 {
Expand All @@ -3210,10 +3332,6 @@ impl Connection {
self.path.current_mtu()
}

fn max_ack_delay(&self) -> Duration {
Duration::from_micros(self.peer_params.max_ack_delay.0 * 1000)
}

/// Whether we have 1-RTT data to send
///
/// See also `self.space(SpaceId::Data).can_send()`
Expand Down Expand Up @@ -3444,6 +3562,10 @@ fn instant_saturating_sub(x: Instant, y: Instant) -> Duration {
}
}

fn get_max_ack_delay(params: &TransportParameters) -> Duration {
Duration::from_micros(params.max_ack_delay.0 * 1000)
}

// Prevents overflow and improves behavior in extreme circumstances
const MAX_BACKOFF_EXPONENT: u32 = 16;
// Minimal remaining size to allow packet coalescing
Expand Down
Loading

0 comments on commit 8830d95

Please sign in to comment.