Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
eauxxs committed Dec 29, 2023
1 parent 70ad8da commit 45abf2a
Show file tree
Hide file tree
Showing 6 changed files with 258 additions and 134 deletions.
6 changes: 0 additions & 6 deletions clash_lib/src/app/logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use serde::Serialize;
use tokio::sync::broadcast::Sender;

use tracing::debug;
use tracing::Level;
use tracing_appender::non_blocking::NonBlocking;
use tracing_appender::non_blocking::WorkerGuard;
use tracing_oslog::OsLogger;
Expand Down Expand Up @@ -104,11 +103,6 @@ pub fn setup_logging(
)
.from_env_lossy();

// let trace = tracing_subscriber::filter::targets::Targets::new()
// .with_target("quinn", Level::TRACE)
// .with_target("quinn_proto", Level::TRACE)
// .with_target("clash", Level::TRACE);

let jaeger = if let Ok(jager_endpoint) = std::env::var("JAGER_ENDPOINT") {
global::set_text_map_propagator(opentelemetry_jaeger::Propagator::new());

Expand Down
127 changes: 127 additions & 0 deletions clash_lib/src/proxy/hysteria2/codec.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
use std::io::ErrorKind;

use bytes::{Buf, BufMut, BytesMut};
use quinn_proto::{coding::Codec, VarInt};
use rand::distributions::Alphanumeric;
use tokio_util::codec::{Decoder, Encoder};

use crate::session::SocksAddr;

pub struct Hy2TcpCodec;

/// ### format
///
/// ```text
/// [uint8] Status (0x00 = OK, 0x01 = Error)
/// [varint] Message length
/// [bytes] Message string
/// [varint] Padding length
/// [bytes] Random padding
/// ```
#[derive(Debug)]
pub struct Hy2TcpResp {
pub status: u8,
pub msg: String,
}

impl Decoder for Hy2TcpCodec {
type Error = std::io::Error;
type Item = Hy2TcpResp;

fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
if !src.has_remaining() {
return Err(ErrorKind::UnexpectedEof.into());
}
let status = src.get_u8();
let msg_len = VarInt::decode(src)
.map_err(|_| ErrorKind::InvalidData)?
.into_inner() as usize;

if src.remaining() < msg_len {
return Err(ErrorKind::UnexpectedEof.into());
}

let msg: Vec<u8> = src.split_to(msg_len).into();
let msg: String =
String::from_utf8(msg).map_err(|e| std::io::Error::new(ErrorKind::InvalidData, e))?;

let padding_len = VarInt::decode(src)
.map_err(|_| ErrorKind::UnexpectedEof)?
.into_inner() as usize;

if src.remaining() < padding_len {
return Err(ErrorKind::UnexpectedEof.into());
}
src.advance(padding_len);

Ok(Hy2TcpResp { status, msg }.into())
}
}

#[inline]
pub fn padding(range: std::ops::RangeInclusive<u32>) -> Vec<u8> {
use rand::Rng;
let mut rng = rand::thread_rng();
let len = rng.gen_range(range) as usize;
rng.sample_iter(Alphanumeric).take(len).collect()
}

impl Encoder<&'_ SocksAddr> for Hy2TcpCodec {
type Error = std::io::Error;
fn encode(&mut self, item: &'_ SocksAddr, buf: &mut BytesMut) -> Result<(), Self::Error> {
const REQ_ID: VarInt = VarInt::from_u32(0x401);

let padding = padding(64..=512);
let padding_var = VarInt::from_u32(padding.len() as u32);

let addr = item.to_string().into_bytes();
let addr_var = VarInt::from_u32(addr.len() as u32);

buf.reserve(
var_size(REQ_ID)
+ var_size(padding_var)
+ var_size(addr_var)
+ addr.len()
+ padding.len(),
);

REQ_ID.encode(buf);

addr_var.encode(buf);
buf.put_slice(&addr);

padding_var.encode(buf);
buf.put_slice(&padding);

Ok(())
}
}

/// Compute the number of bytes needed to encode this value
pub fn var_size(var: VarInt) -> usize {
let x = var.into_inner();
if x < 2u64.pow(6) {
1
} else if x < 2u64.pow(14) {
2
} else if x < 2u64.pow(30) {
4
} else if x < 2u64.pow(62) {
8
} else {
unreachable!("malformed VarInt");
}
}

#[test]
fn hy2_resp_parse() {
let mut src = BytesMut::from(&[0x00, 0x03, 0x61, 0x62, 0x63, 0x00][..]);
let msg = Hy2TcpCodec.decode(&mut src).unwrap().unwrap();
assert!(msg.status == 0);
assert!(msg.msg == "abc");

let mut src = BytesMut::from(&[0x01, 0x00, 0x00][..]);
let msg = Hy2TcpCodec.decode(&mut src).unwrap().unwrap();
assert!(msg.status == 0x1);
assert!(msg.msg == "");
}
41 changes: 38 additions & 3 deletions clash_lib/src/proxy/hysteria2/congestion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,24 @@ impl ControllerFactory for DynCongestion {
}
}

struct Burtal;
const SLOT_COUNT: u64 = 5;
const MIN_SAMPLE_COUNT: u8 = 50;
const MIN_ACKRATE: f64 = 0.8;
const CONGESTION_WINDOW_MULTIPLIER: u8 = 2;

#[derive(Copy, Clone)]
struct SlotInfo {
time: u64,
lost: u64,
ack: u64,
}

struct Burtal {
mtu: u16,
slots: [SlotInfo; SLOT_COUNT as usize],
ack_rate: f64,
bps: u64,
}

impl Controller for Burtal {
fn initial_window(&self) -> u64 {
Expand All @@ -41,10 +58,19 @@ impl Controller for Burtal {
fn on_congestion_event(
&mut self,
_now: Instant,
_sent: Instant,
sent: Instant,
_is_persistent_congestion: bool,
_lost_bytes: u64,
) {
let t = sent.elapsed().as_secs();
let idx = (t % SLOT_COUNT) as usize;
if self.slots[idx].time != t {
self.slots[idx].time = t;
self.slots[idx].lost = 0;
self.slots[idx].ack = 0;
} else {
self.slots[idx].lost = 1
}
}

fn on_ack(
Expand Down Expand Up @@ -149,7 +175,16 @@ fn test_dyn() {

println!("{:?}", r.0.read().unwrap().window());

let b = Box::new(Burtal);
let b = Box::new(Burtal {
bps: 0,
ack_rate: 0.0,
mtu: 0,
slots: [SlotInfo {
time: 0,
lost: 0,
ack: 0,
}; 5],
});
*r.0.write().unwrap() = b;

assert!(r.window() == 999);
Expand Down
122 changes: 40 additions & 82 deletions clash_lib/src/proxy/hysteria2/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,18 @@ use std::{
sync::{Arc, RwLock},
task::{Context, Poll},
};
mod codec;
mod congestion;
mod salamander;
mod udp_hop;

use bytes::{BufMut, Bytes};
use bytes::Bytes;
use futures::{SinkExt, StreamExt};
use h3::client::SendRequest;
use h3_quinn::OpenStreams;
use quinn::{ClientConfig, Connection, TokioRuntime, VarInt};
use quinn_proto::{coding::Codec, TransportConfig};
use quinn::{ClientConfig, Connection, TokioRuntime};
use quinn_proto::TransportConfig;

use rand::distributions::Alphanumeric;
use rustls::{client::ServerCertVerifier, ClientConfig as RustlsClientConfig};
use tokio::{
io::{AsyncRead, AsyncWrite, ReadBuf},
Expand All @@ -36,6 +37,8 @@ use crate::{
};
use tracing::debug;

use self::codec::Hy2TcpCodec;

use super::{converters::hysteria2::PortGenrateor, AnyStream, OutboundHandler, OutboundType};

#[derive(Clone)]
Expand Down Expand Up @@ -195,7 +198,7 @@ impl HystClient {
// )?;

let port_gen = self.opts.ports.as_ref().unwrap().clone();
let udp_hop = udp_hop::UdpHop::new(server_socket_addr.port(), port_gen)?;
let udp_hop = udp_hop::UdpHop::new(server_socket_addr.port(), port_gen, None)?;
quinn::Endpoint::new_with_abstract_socket(
self.ep_config.clone(),
None,
Expand All @@ -204,7 +207,7 @@ impl HystClient {
)?
} else {
let udp = SocketAddr::from(([0, 0, 0, 0], 0));
// bind to port 0, so the OS will choose a random port for us, and specify the fix source ip
// bind to port 0, so the OS will choose a random port for us
let udp_socket = std::net::UdpSocket::bind::<SocketAddr>(udp)?;

quinn::Endpoint::new(
Expand Down Expand Up @@ -240,7 +243,7 @@ impl HystClient {
let req = http::Request::post("https://hysteria/auth")
.header("Hysteria-Auth", passwd)
.header("Hysteria-CC-RX", "0")
.header("Hysteria-Padding", padding(64..=512))
.header("Hysteria-Padding", codec::padding(64..=512))
.body(())
.unwrap();
let mut r = sender.send_request(req).await?;
Expand Down Expand Up @@ -322,32 +325,39 @@ impl OutboundHandler for HystClient {
}
};

// tracing::trace!("conn patch {} ", authed_conn.)
let (mut tx, mut rx) = authed_conn.open_bi().await?;

let (mut tx, mut rx) = authed_conn.open_bi().await.unwrap();
tx.write_all(&make_tcp_request(&sess.destination)).await?;
tokio_util::codec::FramedWrite::new(&mut tx, Hy2TcpCodec)
.send(&sess.destination)
.await?;

let mut buf = [0u8; 2000];
let n = rx.read(&mut buf).await?.ok_or_else(|| {
std::io::Error::new(
std::io::ErrorKind::Other,
format!(
"server response nothing after send tcp request: addr: {}",
self.opts.addr
),
)
})?;

// todo read response
if n < 2 && buf[0] != 0x00 {
return Err(std::io::Error::new(
std::io::ErrorKind::Other,
format!(
"server response invalid tcp request: addr: {}",
match tokio_util::codec::FramedRead::new(&mut rx, Hy2TcpCodec)
.next()
.await
{
Some(Ok(resp)) => {
if resp.status != 0x00 {
return Err(std::io::Error::new(
std::io::ErrorKind::Other,
format!(
"server response error: addr: {}, msg: {:?}",
self.opts.addr, resp.msg
),
));
} else {
debug!(
"hysteria2 tcp request success: status: {}, msg: {:?}",
resp.status, resp.msg
);
}
}
_ => {
return Err(std::io::Error::other(format!(
"not receive hysteria2 response from server: {}",
self.opts.addr
),
));
}
)));
}
};

let hyster_client = HystStream { send: tx, recv: rx };
Ok(Box::new(ChainedStreamWrapper::new(Box::new(hyster_client))))
Expand Down Expand Up @@ -411,55 +421,3 @@ impl AsyncWrite for HystStream {
Pin::new(&mut self.get_mut().send).poll_shutdown(cx)
}
}

#[inline]
fn padding(range: std::ops::RangeInclusive<u32>) -> Vec<u8> {
use rand::Rng;
let mut rng = rand::thread_rng();
let len = rng.gen_range(range) as usize;
rng.sample_iter(Alphanumeric).take(len).collect()
}

/// hysteria2 TCPRequest format
///
/// varint is **quic-varint**
/// ```text
/// [varint] 0x401 (TCPRequest ID)
/// [varint] Address length
/// [bytes] Address string (host:port)
/// [varint] Padding length
/// [bytes] Random padding
/// ```
fn make_tcp_request(dst: &SocksAddr) -> Bytes {
let padding = padding(64..=512);

let dst = dst.to_string();
let addr = dst.as_bytes();
let mut buf = bytes::BytesMut::with_capacity(addr.len() + padding.len() + 20);

VarInt::from_u32(0x401).encode(&mut buf);
VarInt::from(addr.len() as u32).encode(&mut buf);
buf.put_slice(addr);
VarInt::from_u32(padding.len() as u32).encode(&mut buf);
buf.put_slice(&padding);

buf.freeze()
}

#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_quic_varint() {
let x = quinn::VarInt::from_u32(0x401);
let mut buf = bytes::BytesMut::with_capacity(1000);
x.encode(&mut buf);
println!("{:?}", buf.freeze());
}

#[test]
fn test_padding() {
let p = padding(100..=200);
println!("{:?}", std::str::from_utf8(&p));
}
}
Loading

0 comments on commit 45abf2a

Please sign in to comment.