Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(proxy): plain hysteria2 #626

Merged
merged 6 commits into from
Oct 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7,322 changes: 7,322 additions & 0 deletions Cargo.lock

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions clash/tests/data/config/hysteria2.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ proxies:
password: "passwd"
sni: example.com
skip-cert-verify: true
obfs: salamander
obfs-password: "obfs"
# obfs: salamander
# obfs-password: "obfs"

rules:
- MATCH, local
19 changes: 19 additions & 0 deletions clash/tests/data/docker/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -57,4 +57,23 @@ services:
- type: bind
source: ./v2ray/key.pem
target: /etc/v2ray/v2ray.key
restart: unless-stopped

hysteria2:
image: tobyxdd/hysteria
network_mode: "host"
command:
- server
- "-c"
- "/etc/hysteria/config.yaml"
volumes:
- type: bind
source: ./hysteria2/config.yaml
target: /etc/hysteria/config.yaml
- type: bind
source: ./v2ray/cert.pem
target: /etc/hysteria/cert.pem
- type: bind
source: ./v2ray/key.pem
target: /etc/hysteria/key.pem
restart: unless-stopped
12 changes: 7 additions & 5 deletions clash_lib/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -123,20 +123,22 @@ tor-rtcompat = { version = "0.22", optional = true, default-features = false }
# tuic
tuic = { tag = "v1.3.1", optional = true, git = "https://github.com/Itsusinn/tuic.git" }
tuic-quinn = { tag = "v1.3.1", optional = true, git = "https://github.com/Itsusinn/tuic.git" }
quinn = { version = "0.11", optional = true, default-features = false, features = ["futures-io", "runtime-tokio", "rustls"] }
register-count = { version = "0.1", optional = true }

console-subscriber = { version = "0.4" }
tracing-timing = { version = "0.6" }
criterion = { version = "0.5", features = ["html_reports", "async_tokio"], optional = true }
quinn = { version = "0.11", default-features = false, features = ["futures-io", "runtime-tokio", "rustls"] }

memory-stats = "1.0.0"
# hysteria2
h3 = "0.0.6"
h3-quinn = "0.0.7"
quinn-proto = "0.11.8"
blake2 = "0.10.6"
digest = "0.10.7"

console-subscriber = { version = "0.4" }
tracing-timing = { version = "0.6" }
criterion = { version = "0.5", features = ["html_reports", "async_tokio"], optional = true }
memory-stats = "1.0.0"

[dev-dependencies]
tempfile = "3.13"
mockall = "0.13.0"
Expand Down
1 change: 1 addition & 0 deletions clash_lib/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -448,6 +448,7 @@ async fn create_components(
let dns_listener =
dns::get_dns_listener(dns_listen, dns_resolver.clone(), &cwd).await;

info!("all components initialized");
Ok(RuntimeComponents {
cache_store,
dns_resolver,
Expand Down
18 changes: 13 additions & 5 deletions clash_lib/src/proxy/hysteria2/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,10 @@ impl Decoder for Hy2TcpCodec {
type Error = std::io::Error;
type Item = Hy2TcpResp;

fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
fn decode(
&mut self,
src: &mut BytesMut,
) -> Result<Option<Self::Item>, Self::Error> {
if !src.has_remaining() {
return Err(ErrorKind::UnexpectedEof.into());
}
Expand All @@ -42,8 +45,8 @@ impl Decoder for Hy2TcpCodec {
}

let msg: Vec<u8> = src.split_to(msg_len).into();
let msg: String =
String::from_utf8(msg).map_err(|e| std::io::Error::new(ErrorKind::InvalidData, e))?;
let msg: String = String::from_utf8(msg)
.map_err(|e| std::io::Error::new(ErrorKind::InvalidData, e))?;

let padding_len = VarInt::decode(src)
.map_err(|_| ErrorKind::UnexpectedEof)?
Expand All @@ -68,7 +71,12 @@ pub fn padding(range: std::ops::RangeInclusive<u32>) -> Vec<u8> {

impl Encoder<&'_ SocksAddr> for Hy2TcpCodec {
type Error = std::io::Error;
fn encode(&mut self, item: &'_ SocksAddr, buf: &mut BytesMut) -> Result<(), Self::Error> {

fn encode(
&mut self,
item: &'_ SocksAddr,
buf: &mut BytesMut,
) -> Result<(), Self::Error> {
const REQ_ID: VarInt = VarInt::from_u32(0x401);

let padding = padding(64..=512);
Expand Down Expand Up @@ -123,5 +131,5 @@ fn hy2_resp_parse() {
let mut src = BytesMut::from(&[0x01, 0x00, 0x00][..]);
let msg = Hy2TcpCodec.decode(&mut src).unwrap().unwrap();
assert!(msg.status == 0x1);
assert!(msg.msg == "");
assert!(msg.msg.is_empty());
}
1 change: 1 addition & 0 deletions clash_lib/src/proxy/hysteria2/congestion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ pub struct Burtal {
budget_at_last_sent: u64,
rtt: u64,
in_flight: u64,
#[allow(dead_code)]
send_now: Instant,

sess: quinn::Connection,
Expand Down
132 changes: 88 additions & 44 deletions clash_lib/src/proxy/hysteria2/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::{
fmt::{Debug, Formatter},
net::SocketAddr,
net::{Ipv4Addr, Ipv6Addr, SocketAddr},
num::ParseIntError,
path::PathBuf,
pin::Pin,
Expand All @@ -23,7 +23,10 @@ use quinn::{
use quinn_proto::TransportConfig;

use rustls::{
client::danger::{ServerCertVerified, ServerCertVerifier},
client::{
danger::{ServerCertVerified, ServerCertVerifier},
WebPkiServerVerifier,
},
ClientConfig as RustlsClientConfig,
};
use tokio::{
Expand All @@ -38,20 +41,23 @@ use crate::{
},
dns::ThreadSafeDNSResolver,
},
common::utils::{encode_hex, sha256},
common::{
tls::GLOBAL_ROOT_STORE,
utils::{encode_hex, sha256},
},
// proxy::hysteria2::congestion::DynCongestion,
session::{Session, SocksAddr},
};
use tracing::debug;
use tracing::{debug, trace, warn};

use self::{
codec::Hy2TcpCodec,
congestion::{Burtal, DynController},
};

use super::{
converters::hysteria2::PortGenrateor, ConnectorType, DialWithConnector,
OutboundHandler, OutboundType,
converters::hysteria2::PortGenrateor, utils::new_udp_socket, ConnectorType,
DialWithConnector, OutboundHandler, OutboundType,
};

#[derive(Clone)]
Expand All @@ -64,26 +70,35 @@ pub struct HystOption {
pub salamander: Option<String>,
pub skip_cert_verify: bool,
pub alpn: Vec<String>,
#[allow(dead_code)]
pub up_down: Option<(u64, u64)>,
pub fingerprint: Option<String>,
pub ca: Option<PathBuf>,
#[allow(dead_code)]
pub ca_str: Option<String>,
#[allow(dead_code)]
pub cwnd: Option<u64>,
}

#[derive(Debug)]
struct CertVerifyOption {
fingerprint: Option<String>,
_ca: Option<PathBuf>,
skip: bool,
pki: Arc<WebPkiServerVerifier>,
}

impl CertVerifyOption {
fn new(fingerprint: Option<String>, ca: Option<PathBuf>, skip: bool) -> Self {
if ca.is_some() {
warn!("hysteria2 custom ca option is not supported yet");
// TODO: add load the ca and put it into a Store
}
Self {
fingerprint,
_ca: ca,
skip,
pki: WebPkiServerVerifier::builder(GLOBAL_ROOT_STORE.clone())
.build()
.unwrap(),
}
}
}
Expand All @@ -92,10 +107,10 @@ impl ServerCertVerifier for CertVerifyOption {
fn verify_server_cert(
&self,
end_entity: &rustls::pki_types::CertificateDer<'_>,
_intermediates: &[rustls::pki_types::CertificateDer<'_>],
_server_name: &rustls::pki_types::ServerName<'_>,
_ocsp_response: &[u8],
_now: rustls::pki_types::UnixTime,
intermediates: &[rustls::pki_types::CertificateDer<'_>],
server_name: &rustls::pki_types::ServerName<'_>,
ocsp_response: &[u8],
now: rustls::pki_types::UnixTime,
) -> Result<ServerCertVerified, rustls::Error> {
if let Some(ref fingerprint) = self.fingerprint {
let cert_hex = encode_hex(&sha256(end_entity.as_ref()));
Expand All @@ -110,36 +125,42 @@ impl ServerCertVerifier for CertVerifyOption {
if self.skip {
return Ok(ServerCertVerified::assertion());
}
// todo
Ok(ServerCertVerified::assertion())

self.pki.verify_server_cert(
end_entity,
intermediates,
server_name,
ocsp_response,
now,
)
}

fn supported_verify_schemes(&self) -> Vec<rustls::SignatureScheme> {
vec![]
self.pki.supported_verify_schemes()
}

fn verify_tls12_signature(
&self,
_message: &[u8],
_cert: &rustls::pki_types::CertificateDer<'_>,
_dss: &rustls::DigitallySignedStruct,
message: &[u8],
cert: &rustls::pki_types::CertificateDer<'_>,
dss: &rustls::DigitallySignedStruct,
) -> Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error> {
Ok(rustls::client::danger::HandshakeSignatureValid::assertion())
self.pki.verify_tls12_signature(message, cert, dss)
}

fn verify_tls13_signature(
&self,
_message: &[u8],
_cert: &rustls::pki_types::CertificateDer<'_>,
_dss: &rustls::DigitallySignedStruct,
message: &[u8],
cert: &rustls::pki_types::CertificateDer<'_>,
dss: &rustls::DigitallySignedStruct,
) -> Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error> {
Ok(rustls::client::danger::HandshakeSignatureValid::assertion())
self.pki.verify_tls13_signature(message, cert, dss)
}
}

enum CcRx {
Auto,
Fixed(u64),
Fixed(#[allow(dead_code)] u64),
}

impl FromStr for CcRx {
Expand Down Expand Up @@ -222,6 +243,7 @@ impl Handler {

async fn new_authed_session(
&self,
sess: &Session,
resolver: ThreadSafeDNSResolver,
) -> anyhow::Result<(Connection, SendRequest<OpenStreams, Bytes>)> {
// Everytime we enstablish a new session, we should lookup the server
Expand All @@ -239,55 +261,75 @@ impl Handler {

// Here maybe we should use a AsyncUdpSocket which implement salamander obfs
// and port hopping
let mut ep = if self.opts.salamander.is_some() || self.opts.ports.is_some() {
debug!("Hysteria2 use salamander obfs");

let mut ep = if self.opts.salamander.is_some() {
// let udp = salamander::Salamander::new(
// udp_socket,
// self.opts.salamander.as_ref().map(|s| s.as_bytes().to_vec()),
// self.opts.ports.clone(),
// )?;

let port_gen = self.opts.ports.as_ref().unwrap().clone();
let udp_hop =
udp_hop::UdpHop::new(server_socket_addr.port(), port_gen, None)?;
unimplemented!("salamander obfs is not implemented yet");
} else if let Some(port_gen) = self.opts.ports.as_ref() {
let udp_hop = udp_hop::UdpHop::new(
server_socket_addr.port(),
port_gen.clone(),
None,
)?;
quinn::Endpoint::new_with_abstract_socket(
self.ep_config.clone(),
None,
Arc::new(udp_hop),
Arc::new(TokioRuntime),
)?
} else {
let udp = SocketAddr::from(([0, 0, 0, 0], 0));
// bind to port 0, so the OS will choose a random port for us
let udp_socket = std::net::UdpSocket::bind::<SocketAddr>(udp)?;
let socket = {
if resolver.ipv6() {
new_udp_socket(
Some((Ipv6Addr::UNSPECIFIED, 0).into()),
sess.iface.clone(),
#[cfg(any(target_os = "linux", target_os = "android"))]
sess.so_mark,
)
.await?
} else {
new_udp_socket(
Some((Ipv4Addr::UNSPECIFIED, 0).into()),
sess.iface.clone(),
#[cfg(any(target_os = "linux", target_os = "android"))]
sess.so_mark,
)
.await?
}
};

quinn::Endpoint::new(
self.ep_config.clone(),
None,
udp_socket,
socket.into_std()?,
Arc::new(TokioRuntime),
)?
};

ep.set_default_client_config(self.client_config.clone());

let session = ep
.connect(
server_socket_addr,
self.opts.sni.as_ref().map(|s| s.as_str()).unwrap_or(""),
)?
.connect(server_socket_addr, self.opts.sni.as_deref().unwrap_or(""))?
.await?;
let (h3_conn, _rx, udp) = Self::auth(&session, &self.opts.passwd).await?;
*self.support_udp.write().unwrap() = udp;
// todo set congestion controller according to cc_rx

let any = session
match session
.congestion_state()
.into_any()
.downcast::<DynController>()
.unwrap();
any.set_controller(Box::new(Burtal::new(0, session.clone())));
{
Ok(any) => {
any.set_controller(Box::new(Burtal::new(0, session.clone())));
}
Err(_) => {
trace!("congestion controller is not set");
}
}

anyhow::Ok((session, h3_conn))
}
Expand Down Expand Up @@ -383,8 +425,10 @@ impl OutboundHandler for Handler {
}) {
Some(s) => s.clone(),
None => {
let (session, h3_conn) =
self.new_authed_session(resolver).await.map_err(|e| {
let (session, h3_conn) = self
.new_authed_session(sess, resolver)
.await
.map_err(|e| {
std::io::Error::new(
std::io::ErrorKind::Other,
format!(
Expand Down
Loading
Loading