From e5a6774e1afa9739806d308eda2f48bdd9146e38 Mon Sep 17 00:00:00 2001 From: eauxxs Date: Wed, 18 Sep 2024 14:14:27 +0800 Subject: [PATCH] hysteria2 hop --- Cargo.lock | 115 +++++---- clash/tests/data/config/hysteria2.yaml | 16 +- clash_lib/Cargo.toml | 2 +- clash_lib/src/app/profile/mod.rs | 4 +- clash_lib/src/config/internal/proxy.rs | 31 ++- clash_lib/src/proxy/converters/hysteria2.rs | 45 +++- clash_lib/src/proxy/hysteria2/mod.rs | 265 +++++++++++--------- clash_lib/src/proxy/hysteria2/salamander.rs | 38 ++- clash_lib/src/proxy/hysteria2/udp_hop.rs | 262 ++++--------------- 9 files changed, 344 insertions(+), 434 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 180168f21..bc2043e6d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1,6 +1,6 @@ # This file is automatically @generated by Cargo. # It is not intended for manual editing. -version = 4 +version = 3 [[package]] name = "addr2line" @@ -280,9 +280,9 @@ checksum = "d92bec98840b8f03a5ff5413de5293bfcd8bf96467cf5452609f939ec6f5de16" [[package]] name = "async-compression" -version = "0.4.16" +version = "0.4.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "103db485efc3e41214fe4fda9f3dbeae2eb9082f48fd236e6095627a9422066e" +checksum = "0cb8f1d480b0ea3783ab015936d2a55c87e219676f0c0b7dec61494043f21857" dependencies = [ "flate2", "futures-core", @@ -299,7 +299,7 @@ checksum = "3b43422f69d8ff38f95f1b2bb76517c91589a924d1559a0e935d7c8ce0274c11" dependencies = [ "proc-macro2", "quote", - "syn 2.0.79", + "syn 2.0.81", ] [[package]] @@ -321,7 +321,7 @@ checksum = "c7c24de15d275a1ecfd47a380fb4d5ec9bfe0933f309ed5e705b775596a3574d" dependencies = [ "proc-macro2", "quote", - "syn 2.0.79", + "syn 2.0.81", ] [[package]] @@ -332,7 +332,7 @@ checksum = "721cae7de5c34fbb2acd27e21e6d2cf7b886dce0c27388d46c4e6c47ea4318dd" dependencies = [ "proc-macro2", "quote", - "syn 2.0.79", + "syn 2.0.81", ] [[package]] @@ -456,7 +456,7 @@ checksum = "57d123550fa8d071b7255cb0cc04dc302baa6c8c4a79f55701552684d8399bce" dependencies = [ "proc-macro2", "quote", - "syn 2.0.79", + "syn 2.0.81", ] [[package]] @@ -523,7 +523,7 @@ dependencies = [ "regex", "rustc-hash 1.1.0", "shlex", - "syn 2.0.79", + "syn 2.0.81", "which", ] @@ -544,7 +544,7 @@ dependencies = [ "regex", "rustc-hash 1.1.0", "shlex", - "syn 2.0.79", + "syn 2.0.81", ] [[package]] @@ -601,7 +601,7 @@ checksum = "e0b121a9fe0df916e362fb3271088d071159cdf11db0e4182d02152850756eff" dependencies = [ "proc-macro2", "quote", - "syn 2.0.79", + "syn 2.0.81", ] [[package]] @@ -787,9 +787,9 @@ checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5" [[package]] name = "cc" -version = "1.1.30" +version = "1.1.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b16803a61b81d9eabb7eae2588776c4c1e584b738ede45fdbb4c972cec1e9945" +checksum = "c2e7962b54006dcfcc61cb72735f4d89bb97061dd6a7ed882ec6b8ee53714c6f" dependencies = [ "shlex", ] @@ -937,7 +937,7 @@ dependencies = [ "heck 0.5.0", "proc-macro2", "quote", - "syn 2.0.79", + "syn 2.0.81", ] [[package]] @@ -1353,7 +1353,7 @@ checksum = "f46882e17999c6cc590af592290432be3bce0428cb0d5f8b6715e4dc7b383eb3" dependencies = [ "proc-macro2", "quote", - "syn 2.0.79", + "syn 2.0.81", ] [[package]] @@ -1401,7 +1401,7 @@ dependencies = [ "proc-macro2", "quote", "strsim 0.11.1", - "syn 2.0.79", + "syn 2.0.81", ] [[package]] @@ -1423,7 +1423,7 @@ checksum = "d336a2a514f6ccccaa3e09b02d41d35330c07ddf03a62165fcec10bb561c7806" dependencies = [ "darling_core 0.20.10", "quote", - "syn 2.0.79", + "syn 2.0.81", ] [[package]] @@ -1452,7 +1452,7 @@ dependencies = [ "proc-macro-error", "proc-macro2", "quote", - "syn 2.0.79", + "syn 2.0.81", ] [[package]] @@ -1536,7 +1536,7 @@ dependencies = [ "quote", "sha3", "strum 0.26.3", - "syn 2.0.79", + "syn 2.0.81", "void", ] @@ -1581,7 +1581,7 @@ dependencies = [ "proc-macro2", "quote", "rustc_version", - "syn 2.0.79", + "syn 2.0.81", ] [[package]] @@ -1671,7 +1671,7 @@ checksum = "97369cbbc041bc366949bc74d34658d6cda5621039731c6310521892a3a20ae0" dependencies = [ "proc-macro2", "quote", - "syn 2.0.79", + "syn 2.0.81", ] [[package]] @@ -1802,7 +1802,7 @@ dependencies = [ "heck 0.5.0", "proc-macro2", "quote", - "syn 2.0.79", + "syn 2.0.81", ] [[package]] @@ -1815,7 +1815,7 @@ dependencies = [ "num-traits", "proc-macro2", "quote", - "syn 2.0.79", + "syn 2.0.81", ] [[package]] @@ -2083,7 +2083,7 @@ checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650" dependencies = [ "proc-macro2", "quote", - "syn 2.0.79", + "syn 2.0.81", ] [[package]] @@ -3329,7 +3329,7 @@ dependencies = [ "cfg-if", "proc-macro2", "quote", - "syn 2.0.79", + "syn 2.0.81", ] [[package]] @@ -3512,7 +3512,7 @@ dependencies = [ "proc-macro-crate 3.2.0", "proc-macro2", "quote", - "syn 2.0.79", + "syn 2.0.81", ] [[package]] @@ -3780,7 +3780,7 @@ dependencies = [ "phf_shared", "proc-macro2", "quote", - "syn 2.0.79", + "syn 2.0.81", ] [[package]] @@ -3809,7 +3809,7 @@ checksum = "a4502d8515ca9f32f1fb543d987f63d95a14934883db45bdb48060b6b69257f8" dependencies = [ "proc-macro2", "quote", - "syn 2.0.79", + "syn 2.0.81", ] [[package]] @@ -3970,12 +3970,12 @@ dependencies = [ [[package]] name = "prettyplease" -version = "0.2.22" +version = "0.2.24" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "479cf940fbbb3426c32c5d5176f62ad57549a0bb84773423ba8be9d089f5faba" +checksum = "910d41a655dac3b764f1ade94821093d3610248694320cd072303a8eedcf221d" dependencies = [ "proc-macro2", - "syn 2.0.79", + "syn 2.0.81", ] [[package]] @@ -4043,9 +4043,9 @@ dependencies = [ [[package]] name = "proc-macro2" -version = "1.0.87" +version = "1.0.88" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b3e4daa0dcf6feba26f985457cdf104d4b4256fc5a09547140f3631bb076b19a" +checksum = "7c3a7fc5db1e57d5a779a352c8cdb57b29aa4c40cc69c3a68a7fedc815fbf2f9" dependencies = [ "unicode-ident", ] @@ -4077,7 +4077,7 @@ dependencies = [ "prost", "prost-types", "regex", - "syn 2.0.79", + "syn 2.0.81", "tempfile", ] @@ -4091,7 +4091,7 @@ dependencies = [ "itertools 0.13.0", "proc-macro2", "quote", - "syn 2.0.79", + "syn 2.0.81", ] [[package]] @@ -4682,7 +4682,7 @@ checksum = "243902eda00fad750862fc144cea25caca5e20d615af0a81bee94ca738f1df1f" dependencies = [ "proc-macro2", "quote", - "syn 2.0.79", + "syn 2.0.81", ] [[package]] @@ -4724,7 +4724,7 @@ checksum = "6c64451ba24fc7a6a2d60fc75dd9c83c90903b19028d4eff35e88fc1e86564e9" dependencies = [ "proc-macro2", "quote", - "syn 2.0.79", + "syn 2.0.81", ] [[package]] @@ -4775,7 +4775,7 @@ dependencies = [ "darling 0.20.10", "proc-macro2", "quote", - "syn 2.0.79", + "syn 2.0.81", ] [[package]] @@ -4813,7 +4813,7 @@ checksum = "82fe9db325bcef1fbcde82e078a5cc4efdf787e96b3b9cf45b50b529f2083d67" dependencies = [ "proc-macro2", "quote", - "syn 2.0.79", + "syn 2.0.81", ] [[package]] @@ -5146,7 +5146,7 @@ dependencies = [ "proc-macro2", "quote", "rustversion", - "syn 2.0.79", + "syn 2.0.81", ] [[package]] @@ -5159,7 +5159,7 @@ dependencies = [ "proc-macro2", "quote", "rustversion", - "syn 2.0.79", + "syn 2.0.81", ] [[package]] @@ -5181,9 +5181,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.79" +version = "2.0.81" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "89132cd0bf050864e1d38dc3bbc07a0eb8e7530af26344d3d2bbbef83499f590" +checksum = "198514704ca887dd5a1e408c6c6cdcba43672f9b4062e1b24aa34e74e6d7faae" dependencies = [ "proc-macro2", "quote", @@ -5244,7 +5244,7 @@ checksum = "08904e7672f5eb876eaaf87e0ce17857500934f4981c4a0ab2b4aa98baac7fc3" dependencies = [ "proc-macro2", "quote", - "syn 2.0.79", + "syn 2.0.81", ] [[package]] @@ -5355,7 +5355,7 @@ checksum = "693d596312e88961bc67d7f1f97af8a70227d9f90c31bba5806eec004978d752" dependencies = [ "proc-macro2", "quote", - "syn 2.0.79", + "syn 2.0.81", ] [[package]] @@ -6400,7 +6400,7 @@ checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.79", + "syn 2.0.81", ] [[package]] @@ -6492,7 +6492,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "04659ddb06c87d233c566112c1c9c5b9e98256d9af50ec3bc9c8327f873a7568" dependencies = [ "quote", - "syn 2.0.79", + "syn 2.0.81", ] [[package]] @@ -6635,12 +6635,9 @@ dependencies = [ [[package]] name = "unicase" -version = "2.7.0" +version = "2.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f7d2d4dafb69621809a81864c9c1b864479e1235c0dd4e199924b9742439ed89" -dependencies = [ - "version_check", -] +checksum = "7e51b68083f157f853b6379db119d1c1be0e6e4dec98101079dec41f6f5cf6df" [[package]] name = "unicode-bidi" @@ -6751,7 +6748,7 @@ checksum = "6b91f57fe13a38d0ce9e28a03463d8d3c2468ed03d75375110ec71d93b449a08" dependencies = [ "proc-macro2", "quote", - "syn 2.0.79", + "syn 2.0.81", ] [[package]] @@ -6780,7 +6777,7 @@ checksum = "d674d135b4a8c1d7e813e2f8d1c9a58308aee4a680323066025e53132218bd91" dependencies = [ "proc-macro2", "quote", - "syn 2.0.79", + "syn 2.0.81", ] [[package]] @@ -6851,7 +6848,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "syn 2.0.79", + "syn 2.0.81", "wasm-bindgen-shared", ] @@ -6873,7 +6870,7 @@ checksum = "26c6ab57572f7a24a4985830b120de1594465e5d500f24afe89e16b4e833ef68" dependencies = [ "proc-macro2", "quote", - "syn 2.0.79", + "syn 2.0.81", "wasm-bindgen-backend", "wasm-bindgen-shared", ] @@ -7027,7 +7024,7 @@ checksum = "2bbd5b46c938e506ecbce286b6628a02171d56153ba733b6c741fc627ec9579b" dependencies = [ "proc-macro2", "quote", - "syn 2.0.79", + "syn 2.0.81", ] [[package]] @@ -7038,7 +7035,7 @@ checksum = "053c4c462dc91d3b1504c6fe5a726dd15e216ba718e84a0e46a88fbe5ded3515" dependencies = [ "proc-macro2", "quote", - "syn 2.0.79", + "syn 2.0.81", ] [[package]] @@ -7298,7 +7295,7 @@ checksum = "fa4f8080344d4671fb4e831a13ad1e68092748387dfc4f55e356242fae12ce3e" dependencies = [ "proc-macro2", "quote", - "syn 2.0.79", + "syn 2.0.81", ] [[package]] @@ -7318,5 +7315,5 @@ checksum = "ce36e65b0d2999d2aafac989fb249189a141aee1f53c612c1f37d72631959f69" dependencies = [ "proc-macro2", "quote", - "syn 2.0.79", + "syn 2.0.81", ] diff --git a/clash/tests/data/config/hysteria2.yaml b/clash/tests/data/config/hysteria2.yaml index aff9a6755..3e1290a8f 100644 --- a/clash/tests/data/config/hysteria2.yaml +++ b/clash/tests/data/config/hysteria2.yaml @@ -3,7 +3,6 @@ port: 8891 socks-port: 8889 mixed-port: 8888 - dns: enable: true listen: 127.0.0.1:53533 @@ -50,8 +49,19 @@ proxies: password: passwd sni: example.com skip-cert-verify: true - obfs: salamander - obfs-password: "passwd" + + # fingerprint: "DF:F8:47:B5:6A:A3:8D:E8:A8:09:76:70:DD:1D:BB:DF:EB:3D:FB:07:FB:12:EE:C5:A9:BC:9B:07:26:7A:75:33" + # or + # fingerprint: "dff847b56aa38de8a8097670dd1dbbdfeb3dfb07fb12eec5a9bc9b07267a7533" + + # obfs: salamander + # obfs-password: "passwd" + + # server port hop range + # if hysteria2 server is deployed on `localhost` for testing, you can use this iptables commands to redirect the traffic, otherwise https://v2.hysteria.network/docs/advanced/Port-Hopping/#server + # sudo iptables -t nat -I PREROUTING -p udp --dport 15000:16000 -j REDIRECT --to-ports 10086 + # sudo iptabls -t nat -I OUTPUT -p udp -o lo --dport 15000:16000 -j REDIRECT --to-ports 10086 + ports: "15000-16000" rules: - MATCH, local diff --git a/clash_lib/Cargo.toml b/clash_lib/Cargo.toml index 6f1f5a5af..31a878606 100644 --- a/clash_lib/Cargo.toml +++ b/clash_lib/Cargo.toml @@ -146,7 +146,7 @@ tokio-test = "0.4.4" axum-macros = "0.4.2" bollard = "0.17" serial_test = "3.1" -env_logger = "0.11" +env_logger = "*" [build-dependencies] prost-build = "0.13" diff --git a/clash_lib/src/app/profile/mod.rs b/clash_lib/src/app/profile/mod.rs index c458069bb..515772d47 100644 --- a/clash_lib/src/app/profile/mod.rs +++ b/clash_lib/src/app/profile/mod.rs @@ -1,7 +1,7 @@ use std::{collections::HashMap, sync::Arc}; use serde::{Deserialize, Serialize}; -use tracing::{error, trace, warn}; +use tracing::{error, warn}; #[derive(Serialize, Deserialize, Debug, Clone)] struct Db { @@ -43,7 +43,7 @@ impl ThreadSafeCacheFile { if let Err(e) = tokio::fs::write(&path, s).await { error!("failed to write cache file: {}", e); } else { - trace!("cache file flushed to {}", path); + // trace!("cache file flushed to {}", path); } } }); diff --git a/clash_lib/src/config/internal/proxy.rs b/clash_lib/src/config/internal/proxy.rs index 46436ecf7..c79309fa8 100644 --- a/clash_lib/src/config/internal/proxy.rs +++ b/clash_lib/src/config/internal/proxy.rs @@ -1,9 +1,9 @@ -use crate::{common::utils::default_bool_true, config::utils, Error}; +use crate::{common::utils::{decode_hex, default_bool_true}, config::utils, Error}; use serde::{de::value::MapDeserializer, Deserialize}; use serde_yaml::Value; use std::{ collections::HashMap, - fmt::{Display, Formatter}, + fmt::{Display, Formatter}, path::PathBuf, }; use uuid::Uuid; @@ -499,8 +499,11 @@ pub struct OutboundHysteria2 { pub sni: Option, pub skip_cert_verify: bool, pub ca: Option, - pub ca_str: Option, - pub fingerprint: Option, + pub ca_str: Option, + // a hex_encoded sha256 fingerprint of the server certificate + #[serde(deserialize_with = "deserialize_sha256")] + #[serde(default)] + pub fingerprint: Option<[u8; 32]>, /// bbr congestion control window pub cwnd: Option, } @@ -510,3 +513,23 @@ pub struct OutboundHysteria2 { pub enum Hysteria2Obfs { Salamander, } + +fn deserialize_sha256<'de, D>(deserializer: D) -> Result, D::Error> +where + D: serde::Deserializer<'de>, +{ + let s = Option::::deserialize(deserializer)?.map(|x| x.replace(':', "")); + if let Some(s) = s { + let s = decode_hex(&s).map_err(serde::de::Error::custom)?; + if s.len() != 32 { + return Err(serde::de::Error::custom( + "fingerprint must be 32 bytes long", + )); + } + let mut arr = [0; 32]; + arr.copy_from_slice(&s); + Ok(Some(arr)) + } else { + Ok(None) + } +} \ No newline at end of file diff --git a/clash_lib/src/proxy/converters/hysteria2.rs b/clash_lib/src/proxy/converters/hysteria2.rs index cb587881a..ca9bf86f6 100644 --- a/clash_lib/src/proxy/converters/hysteria2.rs +++ b/clash_lib/src/proxy/converters/hysteria2.rs @@ -1,10 +1,12 @@ use std::{ num::{NonZeroU16, ParseIntError}, ops::RangeInclusive, + path::PathBuf, sync::Arc, }; use rand::Rng; +use rustls::pki_types::{pem::PemObject, CertificateDer}; use crate::{ config::internal::proxy::{Hysteria2Obfs, OutboundHysteria2}, @@ -40,10 +42,8 @@ impl PortGenrateor { } pub fn get(&self) -> u16 { - let mut rng = rand::thread_rng(); - let len = - 1 + self.ports.len() + self.range.iter().map(|r| r.len()).sum::(); - let idx = rng.gen_range(0..len); + let len = 1 /* default_port */ + self.ports.len() + self.range.iter().map(|r| r.len()).sum::(); + let idx = rand::thread_rng().gen_range(0..len); match idx { 0 => self.default, idx if idx <= self.ports.len() => self.ports[idx - 1], @@ -122,14 +122,13 @@ impl TryFrom for AnyOutboundHandler { sni: value.sni.or(addr.domain().map(|s| s.to_owned())), addr, alpn: value.alpn.unwrap_or_default(), - ca: value.ca.map(|s| s.into()), + ca: parse_certificate(value.ca.as_deref(), value.ca_str.as_ref())?, fingerprint: value.fingerprint, skip_cert_verify: value.skip_cert_verify, passwd: value.password, ports: ports_gen, obfs, up_down: value.up.zip(value.down), - ca_str: value.ca_str, cwnd: value.cwnd, }; @@ -138,6 +137,40 @@ impl TryFrom for AnyOutboundHandler { } } +fn parse_certificate( + ca_str: Option<&str>, + ca_path: Option<&PathBuf>, +) -> Result>, crate::Error> { + let ca_from_str = ca_str + .map(|s| CertificateDer::from_pem_slice(s.as_bytes()).map(|c| c.to_owned())) + .transpose() + .map_err(|e| { + crate::Error::InvalidConfig(format!("parse ca from str error: {:?}", e)) + })?; + + let ca_from_path = ca_path + .map(|p| CertificateDer::from_pem_file(p).map(|c| c.to_owned())) + .transpose() + .map_err(|e| { + crate::Error::InvalidConfig(format!("parse ca from path error: {:?}", e)) + })?; + + match (ca_from_str, ca_from_path) { + (Some(ca), None) => Ok(Some(ca)), + (None, Some(ca)) => Ok(Some(ca)), + (Some(a), Some(b)) => { + if a == b { + Ok(a.into()) + } else { + Err(crate::Error::InvalidConfig( + "ca from str and path not equal".to_owned(), + )) + } + } + (None, None) => Ok(None), + } +} + #[test] fn test_port_gen() { let p = PortGenrateor::new(1000).parse_ports_str("").unwrap(); diff --git a/clash_lib/src/proxy/hysteria2/mod.rs b/clash_lib/src/proxy/hysteria2/mod.rs index 7288b6bcd..2674dcbf8 100644 --- a/clash_lib/src/proxy/hysteria2/mod.rs +++ b/clash_lib/src/proxy/hysteria2/mod.rs @@ -2,7 +2,6 @@ use std::{ fmt::{Debug, Formatter}, net::{Ipv4Addr, Ipv6Addr, SocketAddr}, num::ParseIntError, - path::PathBuf, pin::Pin, str::FromStr, sync::{Arc, RwLock}, @@ -13,12 +12,14 @@ mod congestion; mod salamander; mod udp_hop; +use anyhow::Context as _; use bytes::Bytes; use futures::{SinkExt, StreamExt}; use h3::client::SendRequest; use h3_quinn::OpenStreams; use quinn::{ - crypto::rustls::QuicClientConfig, ClientConfig, Connection, TokioRuntime, + crypto::rustls::QuicClientConfig, AsyncUdpSocket, ClientConfig, Connection, + Runtime, TokioRuntime, }; use quinn_proto::TransportConfig; @@ -27,11 +28,13 @@ use rustls::{ danger::{ServerCertVerified, ServerCertVerifier}, WebPkiServerVerifier, }, + pki_types::CertificateDer, ClientConfig as RustlsClientConfig, }; use tokio::{ io::{AsyncRead, AsyncWrite, ReadBuf}, - sync::Mutex, + select, + sync::{Mutex, OnceCell}, }; use crate::{ @@ -49,7 +52,7 @@ use crate::{ // proxy::hysteria2::congestion::DynCongestion, session::{Session, SocksAddr}, }; -use tracing::{debug, trace, warn}; +use tracing::{debug, trace}; use self::{ codec::Hy2TcpCodec, @@ -61,14 +64,11 @@ use super::{ DialWithConnector, OutboundHandler, OutboundType, }; -#[derive(Clone)] -pub struct SalamanderObfs { - pub key: Vec, -} +pub use salamander::SalamanderObfs; #[derive(Clone)] pub enum Obfs { - Salamander(SalamanderObfs), + Salamander(salamander::SalamanderObfs), } #[derive(Clone)] @@ -83,28 +83,71 @@ pub struct HystOption { pub alpn: Vec, #[allow(dead_code)] pub up_down: Option<(u64, u64)>, - pub fingerprint: Option, - pub ca: Option, - #[allow(dead_code)] - pub ca_str: Option, + pub fingerprint: Option<[u8; 32]>, + pub ca: Option>, #[allow(dead_code)] pub cwnd: Option, } +impl HystOption { + async fn get_udp_socket( + &self, + sess: &crate::session::Session, + is_ipv6: bool, + ) -> std::io::Result> { + let socket_addr = if is_ipv6 { + SocketAddr::new(Ipv6Addr::UNSPECIFIED.into(), 0) + } else { + SocketAddr::new(Ipv4Addr::UNSPECIFIED.into(), 0) + }; + let udp_socket = new_udp_socket( + socket_addr.into(), + sess.iface.clone(), + #[cfg(any(target_os = "linux", target_os = "android"))] + sess.so_mark, + ) + .await + .unwrap(); + let udp_socket = udp_socket.into_std()?; + + let udp = if let Some(obfs) = &self.obfs { + match obfs { + Obfs::Salamander(key) => { + Arc::new(salamander::Salamander::new(udp_socket, key.clone())?) + } + } + } else { + TokioRuntime.wrap_udp_socket(udp_socket)? + }; + + if let Some(ports) = &self.ports { + let server_port = ports.get(); + Ok(Arc::new(udp_hop::Hop::new( + udp, + server_port, + self.addr.port(), + ))) + } else { + Ok(udp) + } + } +} #[derive(Debug)] struct CertVerifyOption { - fingerprint: Option, + fingerprint: Option<[u8; 32]>, + certificate: Option>, skip: bool, pki: Arc, } impl CertVerifyOption { - fn new(fingerprint: Option, ca: Option, skip: bool) -> Self { - if ca.is_some() { - warn!("hysteria2 custom ca option is not supported yet"); - // TODO: add load the ca and put it into a Store - } + fn new( + fingerprint: Option<[u8; 32]>, + certificate: Option>, + skip: bool, + ) -> Self { Self { + certificate, fingerprint, skip, pki: WebPkiServerVerifier::builder(GLOBAL_ROOT_STORE.clone()) @@ -124,19 +167,32 @@ impl ServerCertVerifier for CertVerifyOption { now: rustls::pki_types::UnixTime, ) -> Result { if let Some(ref fingerprint) = self.fingerprint { - let cert_hex = encode_hex(&sha256(end_entity.as_ref())); - if &cert_hex != fingerprint { + let cert_sha256 = sha256(end_entity.as_ref()); + assert!(cert_sha256.len() == 32); + let cert_sha256: [u8; 32] = cert_sha256.try_into().unwrap(); + + if &cert_sha256 != fingerprint { return Err(rustls::Error::General(format!( - "cert hash mismatch: found: {}\nexcept: {}", - cert_hex, fingerprint + "cert hash mismatch: found: {}\nexpect: {}", + encode_hex(&cert_sha256), + encode_hex(fingerprint) ))); + } else { + return Ok(ServerCertVerified::assertion()); + } + } + + if let Some(ref cert) = self.certificate { + if cert != end_entity { + return Err(rustls::Error::General(format!("cert mismatch",))); + } else { + return Ok(ServerCertVerified::assertion()); } } if self.skip { return Ok(ServerCertVerified::assertion()); } - self.pki.verify_server_cert( end_entity, intermediates, @@ -156,6 +212,9 @@ impl ServerCertVerifier for CertVerifyOption { cert: &rustls::pki_types::CertificateDer<'_>, dss: &rustls::DigitallySignedStruct, ) -> Result { + if self.skip { + return Ok(rustls::client::danger::HandshakeSignatureValid::assertion()); + } self.pki.verify_tls12_signature(message, cert, dss) } @@ -165,6 +224,9 @@ impl ServerCertVerifier for CertVerifyOption { cert: &rustls::pki_types::CertificateDer<'_>, dss: &rustls::DigitallySignedStruct, ) -> Result { + if self.skip { + return Ok(rustls::client::danger::HandshakeSignatureValid::assertion()); + } self.pki.verify_tls13_signature(message, cert, dss) } } @@ -189,7 +251,7 @@ impl FromStr for CcRx { pub struct Handler { opts: HystOption, - ep_config: quinn::EndpointConfig, + ep: OnceCell, client_config: quinn::ClientConfig, session: Mutex>>, // a send request guard to keep the connection alive @@ -207,13 +269,15 @@ impl Debug for Handler { impl Handler { const DEFAULT_MAX_IDLE_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(300); + const HOP_INTERVAL: std::time::Duration = std::time::Duration::from_secs(2); pub fn new(opts: HystOption) -> anyhow::Result { let verify = CertVerifyOption::new( - opts.fingerprint.clone(), + opts.fingerprint, opts.ca.clone(), opts.skip_cert_verify, ); + // let verify = DummyTlsVerifier::new(); let mut tls_config = RustlsClientConfig::builder() .dangerous() .with_custom_certificate_verifier(Arc::new(verify)) @@ -237,11 +301,10 @@ impl Handler { let quic_config: QuicClientConfig = tls_config.try_into().unwrap(); let mut client_config = ClientConfig::new(Arc::new(quic_config)); client_config.transport_config(Arc::new(transport)); - let ep_config = quinn::EndpointConfig::default(); Ok(Self { opts, - ep_config, + ep: OnceCell::new(), client_config, session: Mutex::new(None), guard: Mutex::new(None), @@ -262,114 +325,77 @@ impl Handler { let ip = resolver .resolve(d.as_str(), true) .await? - .ok_or_else(|| anyhow!("resolve domain {} failed", d))?; + .context(format!("resolve domain {} failed", d))?; SocketAddr::new(ip, port) } }; - // Here maybe we should use a AsyncUdpSocket which implement salamander obfs - // and port hopping - let create_socket = || async { - if resolver.ipv6() { - new_udp_socket( - Some((Ipv6Addr::UNSPECIFIED, 0).into()), - sess.iface.clone(), - #[cfg(any(target_os = "linux", target_os = "android"))] - sess.so_mark, - ) - .await - } else { - new_udp_socket( - Some((Ipv4Addr::UNSPECIFIED, 0).into()), - sess.iface.clone(), - #[cfg(any(target_os = "linux", target_os = "android"))] - sess.so_mark, - ) - .await - } - }; - - let mut ep = if let Some(obfs) = self.opts.obfs.as_ref() { - match obfs { - Obfs::Salamander(salamander_obfs) => { - let socket = create_socket().await?; - let obfs = salamander::Salamander::new( - socket.into_std()?, - salamander_obfs.key.to_vec(), - )?; - - quinn::Endpoint::new_with_abstract_socket( - self.ep_config.clone(), + let connection = self + .ep + .get_or_init(|| { + let client_config = self.client_config.clone(); + let is_ipv6 = resolver.ipv6(); + async move { + let mut ep = quinn::Endpoint::new_with_abstract_socket( + Default::default(), None, - Arc::new(obfs), + self.opts.get_udp_socket(sess, is_ipv6).await.unwrap(), Arc::new(TokioRuntime), - )? - } - } - } else if let Some(port_gen) = self.opts.ports.as_ref() { - let udp_hop = udp_hop::UdpHop::new( - server_socket_addr.port(), - port_gen.clone(), - None, - )?; - quinn::Endpoint::new_with_abstract_socket( - self.ep_config.clone(), - None, - Arc::new(udp_hop), - Arc::new(TokioRuntime), - )? - } else { - let socket = { - if resolver.ipv6() { - new_udp_socket( - Some((Ipv6Addr::UNSPECIFIED, 0).into()), - sess.iface.clone(), - #[cfg(any(target_os = "linux", target_os = "android"))] - sess.so_mark, - ) - .await? - } else { - new_udp_socket( - Some((Ipv4Addr::UNSPECIFIED, 0).into()), - sess.iface.clone(), - #[cfg(any(target_os = "linux", target_os = "android"))] - sess.so_mark, ) - .await? + .unwrap(); + ep.set_default_client_config(client_config); + ep } - }; - - quinn::Endpoint::new( - self.ep_config.clone(), - None, - socket.into_std()?, - Arc::new(TokioRuntime), - )? - }; - - ep.set_default_client_config(self.client_config.clone()); - - let session = ep + }) + .await .connect(server_socket_addr, self.opts.sni.as_deref().unwrap_or(""))? .await?; - let (guard, _rx, udp) = Self::auth(&session, &self.opts.passwd).await?; + let (guard, _rx, udp) = Self::auth(&connection, &self.opts.passwd).await?; *self.support_udp.write().unwrap() = udp; // todo set congestion controller according to cc_rx - match session + match connection .congestion_state() .into_any() .downcast::() { Ok(any) => { - any.set_controller(Box::new(Burtal::new(0, session.clone()))); + any.set_controller(Box::new(Burtal::new(0, connection.clone()))); } Err(_) => { trace!("congestion controller is not set"); } } - Ok((session, guard)) + // if the server support hop, we should rebind the udp socket + if self.opts.ports.is_some() { + tokio::spawn({ + let sess = sess.clone(); + let connection = connection.clone(); + let opts = self.opts.clone(); + let ep = self.ep.clone(); + async move { + let mut ticker = tokio::time::interval(Self::HOP_INTERVAL); + // ticks immediately + ticker.tick().await; + loop { + select! { + _ = ticker.tick() => { + let ep = ep.get().unwrap(); + let udp = opts.get_udp_socket(&sess, ep.local_addr().unwrap().is_ipv6()).await?; + ep.rebind_abstract(udp)?; + }, + reason = connection.closed() => { + tracing::debug!("hysteria session closed: {:?}", reason); + break; + }, + } + } + Ok::<(), std::io::Error>(()) + } + }); + } + anyhow::Ok((connection, guard)) } async fn auth( @@ -385,8 +411,7 @@ impl Handler { .header("Hysteria-Auth", passwd) .header("Hysteria-CC-RX", "0") .header("Hysteria-Padding", codec::padding(64..=512)) - .body(()) - .unwrap(); + .body(())?; let mut r = sender.send_request(req).await?; r.finish().await?; @@ -402,14 +427,14 @@ impl Handler { let cc_rx = r .headers() .get("Hysteria-CC-RX") - .ok_or_else(|| anyhow!("auth failed: missing Hysteria-CC-RX header"))? + .context("auth failed: missing Hysteria-CC-RX header")? .to_str()? .parse()?; let support_udp = r .headers() .get("Hysteria-UDP") - .ok_or_else(|| anyhow!("auth failed: missing Hysteria-UDP header"))? + .context("auth failed: missing Hysteria-UDP header")? .to_str()? .parse()?; @@ -454,7 +479,7 @@ impl OutboundHandler for Handler { let authed_conn = { let mut session_lock = self.session.lock().await; - match (*session_lock).as_ref().filter(|s| match s.close_reason() { + match session_lock.as_ref().filter(|s| match s.close_reason() { // rust should have inspect method on Option and Result! Some(reason) => { tracing::debug!("old connection closed: {:?}", reason); @@ -462,7 +487,7 @@ impl OutboundHandler for Handler { } None => true, }) { - Some(s) => s.clone(), + Some(s) => s.to_owned(), None => { let (session, guard) = self .new_authed_session(sess, resolver) @@ -518,8 +543,8 @@ impl OutboundHandler for Handler { } }; - let hyster_client = HystStream { send: tx, recv: rx }; - Ok(Box::new(ChainedStreamWrapper::new(Box::new(hyster_client)))) + let hyster_stream = HystStream { send: tx, recv: rx }; + Ok(Box::new(ChainedStreamWrapper::new(Box::new(hyster_stream)))) } } diff --git a/clash_lib/src/proxy/hysteria2/salamander.rs b/clash_lib/src/proxy/hysteria2/salamander.rs index d004ea6b1..b247666fe 100644 --- a/clash_lib/src/proxy/hysteria2/salamander.rs +++ b/clash_lib/src/proxy/hysteria2/salamander.rs @@ -17,15 +17,19 @@ use rand::Rng; type Blake2b256 = Blake2b; -struct SalamanderObfs { - key: Vec, +#[derive(Clone)] +pub struct SalamanderObfs { + pub key: Vec, } +const SALAMANDER_OBFS_PREFIX_LEN: usize = 8; + impl SalamanderObfs { /// create a new obfs /// /// new() should init a blake2b256 hasher with key to reduce calculation, /// but rust-analyzer can't recognize its type + #[allow(dead_code)] pub fn new(key: Vec) -> Self { Self { key } } @@ -42,9 +46,9 @@ impl SalamanderObfs { } fn encrpyt(&self, data: &mut [u8]) -> Bytes { - let salt: [u8; 8] = rand::thread_rng().gen(); + let salt: [u8; SALAMANDER_OBFS_PREFIX_LEN] = rand::thread_rng().gen(); - let mut res = BytesMut::with_capacity(8 + data.len()); + let mut res = BytesMut::with_capacity(SALAMANDER_OBFS_PREFIX_LEN + data.len()); res.put_slice(&salt); self.obfs(&salt, data); res.put_slice(data); @@ -53,11 +57,10 @@ impl SalamanderObfs { } fn decrpyt(&self, data: &mut [u8]) { - assert!(data.len() > 8, "data len must > 8"); + assert!(data.len() > SALAMANDER_OBFS_PREFIX_LEN); - let (salt, data) = data.split_at_mut(8); + let (salt, data) = data.split_at_mut(SALAMANDER_OBFS_PREFIX_LEN); self.obfs(salt, data); - // data.advance(8); // sadlly IoSliceMut::advance is unstable } } @@ -67,13 +70,13 @@ pub struct Salamander { } impl Salamander { - pub fn new(socket: std::net::UdpSocket, key: Vec) -> std::io::Result { + pub fn new(socket: std::net::UdpSocket, obfs: SalamanderObfs) -> std::io::Result { use quinn::Runtime; let inner = TokioRuntime.wrap_udp_socket(socket)?; std::io::Result::Ok(Self { inner, - obfs: SalamanderObfs::new(key), + obfs, }) } } @@ -96,6 +99,7 @@ impl AsyncUdpSocket for Salamander { // TODO: encrypt in place let x = self.obfs.encrpyt(&mut v.contents.to_vec()); v.contents = &x; + // v.destination.set_port(15005); self.inner.try_send(&v) } @@ -107,26 +111,18 @@ impl AsyncUdpSocket for Salamander { ) -> Poll> { // the number of udp packets received let packet_nums = ready!(self.inner.poll_recv(cx, bufs, meta))?; - meta.iter().take(packet_nums).for_each(|v| { - tracing::trace!("meta addr {:?}, dst_ip: {:?}", v.addr, v.dst_ip); - }); bufs.iter_mut() .zip(meta.iter_mut()) - // first step take and then filter .take(packet_nums) - .filter(|(_, meta)| meta.len > 8) + .filter(|(_, meta)| meta.len > SALAMANDER_OBFS_PREFIX_LEN) .for_each(|(v, meta)| { + // meta.addr.set_port(15001); let x = &mut v.deref_mut()[..meta.len]; // decrypt in place, and drop first 8 bytes self.obfs.decrpyt(x); - let data = &mut x[8..]; - unsafe { - // because IoSliceMut is transparent and .0 is also transparent, so it is a &[u8] - let b: IoSliceMut<'_> = std::mem::transmute(data); - *v = b; - } + v.advance(SALAMANDER_OBFS_PREFIX_LEN); // MUST update meta.len - meta.len -= 8; + meta.len -= SALAMANDER_OBFS_PREFIX_LEN; }); Poll::Ready(Ok(packet_nums)) diff --git a/clash_lib/src/proxy/hysteria2/udp_hop.rs b/clash_lib/src/proxy/hysteria2/udp_hop.rs index b154367b4..333942315 100644 --- a/clash_lib/src/proxy/hysteria2/udp_hop.rs +++ b/clash_lib/src/proxy/hysteria2/udp_hop.rs @@ -1,242 +1,68 @@ -use std::{ - fmt::Debug, - io, - net::SocketAddr, - ops::{Deref, DerefMut, Sub}, - pin::Pin, - sync::{Arc, Mutex}, - task::{Context, Poll}, - time::{Duration, Instant}, +use futures::ready; +use quinn::{ + udp::{RecvMeta, Transmit}, + AsyncUdpSocket, }; - -use quinn::{udp::Transmit, AsyncUdpSocket, Runtime, TokioRuntime, UdpPoller}; - -use crate::proxy::converters::hysteria2::PortGenrateor; - -struct HopState { - prev_conn: Option>, - cur_conn: Arc, - last: Instant, - new_hop_port: u16, -} - -/// A udp socket hopper, it can hop to a new port when the time interval is -/// greater than interval -/// -/// https://v2.hysteria.network/docs/advanced/Port-Hopping/ -pub struct UdpHop { - /// (prev_conn, cur_conn, last, new_hop_port), here mybe we can use struct - state: Mutex, - /// The default port is the initial port when this quic connect connects to - /// the server. Every time we call poll_recv, we must rewrite the source - /// of the data packet inside to this port, because quic will check the - /// source of the data packet and discard the unknown source data. - init_port: u16, - /// generate new port used to hop - port_range: PortGenrateor, - /// interval to hop - interval: Duration, +use std::{fmt::Debug, io::IoSliceMut, sync::Arc, task::Poll}; + +pub(super) struct Hop { + /// The inner udp socket + inner: Arc, + // The port that quinn packet should be sent to + server_port: u16, + /// The port that quinn connection is connecting to + connection_port: u16, } -impl UdpHop { - const DEFAULT_INTERVAL: Duration = Duration::from_secs(30); - - pub fn new( - port: u16, - port_range: PortGenrateor, - interval: Option, - ) -> io::Result { - let socket = - std::net::UdpSocket::bind(SocketAddr::new([0, 0, 0, 0].into(), 0))?; - - let state = HopState { - prev_conn: None, - cur_conn: TokioRuntime.wrap_udp_socket(socket)?, - last: Instant::now(), - new_hop_port: port, - } - .into(); - - Ok(UdpHop { - state, - init_port: port, - port_range, - interval: interval.unwrap_or(Self::DEFAULT_INTERVAL), - }) - } - - fn hop(&self) -> u16 { - let mut lock = self.state.lock().unwrap(); - let HopState { - prev_conn, - cur_conn, - last, - new_hop_port, - } = lock.deref_mut(); - - let now = Instant::now(); - let to_hop = now.sub(*last) > self.interval; - - if to_hop && prev_conn.is_none() { - *last = now; - tracing::trace!("port hopping"); - - std::net::UdpSocket::bind(SocketAddr::new([0, 0, 0, 0].into(), 0)) - .and_then(|udp| TokioRuntime.wrap_udp_socket(udp)) - .map(|new_conn| { - *new_hop_port = self.port_range.get(); - *prev_conn = Some(std::mem::replace(cur_conn, new_conn)); - }) - .unwrap_or_else(|e| { - tracing::error!("port hopping err {}", e); - }); +impl Hop { + pub(super) fn new( + udp_socket: Arc, + server_port: u16, + connection_port: u16, + ) -> Self { + Self { + inner: udp_socket, + server_port, + connection_port, } - *new_hop_port - } - - fn get_conn( - &self, - ) -> (Option>, Arc) { - let lock = self.state.lock().unwrap(); - let HopState { - prev_conn, - cur_conn, - .. - } = lock.deref(); - (prev_conn.clone(), cur_conn.clone()) - } - - fn drop_prcv_conn(&self) { - let mut lock = self.state.lock().unwrap(); - lock.deref_mut().prev_conn.take(); } } -impl Debug for UdpHop { +impl Debug for Hop { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("UdpHop") - // .field("cur_conn", &self.state) - .finish() + f.debug_struct("Hysteria HopSocket").finish() } } -impl AsyncUdpSocket for UdpHop { - fn create_io_poller(self: Arc) -> Pin> { - let cur = self.get_conn().1; - cur.create_io_poller() +impl quinn::AsyncUdpSocket for Hop { + fn create_io_poller( + self: Arc, + ) -> std::pin::Pin> { + self.inner.clone().create_io_poller() } - fn try_send(&self, transmit: &Transmit) -> io::Result<()> { - let port = self.hop(); - - let cur = self.get_conn().1; - - // here just need change send addr, it is not nessary to change send - // contents, so we can use unsafe + fn try_send(&self, transmit: &Transmit) -> std::io::Result<()> { unsafe { - let prt = transmit as *const Transmit as *mut Transmit; - (*prt).destination.set_port(port); + let transmit = transmit as *const Transmit as *mut Transmit; + (*transmit).destination.set_port(self.server_port); } - - cur.try_send(transmit) + self.inner.try_send(transmit) } - // fn poll_send( - // &self, - // state: &UdpState, - // cx: &mut Context, - // transmits: &[Transmit], - // ) -> Poll> { - // // try to hop when we send data - // let port = self.hop(); - - // let (_pre_conn, io) = self.get_conn(); - - // // here just need change send addr, it is not nessary to change send - // // contents, so we can use unsafe - // unsafe { - // let prt = transmits.as_ptr() as *mut Transmit; - // let slice_mut: &mut [Transmit] = - // std::slice::from_raw_parts_mut(prt, transmits.len()); - // slice_mut.iter_mut().for_each(|v| { - // v.destination.set_port(port); - // }) - // } - - // loop { - // ready!(io.poll_send_ready(cx))?; - // if let Ok(res) = io.try_io(Interest::WRITABLE, || { - // self.socket_rw.send((&io).into(), state, &transmits) - // }) { - // return Poll::Ready(Ok(res)); - // } - // } - // } - fn poll_recv( &self, - cx: &mut Context, - bufs: &mut [io::IoSliceMut<'_>], - meta: &mut [quinn::udp::RecvMeta], - ) -> Poll> { - let (prev_io, io) = self.get_conn(); - - // read prev conn - let (len, should_drop) = match prev_io { - Some(ref prev_io) => match prev_io.poll_recv(cx, bufs, meta) { - // can readable, it is represent that the prev conn is not - // closed, and we recv the data from prev conn - Poll::Ready(Ok(len)) => (len, false), - Poll::Ready(Err(e)) => { - tracing::trace!("poll prev conn err {}", e); - match e.kind() { - // io::ErrorKind::WouldBlock => {} - io::ErrorKind::TimedOut => return Poll::Ready(Err(e)), - _ => (0, true), - } - } - Poll::Pending => { - tracing::trace!("poll prev conn pending"); - (0, false) - } - }, - None => (0, true), - }; - - if should_drop { - self.drop_prcv_conn(); + cx: &mut std::task::Context, + bufs: &mut [IoSliceMut<'_>], + meta: &mut [RecvMeta], + ) -> std::task::Poll> { + let packet_nums = ready!(self.inner.poll_recv(cx, bufs, meta))?; + for i in &mut meta[..packet_nums] { + i.addr.set_port(self.connection_port); } - meta.iter_mut() - .take(len) - .for_each(|m| m.addr.set_port(self.init_port)); - - match io.poll_recv(cx, bufs, &mut meta[len..]) { - Poll::Pending => { - if len > 0 { - Poll::Ready(Ok(len)) - } else { - Poll::Pending - } - } - Poll::Ready(Ok(res)) => { - meta.iter_mut() - .skip(len) - .take(res) - .for_each(|m| m.addr.set_port(self.init_port)); - Poll::Ready(Ok(len + res)) - } - Poll::Ready(Err(e)) => { - tracing::trace!("poll cur conn err {}", e); - Poll::Ready(Err(e)) - } - } - } - - fn local_addr(&self) -> io::Result { - self.get_conn().1.local_addr() + Poll::Ready(Ok(packet_nums)) } - fn may_fragment(&self) -> bool { - self.get_conn().1.may_fragment() + fn local_addr(&self) -> std::io::Result { + self.inner.local_addr() } }