Skip to content

Commit

Permalink
Tweak flow control
Browse files Browse the repository at this point in the history
- Tweak initialization of flow control
- Tweak API for flow control configurations
- Add more comments about flow control
- Fix the unit test `client_send_multi_requests_blocked_by_conn_flow_control`
  • Loading branch information
iyangsj committed May 27, 2024
1 parent f449431 commit 640154e
Show file tree
Hide file tree
Showing 6 changed files with 101 additions and 63 deletions.
11 changes: 11 additions & 0 deletions include/tquic.h
Original file line number Diff line number Diff line change
Expand Up @@ -336,21 +336,29 @@ void quic_config_set_send_udp_payload_size(struct quic_config_t *config, uintptr
/**
* Set the `initial_max_data` transport parameter. It means the initial
* value for the maximum amount of data that can be sent on the connection.
* The value is capped by the setting `max_connection_window`.
* The default value is `10485760`.
*/
void quic_config_set_initial_max_data(struct quic_config_t *config, uint64_t v);

/**
* Set the `initial_max_stream_data_bidi_local` transport parameter.
* The value is capped by the setting `max_stream_window`.
* The default value is `5242880`.
*/
void quic_config_set_initial_max_stream_data_bidi_local(struct quic_config_t *config, uint64_t v);

/**
* Set the `initial_max_stream_data_bidi_remote` transport parameter.
* The value is capped by the setting `max_stream_window`.
* The default value is `2097152`.
*/
void quic_config_set_initial_max_stream_data_bidi_remote(struct quic_config_t *config, uint64_t v);

/**
* Set the `initial_max_stream_data_uni` transport parameter.
* The value is capped by the setting `max_stream_window`.
* The default value is `1048576`.
*/
void quic_config_set_initial_max_stream_data_uni(struct quic_config_t *config, uint64_t v);

Expand Down Expand Up @@ -436,11 +444,14 @@ void quic_config_set_multipath_algorithm(struct quic_config_t *config,

/**
* Set the maximum size of the connection flow control window.
* The default value is MAX_CONNECTION_WINDOW (15 MB).
*/
void quic_config_set_max_connection_window(struct quic_config_t *config, uint64_t v);

/**
* Set the maximum size of the stream flow control window.
* The value should not be greater than the setting `max_connection_window`.
* The default value is MAX_STREAM_WINDOW (6 MB).
*/
void quic_config_set_max_stream_window(struct quic_config_t *config, uint64_t v);

Expand Down
72 changes: 44 additions & 28 deletions src/connection/flowcontrol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,23 @@
use std::time::Duration;
use std::time::Instant;

/// A flow control implementation that allows the size of the receive buffer to
/// be auto-tuned.
///
/// The basic idea is to start with relatively small initial window size, and
/// then grow the window as necessary. For simplicity, auto-tuning may increase
/// the window size, but never decreases (contrast with congestion control).
///
/// The ideal size of the window is one that is large enough that it can
/// encompass the bandwidth delay product (BDP) to the peer.
///
/// The algorithm will compare the interval between successive flow control
/// window updates to the smoothed RTT. If the flow control window is too small
/// to keep up with the BDP, there will be a window update each RTT.
/// Alternatively, when the window is sized to the ideal, window updates can be
/// expected to occur with frequency corresponding to more than the 1 RTT
/// indicative of blocking, but not too much more. The default target chosen for
/// auto-tuning corresponds to 2 RTTs.
#[derive(Default, Debug)]
pub struct FlowControl {
/// Number of bytes consumed (cumulative).
Expand Down Expand Up @@ -47,9 +64,9 @@ pub struct FlowControl {
}

impl FlowControl {
pub fn new(max_data: u64, window: u64, max_window: u64) -> FlowControl {
pub fn new(window: u64, max_window: u64) -> FlowControl {
FlowControl {
max_data,
max_data: window,
window,
max_window,
..FlowControl::default()
Expand Down Expand Up @@ -86,7 +103,7 @@ impl FlowControl {
/// Return true if the available window is smaller than the half
/// of the current window.
pub fn should_send_max_data(&self) -> bool {
(self.max_data - self.read_off) < (self.window / 2)
(self.max_data - self.read_off) * 2 < self.window
}

/// Get the next max_data limit which will be sent to the peer
Expand All @@ -102,7 +119,7 @@ impl FlowControl {
}

/// Adjust the window size automatically. If the last update
/// is within 2 * srtt, increase the window size by 1.5, but
/// is within 2 * srtt, increase the window size by 2, but
/// not exceeding the max_window.
pub fn autotune_window(&mut self, now: Instant, srtt: Duration) {
if let Some(last_updated) = self.last_updated {
Expand All @@ -125,10 +142,10 @@ mod tests {

#[test]
fn fc_new() {
let flow_control = FlowControl::new(100, 10, 200);
let flow_control = FlowControl::new(100, 200);

assert_eq!(flow_control.max_data(), 100);
assert_eq!(flow_control.window(), 10);
assert_eq!(flow_control.window(), 100);
assert_eq!(flow_control.max_window, 200);
assert_eq!(flow_control.read_off, 0);
assert_eq!(flow_control.recv_off, 0);
Expand All @@ -137,7 +154,7 @@ mod tests {

#[test]
fn fc_increase_recv_off() {
let mut fc = FlowControl::new(100, 10, 200);
let mut fc = FlowControl::new(100, 200);

for (delta, total) in [(10, 10), (20, 30), (30, 60)] {
fc.increase_recv_off(delta);
Expand All @@ -147,19 +164,19 @@ mod tests {

#[test]
fn fc_update_logic() {
let mut fc = FlowControl::new(100, 10, 200);
let mut fc = FlowControl::new(100, 200);

for (read_delta, read_off, should_send, max_data_next) in [
// 1. Initial state
(0, 0, false, 10),
// 2. Read 95 bytes
// available window is 5 == window / 2, not need to send max_data,
// max_data_next is 105 = read_off(95) + window(10)
(95, 95, false, 105),
(0, 0, false, 100),
// 2. Read 50 bytes
// available window is 50 == window / 2, not need to send max_data,
// max_data_next is 150 = read_off(50) + window(100)
(50, 50, false, 150),
// 3. Read 1 bytes
// available window is 4 < window / 2, need to send max_data
// max_data_next is 106 = read_off(96) + window(10)
(1, 96, true, 106),
// available window is 49 < window / 2, need to send max_data
// max_data_next is 151 = read_off(51) + window(100)
(1, 51, true, 151),
] {
fc.increase_read_off(read_delta);
assert_eq!(fc.read_off, read_off);
Expand All @@ -168,7 +185,7 @@ mod tests {
}

fc.update_max_data(Instant::now());
assert_eq!(fc.max_data(), 106);
assert_eq!(fc.max_data(), 151);
}

#[test]
Expand All @@ -177,18 +194,18 @@ mod tests {
let max_window = 30;
let now = Instant::now();
let srtt = Duration::from_millis(100);
let mut fc = FlowControl::new(100, window, max_window);
let mut fc = FlowControl::new(window, max_window);

// 1. Read 96 bytes, available window is 4 < window / 2, need to send max_data.
let read_off = 96;
// 1. Read 6 bytes, available window is 4 < window / 2, need to send max_data.
let read_off = 6;
fc.increase_read_off(read_off);
assert_eq!(fc.should_send_max_data(), true);

// max_data_next = read_off(96) + window(10) = 106
// max_data_next = read_off(6) + window(10) = 16
let max_data_next = fc.max_data_next();
assert_eq!(max_data_next, read_off + fc.window);

// 2. Apply the new max_data limit(106), last_updated is set to now.
// 2. Apply the new max_data limit(16), last_updated is set to now.
fc.update_max_data(now);
assert_eq!(fc.max_data(), max_data_next);

Expand All @@ -197,16 +214,16 @@ mod tests {
// Window auto-tuned to 20
assert_eq!(fc.window, window * 2);

// 4. Read 1 byte, available window is 9 < window / 2, need to send max_data.
let read_off_delta = 1;
// 4. Read 5 byte, available window is 9 < window / 2, need to send max_data.
let read_off_delta = 5;
fc.increase_read_off(read_off_delta);
assert_eq!(fc.should_send_max_data(), true);

// max_data_next = read_off(97) + window(20) = 117
// max_data_next = read_off(11) + window(20) = 31
let max_data_next = fc.max_data_next();
assert_eq!(max_data_next, read_off + read_off_delta + fc.window);

// 5. Apply the new max_data limit(117), last_updated is set to now.
// 5. Apply the new max_data limit(31), last_updated is set to now.
fc.update_max_data(now);
assert_eq!(fc.max_data(), max_data_next);

Expand All @@ -219,8 +236,7 @@ mod tests {

#[test]
fn fc_ensure_window_lower_bound() {
let min_window = 10;
let mut fc = FlowControl::new(100, 10, 200);
let mut fc = FlowControl::new(10, 200);

for (min_window, window) in [
// min_window < window, unchanged
Expand Down
44 changes: 17 additions & 27 deletions src/connection/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,10 +170,6 @@ impl StreamMap {

flow_control: flowcontrol::FlowControl::new(
local_params.initial_max_data,
cmp::min(
local_params.initial_max_data / 2 * 3,
DEFAULT_CONNECTION_WINDOW,
),
max_connection_window,
),

Expand Down Expand Up @@ -2026,11 +2022,7 @@ impl RecvBuf {
/// Create a new receive-side stream buffer with given flow control limits.
fn new(max_data: u64, max_window: u64) -> RecvBuf {
RecvBuf {
flow_control: flowcontrol::FlowControl::new(
max_data,
cmp::min(max_data, DEFAULT_STREAM_WINDOW),
max_window,
),
flow_control: flowcontrol::FlowControl::new(max_data, max_window),
..RecvBuf::default()
}
}
Expand Down Expand Up @@ -3739,10 +3731,10 @@ mod tests {
assert!(map.readable.contains(&0));
assert!(map.stream_shutdown(0, Shutdown::Read, 10).is_ok());

// init_max_data: 14, window: 21, read_off: 10
// init_max_data: 14, window: 14, read_off: 10
// available_window: 4 < window / 2, should update max_data.
assert_eq!(map.flow_control.max_data(), 14);
assert_eq!(map.flow_control.max_data_next(), 31);
assert_eq!(map.flow_control.max_data_next(), 24);
assert!(map.flow_control.should_send_max_data());
assert!(map.rx_almost_full);
}
Expand Down Expand Up @@ -3888,7 +3880,7 @@ mod tests {
assert_eq!(map.concurrency_control, ConcurrencyControl::new(5, 5));

// Check connection-level flow control
assert_eq!(map.flow_control.window(), 150);
assert_eq!(map.flow_control.window(), 100);
assert_eq!(map.flow_control.max_data(), 100);
assert!(
!map.flow_control.should_send_max_data(),
Expand Down Expand Up @@ -4959,8 +4951,7 @@ mod tests {
};

let mut map = StreamMap::new(true, 50, 50, local_tp);
// init window = initial_max_data /2 * 3
assert_eq!(map.flow_control.window(), 30);
assert_eq!(map.flow_control.window(), 20);
assert_eq!(map.flow_control.max_data(), 20);

// 1. Receive a RESET_STREAM frame for a stream which has received some data
Expand All @@ -4973,10 +4964,10 @@ mod tests {
);
assert_eq!(map.on_reset_stream_frame_received(4, 0, 4), Ok(()));
// map.flow_control.consumed = 4
assert_eq!(map.flow_control.max_data_next(), 34);
assert_eq!(map.flow_control.max_data_next(), 24);
assert!(
!map.flow_control.should_send_max_data(),
"available_window = 16 > 15 = window/2, not update max_data"
"available_window = 16 > 10 = window/2, not update max_data"
);
assert!(!map.rx_almost_full);

Expand All @@ -4985,15 +4976,15 @@ mod tests {
// stream_id: 8, max received offset: 1, final size: 2.
let stream = map.get_or_create(8, false).unwrap();
assert_eq!(
stream.recv.write(0, Bytes::from_static(b"O"), false),
stream.recv.write(0, Bytes::from_static(b"QUICQUIC"), false),
Ok(())
);
assert_eq!(map.on_reset_stream_frame_received(8, 0, 2), Ok(()));
// map.flow_control.consumed = 6
assert_eq!(map.flow_control.max_data_next(), 36);
assert_eq!(map.on_reset_stream_frame_received(8, 0, 8), Ok(()));
// map.flow_control.consumed = 12
assert_eq!(map.flow_control.max_data_next(), 32);
assert!(
map.flow_control.should_send_max_data(),
"available_window = 14 < 15 = window/2, update max_data"
"available_window = 8 < 10 = window/2, update max_data"
);
assert!(map.rx_almost_full);
}
Expand Down Expand Up @@ -5334,8 +5325,7 @@ mod tests {
initial_max_streams_uni: 5,
};
let mut map = StreamMap::new(true, 50, 50, local_tp);
// init window = initial_max_data /2 * 3
assert_eq!(map.flow_control.window(), 30);
assert_eq!(map.flow_control.window(), 20);
assert_eq!(map.flow_control.max_data(), 20);

// Create stream 4
Expand All @@ -5351,19 +5341,19 @@ mod tests {
// map.flow_control.consumed = 4
assert!(
!map.flow_control.should_send_max_data(),
"available_window = 16 > 15 = window/2, not update max_data"
"available_window = 16 > 10 = window/2, not update max_data"
);
assert!(!map.rx_almost_full);

// Receive the second block of data of stream 4, should update max_data
assert_eq!(
map.on_stream_frame_received(4, 4, 2, false, Bytes::from_static(b"GO")),
map.on_stream_frame_received(4, 4, 8, false, Bytes::from_static(b"QUICQUIC")),
Ok(())
);
// map.flow_control.consumed = 6
// map.flow_control.consumed = 12
assert!(
map.flow_control.should_send_max_data(),
"available_window = 14 < 15 = window/2, update max_data"
"available_window = 8 < 10 = window/2, update max_data"
);
assert!(map.rx_almost_full);
}
Expand Down
11 changes: 11 additions & 0 deletions src/ffi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,24 +165,32 @@ pub extern "C" fn quic_config_set_send_udp_payload_size(config: &mut Config, v:

/// Set the `initial_max_data` transport parameter. It means the initial
/// value for the maximum amount of data that can be sent on the connection.
/// The value is capped by the setting `max_connection_window`.
/// The default value is `10485760`.
#[no_mangle]
pub extern "C" fn quic_config_set_initial_max_data(config: &mut Config, v: u64) {
config.set_initial_max_data(v);
}

/// Set the `initial_max_stream_data_bidi_local` transport parameter.
/// The value is capped by the setting `max_stream_window`.
/// The default value is `5242880`.
#[no_mangle]
pub extern "C" fn quic_config_set_initial_max_stream_data_bidi_local(config: &mut Config, v: u64) {
config.set_initial_max_stream_data_bidi_local(v);
}

/// Set the `initial_max_stream_data_bidi_remote` transport parameter.
/// The value is capped by the setting `max_stream_window`.
/// The default value is `2097152`.
#[no_mangle]
pub extern "C" fn quic_config_set_initial_max_stream_data_bidi_remote(config: &mut Config, v: u64) {
config.set_initial_max_stream_data_bidi_remote(v);
}

/// Set the `initial_max_stream_data_uni` transport parameter.
/// The value is capped by the setting `max_stream_window`.
/// The default value is `1048576`.
#[no_mangle]
pub extern "C" fn quic_config_set_initial_max_stream_data_uni(config: &mut Config, v: u64) {
config.set_initial_max_stream_data_uni(v);
Expand Down Expand Up @@ -283,12 +291,15 @@ pub extern "C" fn quic_config_set_multipath_algorithm(config: &mut Config, v: Mu
}

/// Set the maximum size of the connection flow control window.
/// The default value is MAX_CONNECTION_WINDOW (15 MB).
#[no_mangle]
pub extern "C" fn quic_config_set_max_connection_window(config: &mut Config, v: u64) {
config.set_max_connection_window(v);
}

/// Set the maximum size of the stream flow control window.
/// The value should not be greater than the setting `max_connection_window`.
/// The default value is MAX_STREAM_WINDOW (6 MB).
#[no_mangle]
pub extern "C" fn quic_config_set_max_stream_window(config: &mut Config, v: u64) {
config.set_max_stream_window(v);
Expand Down
3 changes: 3 additions & 0 deletions src/h3/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3309,6 +3309,9 @@ mod tests {
assert_eq!(s.server_poll(), Ok((stream_id, headers_event)));
assert_eq!(s.server_poll(), Ok((stream_id, Http3Event::Finished)));
assert_eq!(s.server_poll(), Err(Http3Error::Done));

// Server send MAX_DATA
s.move_forward().unwrap();
}

// 4. Server send response headers with FIN for stream 0, 4, 8.
Expand Down
Loading

0 comments on commit 640154e

Please sign in to comment.