diff --git a/clash_lib/src/proxy/hysteria2/congestion.rs b/clash_lib/src/proxy/hysteria2/congestion.rs index 43995fe93..759b983dc 100644 --- a/clash_lib/src/proxy/hysteria2/congestion.rs +++ b/clash_lib/src/proxy/hysteria2/congestion.rs @@ -1,6 +1,6 @@ use std::{ sync::{Arc, RwLock}, - time::Instant, + time::{Duration, Instant}, }; use quinn_proto::congestion::{Bbr, BbrConfig, Controller, ControllerFactory}; @@ -15,9 +15,10 @@ impl ControllerFactory for DynCongestion { } const SLOT_COUNT: u64 = 5; -const MIN_SAMPLE_COUNT: u8 = 50; +const MIN_SAMPLE_COUNT: u64 = 50; const MIN_ACKRATE: f64 = 0.8; -const CONGESTION_WINDOW_MULTIPLIER: u8 = 2; +const CONGESTION_WINDOW_MULTIPLIER: u64 = 2; +const INITIAL_PACKET_SIZE_IPV4: u64 = 1252; #[derive(Copy, Clone)] struct SlotInfo { @@ -26,33 +27,102 @@ struct SlotInfo { ack: u64, } -struct Burtal { - mtu: u16, +pub struct Burtal { + ack: u64, + last_lost_packet_num: u64, slots: [SlotInfo; SLOT_COUNT as usize], ack_rate: f64, bps: u64, + max_datagram_size: u64, + last_send_time: Option, + budget_at_last_sent: u64, + rtt: u64, + in_flight: u64, + send_now: Instant, + + sess: quinn::Connection, +} + +impl Burtal { + pub fn new(bps: u64, sess: quinn::Connection) -> Self { + Self { + sess, + ack: 0, + max_datagram_size: INITIAL_PACKET_SIZE_IPV4, + last_lost_packet_num: 0, + slots: [SlotInfo { + time: 0, + lost: 0, + ack: 0, + }; SLOT_COUNT as usize], + ack_rate: 0.0, + bps, + rtt: 0, + last_send_time: None, + budget_at_last_sent: 0, + in_flight: 0, + send_now: Instant::now(), + } + } + + fn get_bandwith(&self) -> f64 { + self.bps as f64 / self.ack_rate + } } impl Controller for Burtal { fn initial_window(&self) -> u64 { - 0 + self.window() } + // https://github.com/quinn-rs/quinn/blob/55234e178fdca81cd51a5bfb520cb912de14f72e/quinn-proto/src/connection/mod.rs#L641 + // https://github.com/apernet/hysteria/blob/405572dc6e335c29ab28011bcfa9e0db2c45a4b4/core/internal/congestion/brutal/brutal.go#L72 fn window(&self) -> u64 { - 999 + if self.budget_at_last_sent >= self.max_datagram_size || self.last_send_time.is_none() { + if self.rtt == 0 { + return 10240; + } + ((self.bps * self.rtt * CONGESTION_WINDOW_MULTIPLIER) as f64 / self.ack_rate) as u64 + } else { + 0 + } + + // let last_send_time = self.last_send_time.unwrap(); } - fn on_sent(&mut self, _now: Instant, _bytes: u64, _last_packet_number: u64) {} + fn on_sent(&mut self, now: Instant, _bytes: u64, _last_packet_number: u64) { + let max = (2000000.0 * self.get_bandwith() / 1e9).max((10 * self.max_datagram_size) as f64); + let budget = if self.last_send_time.is_none() { + max + } else { + let budget = self.budget_at_last_sent.saturating_add( + now.duration_since(self.last_send_time.unwrap()).as_secs() + * self.get_bandwith() as u64, + ); - fn on_mtu_update(&mut self, _new_mtu: u16) {} + max.min(budget as f64) + }; + + if _bytes > budget as u64 { + self.budget_at_last_sent = 0; + } else { + self.budget_at_last_sent = budget as u64 - _bytes; + } + self.last_send_time = Some(now); + } + + fn on_mtu_update(&mut self, new_mtu: u16) { + self.max_datagram_size = new_mtu as u64; + } fn on_end_acks( &mut self, _now: Instant, - _in_flight: u64, + in_flight: u64, _app_limited: bool, _largest_packet_num_acked: Option, ) { + self.in_flight = in_flight; } fn on_congestion_event( @@ -62,14 +132,39 @@ impl Controller for Burtal { _is_persistent_congestion: bool, _lost_bytes: u64, ) { + let current_lost_packet_num = self.sess.stats().path.lost_packets; let t = sent.elapsed().as_secs(); let idx = (t % SLOT_COUNT) as usize; if self.slots[idx].time != t { self.slots[idx].time = t; - self.slots[idx].lost = 0; - self.slots[idx].ack = 0; + self.slots[idx].lost = current_lost_packet_num - self.last_lost_packet_num; + self.slots[idx].ack = self.ack; + } else { + self.slots[idx].time = t; + self.slots[idx].lost += current_lost_packet_num - self.last_lost_packet_num; + self.ack += self.ack; + } + + self.last_lost_packet_num = current_lost_packet_num; + self.ack = 0; + + let (ack, lost) = + self.slots + .iter() + .filter(|x| x.time < 5) + .fold((0, 0), |(mut ack, mut lost), x| { + ack += x.ack; + lost += x.lost; + (ack, lost) + }); + + self.ack_rate = if ack + lost < MIN_SAMPLE_COUNT { + 1.0 } else { - self.slots[idx].lost = 1 + match ack as f64 / (ack + lost) as f64 { + x if x < MIN_ACKRATE => MIN_ACKRATE, + x => x, + } } } @@ -79,8 +174,10 @@ impl Controller for Burtal { _sent: Instant, _bytes: u64, _app_limited: bool, - _rtt: &quinn_proto::RttEstimator, + rtt: &quinn_proto::RttEstimator, ) { + self.rtt = rtt.get().as_secs(); + self.ack += 1; } fn clone_box(&self) -> Box { @@ -93,6 +190,12 @@ impl Controller for Burtal { } pub struct DynController(Arc>>); +impl DynController { + pub fn set_controller(&self, controller: Box) { + *self.0.write().unwrap() = controller; + } +} + unsafe impl Send for DynController {} impl Controller for DynController { @@ -163,29 +266,3 @@ impl Controller for DynController { self } } - -#[test] -fn test_dyn() { - let r = DynCongestion.build(Instant::now(), 1); - let r = r - .clone_box() - .into_any() - .downcast::() - .unwrap(); - - println!("{:?}", r.0.read().unwrap().window()); - - let b = Box::new(Burtal { - bps: 0, - ack_rate: 0.0, - mtu: 0, - slots: [SlotInfo { - time: 0, - lost: 0, - ack: 0, - }; 5], - }); - *r.0.write().unwrap() = b; - - assert!(r.window() == 999); -} diff --git a/clash_lib/src/proxy/hysteria2/mod.rs b/clash_lib/src/proxy/hysteria2/mod.rs index f3fbb88ac..e2f64b7f6 100644 --- a/clash_lib/src/proxy/hysteria2/mod.rs +++ b/clash_lib/src/proxy/hysteria2/mod.rs @@ -37,7 +37,10 @@ use crate::{ }; use tracing::debug; -use self::codec::Hy2TcpCodec; +use self::{ + codec::Hy2TcpCodec, + congestion::{Burtal, DynController}, +}; use super::{converters::hysteria2::PortGenrateor, AnyStream, OutboundHandler, OutboundType}; @@ -229,6 +232,14 @@ impl HystClient { let (h3_conn, _rx, udp) = Self::auth(&session, &self.opts.passwd).await?; *self.support_udp.write().unwrap() = udp; //todo set congestion controller according to cc_rx + + let any = session + .congestion_state() + .into_any() + .downcast::() + .unwrap(); + any.set_controller(Box::new(Burtal::new(0, session.clone()))); + anyhow::Ok((session, h3_conn)) }