From b8f8555045042c746c8116859e9840a438abfbf6 Mon Sep 17 00:00:00 2001 From: iHsin Date: Thu, 21 Mar 2024 02:03:58 +0800 Subject: [PATCH 01/17] chore: ignore nix direnv files --- .gitignore | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/.gitignore b/.gitignore index d3fb7bab9..f995f49d0 100644 --- a/.gitignore +++ b/.gitignore @@ -28,8 +28,13 @@ venv/ # don't check in this real config ignore*.yaml +config.yaml cache.db Country.mmdb ruleset/ rust-project.json + +# for NixOS direnv +.envrc +shell.nix From 74d89ac4b08e123a5344ebffa08a75312882693d Mon Sep 17 00:00:00 2001 From: iHsin Date: Thu, 21 Mar 2024 02:04:25 +0800 Subject: [PATCH 02/17] build: add tuic related deps --- Cargo.lock | 82 ++++++++++++++++++++++++++++++++++++++++++++ clash_lib/Cargo.toml | 10 ++++-- 2 files changed, 90 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b1ed56768..1ebd1fcde 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1083,8 +1083,10 @@ dependencies = [ "opentelemetry_sdk", "prost", "public-suffix", + "quinn", "rand", "regex", + "register-count", "rustls 0.21.10", "rustls-pemfile", "security-framework", @@ -1111,6 +1113,8 @@ dependencies = [ "tracing-oslog", "tracing-subscriber", "tracing-timing", + "tuic", + "tuic-quinn", "tun", "url", "uuid", @@ -4140,6 +4144,54 @@ version = "1.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a1d01941d82fa2ab50be1e79e6714289dd7cde78eba4c074bc5a4374f650dfe0" +[[package]] +name = "quinn" +version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8cc2c5017e4b43d5995dcea317bc46c1e09404c0a9664d2908f7f02dfe943d75" +dependencies = [ + "bytes", + "futures-io", + "pin-project-lite", + "quinn-proto", + "quinn-udp", + "rustc-hash", + "rustls 0.21.10", + "thiserror", + "tokio", + "tracing", +] + +[[package]] +name = "quinn-proto" +version = "0.10.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "141bf7dfde2fbc246bfd3fe12f2455aa24b0fbd9af535d8c86c7bd1381ff2b1a" +dependencies = [ + "bytes", + "rand", + "ring 0.16.20", + "rustc-hash", + "rustls 0.21.10", + "slab", + "thiserror", + "tinyvec", + "tracing", +] + +[[package]] +name = "quinn-udp" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "055b4e778e8feb9f93c4e439f71dc2156ef13360b432b799e179a8c4cdf0b1d7" +dependencies = [ + "bytes", + "libc", + "socket2", + "tracing", + "windows-sys 0.48.0", +] + [[package]] name = "quote" version = "1.0.35" @@ -4297,6 +4349,12 @@ version = "0.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c08c74e62047bb2de4ff487b251e4a92e24f48745648451635cec7d591162d9f" +[[package]] +name = "register-count" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d6d8b2af7d3e6675306d6757f10b4cf0b218a9fa6a0b44d668f2132684ae4893" + [[package]] name = "resolv-conf" version = "0.7.0" @@ -6517,6 +6575,30 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" +[[package]] +name = "tuic" +version = "5.0.0" +dependencies = [ + "bytes", + "futures-util", + "parking_lot 0.12.1", + "register-count", + "thiserror", + "uuid", +] + +[[package]] +name = "tuic-quinn" +version = "0.1.0" +dependencies = [ + "bytes", + "futures-util", + "quinn", + "thiserror", + "tuic", + "uuid", +] + [[package]] name = "tun" version = "0.6.1" diff --git a/clash_lib/Cargo.toml b/clash_lib/Cargo.toml index ebd7ec2e5..3ad93e7dc 100644 --- a/clash_lib/Cargo.toml +++ b/clash_lib/Cargo.toml @@ -44,7 +44,7 @@ once_cell = "1.18.0" # opentelemetry opentelemetry = "0.22" opentelemetry_sdk = { version = "0.22", features = ["rt-tokio"] } -tracing-opentelemetry = "0.23" +tracing-opentelemetry = "0.23" opentelemetry-jaeger-propagator = "0.1.0" opentelemetry-jaeger = { version = "0.21", features = ["collector_client", "hyper_collector_client", "rt-tokio"] } opentelemetry-otlp = { version = "0.15.0", features = ["http-proto"] } @@ -84,7 +84,7 @@ hickory-proto = { version = "0.24", features = ["dns-over-rustls", "dns-over-htt # DoH # ideally we should make a CryptoProvider with boringssl and get rid of rings -rustls = { version = "0.21", features=["dangerous_configuration"] } +rustls = { version = "0.21", features=["dangerous_configuration", "quic"] } rustls-pemfile = "1.0.4" webpki-roots = "0.25" dhcproto = "0.11" @@ -108,6 +108,12 @@ murmur3 = "0.5.2" arti-client = { version = "0.14.0", default-features = false, features = ["tokio", "rustls", "compression", "static-sqlite"] } tor-rtcompat = { version = "0.10.0" } +# tuic +tuic = { path = "/home/itsusinn/Workspace/Code/tuic/tuic" } +tuic-quinn = { path = "/home/itsusinn/Workspace/Code/tuic/tuic-quinn" } +quinn = { version = "0.10", default-features = false, features = ["futures-io", "runtime-tokio", "tls-rustls"] } +register-count = "0.1.0" + console-subscriber = { version = "0.2.0" } tracing-timing = { version = "0.6.0" } criterion = { version = "0.5", features = ["html_reports", "async_tokio"], optional = true } From 302ea269057769d5676aeb1fd02b8926413814ae Mon Sep 17 00:00:00 2001 From: iHsin Date: Thu, 21 Mar 2024 02:06:26 +0800 Subject: [PATCH 03/17] feat: support parse tuic configuration --- clash_lib/src/app/outbound/manager.rs | 4 ++- .../proxy_provider/proxy_set_provider.rs | 1 + clash_lib/src/config/internal/proxy.rs | 16 +++++++++ clash_lib/src/proxy/converters/mod.rs | 1 + clash_lib/src/proxy/converters/tuic.rs | 35 +++++++++++++++++++ clash_lib/src/proxy/mod.rs | 2 ++ 6 files changed, 58 insertions(+), 1 deletion(-) create mode 100644 clash_lib/src/proxy/converters/tuic.rs diff --git a/clash_lib/src/app/outbound/manager.rs b/clash_lib/src/app/outbound/manager.rs index ec7ede30c..837cec724 100644 --- a/clash_lib/src/app/outbound/manager.rs +++ b/clash_lib/src/app/outbound/manager.rs @@ -215,7 +215,9 @@ impl OutboundManager { OutboundProxyProtocol::Tor(tor) => { handlers.insert(tor.name.clone(), tor.try_into()?); } - + OutboundProxyProtocol::Tuic(tuic) => { + handlers.insert(tuic.name.clone(), tuic.try_into()?); + } p => { unimplemented!("proto {} not supported yet", p); } diff --git a/clash_lib/src/app/remote_content_manager/providers/proxy_provider/proxy_set_provider.rs b/clash_lib/src/app/remote_content_manager/providers/proxy_provider/proxy_set_provider.rs index b28ac7fdd..e918ba611 100644 --- a/clash_lib/src/app/remote_content_manager/providers/proxy_provider/proxy_set_provider.rs +++ b/clash_lib/src/app/remote_content_manager/providers/proxy_provider/proxy_set_provider.rs @@ -104,6 +104,7 @@ impl ProxySetProvider { OutboundProxyProtocol::Vmess(vm) => vm.try_into(), OutboundProxyProtocol::Wireguard(wg) => wg.try_into(), OutboundProxyProtocol::Tor(tor) => tor.try_into(), + OutboundProxyProtocol::Tuic(tuic) => tuic.try_into(), }) .collect::, _>>(); Ok(proxies?) diff --git a/clash_lib/src/config/internal/proxy.rs b/clash_lib/src/config/internal/proxy.rs index 0603c7348..5ecb47b49 100644 --- a/clash_lib/src/config/internal/proxy.rs +++ b/clash_lib/src/config/internal/proxy.rs @@ -6,6 +6,7 @@ use serde::Deserialize; use serde_yaml::Value; use std::collections::HashMap; use std::fmt::{Display, Formatter}; +use uuid::Uuid; pub const PROXY_DIRECT: &str = "DIRECT"; pub const PROXY_REJECT: &str = "REJECT"; @@ -61,6 +62,8 @@ pub enum OutboundProxyProtocol { Wireguard(OutboundWireguard), #[serde(rename = "tor")] Tor(OutboundTor), + #[serde(rename = "tuic")] + Tuic(OutboundTuic), } impl OutboundProxyProtocol { @@ -74,6 +77,7 @@ impl OutboundProxyProtocol { OutboundProxyProtocol::Vmess(vmess) => &vmess.name, OutboundProxyProtocol::Wireguard(wireguard) => &wireguard.name, OutboundProxyProtocol::Tor(tor) => &tor.name, + OutboundProxyProtocol::Tuic(tuic) => &tuic.name, } } } @@ -105,6 +109,7 @@ impl Display for OutboundProxyProtocol { OutboundProxyProtocol::Vmess(_) => write!(f, "Vmess"), OutboundProxyProtocol::Wireguard(_) => write!(f, "Wireguard"), OutboundProxyProtocol::Tor(_) => write!(f, "Tor"), + OutboundProxyProtocol::Tuic(_) => write!(f, "Tuic"), } } } @@ -217,6 +222,17 @@ pub struct OutboundTor { pub name: String, } +#[derive(serde::Serialize, serde::Deserialize, Debug, Default)] +#[serde(rename_all = "kebab-case")] +pub struct OutboundTuic { + pub name: String, + pub server: String, + pub port: u16, + pub uuid: Uuid, + pub password: String, + pub udp_relay_mode: Option, +} + #[derive(serde::Serialize, serde::Deserialize, Debug, Clone)] #[serde(tag = "type")] pub enum OutboundGroupProtocol { diff --git a/clash_lib/src/proxy/converters/mod.rs b/clash_lib/src/proxy/converters/mod.rs index 3b747b248..9d4cfc520 100644 --- a/clash_lib/src/proxy/converters/mod.rs +++ b/clash_lib/src/proxy/converters/mod.rs @@ -1,5 +1,6 @@ pub mod shadowsocks; pub mod tor; pub mod trojan; +pub mod tuic; pub mod vmess; pub mod wireguard; diff --git a/clash_lib/src/proxy/converters/tuic.rs b/clash_lib/src/proxy/converters/tuic.rs new file mode 100644 index 000000000..77317afe3 --- /dev/null +++ b/clash_lib/src/proxy/converters/tuic.rs @@ -0,0 +1,35 @@ +use crate::{ + config::internal::proxy::OutboundTuic, + proxy::{ + tuic::{Handler, HandlerOptions}, + AnyOutboundHandler, + }, +}; + +impl TryFrom for AnyOutboundHandler { + type Error = crate::Error; + + fn try_from(value: OutboundTuic) -> Result { + (&value).try_into() + } +} + +impl TryFrom<&OutboundTuic> for AnyOutboundHandler { + type Error = crate::Error; + + fn try_from(s: &OutboundTuic) -> Result { + Handler::new(HandlerOptions { + name: s.name.to_owned(), + server: s.server.to_owned(), + port: s.port, + uuid: s.uuid.to_owned(), + password: s.password.to_owned(), + udp_relay_mode: s.udp_relay_mode.to_owned().unwrap_or("native".to_string()), + disable_sni: false, + }) + .map_err(|e| { + // TODO find a better way + crate::Error::Operation(e.to_string()) + }) + } +} diff --git a/clash_lib/src/proxy/mod.rs b/clash_lib/src/proxy/mod.rs index 9b0fe0b2b..96c76b826 100644 --- a/clash_lib/src/proxy/mod.rs +++ b/clash_lib/src/proxy/mod.rs @@ -31,6 +31,7 @@ pub mod shadowsocks; pub mod socks; pub mod tor; pub mod trojan; +pub mod tuic; pub mod tun; pub mod utils; pub mod vmess; @@ -110,6 +111,7 @@ pub enum OutboundType { Trojan, WireGuard, Tor, + Tuic, #[serde(rename = "URLTest")] UrlTest, From 1e1b5780e82c2f2479f1a1d9fc1e0b6bc15835fe Mon Sep 17 00:00:00 2001 From: iHsin Date: Thu, 21 Mar 2024 02:07:34 +0800 Subject: [PATCH 04/17] fix(logging): shutdown should be logged after receiving signal --- clash_lib/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/clash_lib/src/lib.rs b/clash_lib/src/lib.rs index 82f12cdd4..3910e8785 100644 --- a/clash_lib/src/lib.rs +++ b/clash_lib/src/lib.rs @@ -286,8 +286,8 @@ async fn start_async(opts: Options) -> Result<(), Error> { } runners.push(Box::pin(async move { - info!("receiving shutdown signal"); shutdown_rx.recv().await; + info!("receiving shutdown signal"); Ok(()) })); From b9fa4bd150d165749d21671a9b429242421a4712 Mon Sep 17 00:00:00 2001 From: iHsin Date: Thu, 21 Mar 2024 02:08:09 +0800 Subject: [PATCH 05/17] feat: support TUIC TCP relay --- clash_lib/src/proxy/tuic/handle_stream.rs | 101 ++++++++++ clash_lib/src/proxy/tuic/handle_task.rs | 160 ++++++++++++++++ clash_lib/src/proxy/tuic/mod.rs | 210 ++++++++++++++++++++ clash_lib/src/proxy/tuic/types.rs | 222 ++++++++++++++++++++++ 4 files changed, 693 insertions(+) create mode 100644 clash_lib/src/proxy/tuic/handle_stream.rs create mode 100644 clash_lib/src/proxy/tuic/handle_task.rs create mode 100644 clash_lib/src/proxy/tuic/mod.rs create mode 100644 clash_lib/src/proxy/tuic/types.rs diff --git a/clash_lib/src/proxy/tuic/handle_stream.rs b/clash_lib/src/proxy/tuic/handle_stream.rs new file mode 100644 index 000000000..51512b052 --- /dev/null +++ b/clash_lib/src/proxy/tuic/handle_stream.rs @@ -0,0 +1,101 @@ +use std::sync::atomic::Ordering; + +use bytes::Bytes; +use quinn::{RecvStream, SendStream, VarInt}; +use register_count::Register; +use tuic_quinn::Task; + +use crate::proxy::tuic::types::UdpRelayMode; + +use super::types::TuicConnection; + +impl TuicConnection { + pub async fn accept_uni_stream(&self) -> anyhow::Result<(RecvStream, Register)> { + let max = self.max_concurrent_uni_streams.load(Ordering::Relaxed); + + if self.remote_uni_stream_cnt.count() as u32 == max { + self.max_concurrent_uni_streams + .store(max * 2, Ordering::Relaxed); + + self.conn + .set_max_concurrent_uni_streams(VarInt::from(max * 2)); + } + + let recv = self.conn.accept_uni().await?; + let reg = self.remote_uni_stream_cnt.reg(); + Ok((recv, reg)) + } + + pub async fn accept_bi_stream(&self) -> anyhow::Result<(SendStream, RecvStream, Register)> { + let max = self.max_concurrent_bi_streams.load(Ordering::Relaxed); + + if self.remote_bi_stream_cnt.count() as u32 == max { + self.max_concurrent_bi_streams + .store(max * 2, Ordering::Relaxed); + + self.conn + .set_max_concurrent_bi_streams(VarInt::from(max * 2)); + } + + let (send, recv) = self.conn.accept_bi().await?; + let reg = self.remote_bi_stream_cnt.reg(); + Ok((send, recv, reg)) + } + + pub async fn accept_datagram(&self) -> anyhow::Result { + Ok(self.conn.read_datagram().await?) + } + + pub async fn handle_uni_stream(self, recv: RecvStream, _reg: Register) { + tracing::debug!("[relay] incoming unidirectional stream"); + + let res = match self.inner.accept_uni_stream(recv).await { + Err(err) => Err(anyhow!(err)), + Ok(Task::Packet(pkt)) => match self.udp_relay_mode { + UdpRelayMode::Quic => { + Self::handle_packet(pkt).await; + Ok(()) + } + UdpRelayMode::Native => Err(anyhow!("wrong packet source")), + }, + _ => unreachable!(), // already filtered in `tuic_quinn` + }; + + if let Err(err) = res { + tracing::warn!("[relay] incoming unidirectional stream error: {err}"); + } + } + + pub async fn handle_bi_stream(self, send: SendStream, recv: RecvStream, _reg: Register) { + tracing::debug!("[relay] incoming bidirectional stream"); + + let res = match self.inner.accept_bi_stream(send, recv).await { + Err(err) => Err::<(), _>(anyhow!(err)), + _ => unreachable!(), // already filtered in `tuic_quinn` + }; + + if let Err(err) = res { + tracing::warn!("[relay] incoming bidirectional stream error: {err}"); + } + } + + pub async fn handle_datagram(self, dg: Bytes) { + tracing::debug!("[relay] incoming datagram"); + + let res = match self.inner.accept_datagram(dg) { + Err(err) => Err(anyhow!(err)), + Ok(Task::Packet(pkt)) => match self.udp_relay_mode { + UdpRelayMode::Native => { + Self::handle_packet(pkt).await; + Ok(()) + } + UdpRelayMode::Quic => Err(anyhow!("wrong packet source")), + }, + _ => unreachable!(), // already filtered in `tuic_quinn` + }; + + if let Err(err) = res { + tracing::warn!("[relay] incoming datagram error: {err}"); + } + } +} diff --git a/clash_lib/src/proxy/tuic/handle_task.rs b/clash_lib/src/proxy/tuic/handle_task.rs new file mode 100644 index 000000000..e6deace95 --- /dev/null +++ b/clash_lib/src/proxy/tuic/handle_task.rs @@ -0,0 +1,160 @@ +use bytes::Bytes; +use quinn::ZeroRttAccepted; + +use anyhow::Result; +use std::time::Duration; +use tokio::time; +use tuic::Address; +use tuic_quinn::{Connect, Packet}; + +use super::types::{TuicConnection, UdpRelayMode}; + +impl TuicConnection { + pub async fn authenticate(self, zero_rtt_accepted: Option) { + if let Some(zero_rtt_accepted) = zero_rtt_accepted { + tracing::debug!( + "[relay] [authenticate] waiting for connection to be fully established" + ); + zero_rtt_accepted.await; + } + + tracing::debug!("[relay] [authenticate] sending authentication"); + + match self + .inner + .authenticate(self.uuid, self.password.clone()) + .await + { + Ok(()) => tracing::info!("[relay] [authenticate] {uuid}", uuid = self.uuid), + Err(err) => { + tracing::warn!("[relay] [authenticate] authentication sending error: {err}") + } + } + } + + pub async fn connect(&self, addr: Address) -> Result { + let addr_display = addr.to_string(); + tracing::info!("[relay] [connect] {addr_display}"); + + match self.inner.connect(addr).await { + Ok(conn) => Ok(conn), + Err(err) => { + tracing::warn!( + "[relay] [connect] failed initializing relay to {addr_display}: {err}" + ); + Err(anyhow!(err)) + } + } + } + + pub async fn packet(&self, pkt: Bytes, addr: Address, assoc_id: u16) -> anyhow::Result<()> { + let addr_display = addr.to_string(); + + match self.udp_relay_mode { + UdpRelayMode::Native => { + tracing::info!("[relay] [packet] [{assoc_id:#06x}] [to-native] to {addr_display}"); + match self.inner.packet_native(pkt, addr, assoc_id) { + Ok(()) => Ok(()), + Err(err) => { + tracing::warn!("[relay] [packet] [{assoc_id:#06x}] [to-native] to {addr_display}: {err}"); + Err(anyhow!(err)) + } + } + } + UdpRelayMode::Quic => { + tracing::info!("[relay] [packet] [{assoc_id:#06x}] [to-quic] {addr_display}"); + match self.inner.packet_quic(pkt, addr, assoc_id).await { + Ok(()) => Ok(()), + Err(err) => { + tracing::warn!( + "[relay] [packet] [{assoc_id:#06x}] [to-quic] to {addr_display}: {err}" + ); + Err(anyhow!(err)) + } + } + } + } + } + + pub async fn dissociate(&self, assoc_id: u16) -> anyhow::Result<()> { + tracing::info!("[relay] [dissociate] [{assoc_id:#06x}]"); + match self.inner.dissociate(assoc_id).await { + Ok(()) => Ok(()), + Err(err) => { + tracing::warn!("[relay] [dissociate] [{assoc_id:#06x}] {err}"); + Err(anyhow!(err)) + } + } + } + + pub async fn heartbeat(self, heartbeat: Duration) { + loop { + time::sleep(heartbeat).await; + + if self.is_closed() { + break; + } + + if self.inner.task_connect_count() + self.inner.task_associate_count() == 0 { + continue; + } + + match self.inner.heartbeat().await { + Ok(()) => tracing::debug!("[relay] [heartbeat]"), + Err(err) => tracing::warn!("[relay] [heartbeat] {err}"), + } + } + } + + pub async fn handle_packet(pkt: Packet) { + let assoc_id = pkt.assoc_id(); + let pkt_id = pkt.pkt_id(); + + let mode = if pkt.is_from_native() { + "native" + } else if pkt.is_from_quic() { + "quic" + } else { + unreachable!() + }; + + tracing::info!( + "[relay] [packet] [{assoc_id:#06x}] [from-{mode}] [{pkt_id:#06x}] fragment {frag_id}/{frag_total}", + frag_id = pkt.frag_id() + 1, + frag_total = pkt.frag_total(), + ); + todo!("relay not yet impl") + // match pkt.accept().await { + // Ok(Some((pkt, addr, _))) => { + // tracing::info!("[relay] [packet] [{assoc_id:#06x}] [from-{mode}] [{pkt_id:#06x}] from {addr}"); + + // let addr = match addr { + // Address::None => unreachable!(), + // Address::DomainAddress(domain, port) => { + // Socks5Address::DomainAddress(domain, port) + // } + // Address::SocketAddress(addr) => Socks5Address::SocketAddress(addr), + // }; + + // let session = SOCKS5_UDP_SESSIONS + // .get() + // .unwrap() + // .lock() + // .get(&assoc_id) + // .cloned(); + + // if let Some(session) = session { + // if let Err(err) = session.send(pkt, addr).await { + // tracing::warn!( + // "[relay] [packet] [{assoc_id:#06x}] [from-native] [{pkt_id:#06x}] failed sending packet to socks5 client: {err}", + // ); + // } + // } else { + // tracing::warn!("[relay] [packet] [{assoc_id:#06x}] [from-native] [{pkt_id:#06x}] unable to find socks5 associate session"); + // } + // } + // Ok(None) => {} + // Err(err) => tracing::warn!("[relay] [packet] [{assoc_id:#06x}] [from-native] [{pkt_id:#06x}] packet receiving error: {err}"), + // } + } +} diff --git a/clash_lib/src/proxy/tuic/mod.rs b/clash_lib/src/proxy/tuic/mod.rs new file mode 100644 index 000000000..0b826a92e --- /dev/null +++ b/clash_lib/src/proxy/tuic/mod.rs @@ -0,0 +1,210 @@ +mod handle_stream; +mod handle_task; +mod types; + +use std::{ + net::{Ipv4Addr, Ipv6Addr, SocketAddr, UdpSocket}, + sync::Arc, + time::Duration, +}; + +use anyhow::Result; +use axum::async_trait; +use quinn::{EndpointConfig, TokioRuntime}; +use tokio_util::compat::FuturesAsyncReadCompatExt; +use uuid::Uuid; + +use crate::{ + app::{ + dispatcher::{ + BoxedChainedDatagram, BoxedChainedStream, ChainedStream, ChainedStreamWrapper, + }, + dns::ThreadSafeDNSResolver, + }, + common::tls::GLOBAL_ROOT_STORE, + proxy::tuic::types::{ServerAddr, TuicEndpoint}, + session::{Session, SocksAddr}, +}; + +use quinn::ClientConfig as QuinnConfig; +use quinn::Endpoint as QuinnEndpoint; +use quinn::TransportConfig as QuinnTransportConfig; +use quinn::{congestion::CubicConfig, VarInt}; +use tokio::sync::Mutex as AsyncMutex; + +use rustls::client::ClientConfig as TlsConfig; + +use self::types::TuicConnection; + +use super::{AnyOutboundHandler, AnyStream, OutboundHandler, OutboundType}; + +#[derive(Debug, Clone)] +pub struct HandlerOptions { + pub name: String, + pub server: String, + pub port: u16, + pub uuid: Uuid, + pub password: String, + pub udp_relay_mode: String, + pub disable_sni: bool, +} + +pub struct Handler { + opts: HandlerOptions, + ep: TuicEndpoint, + conn: AsyncMutex>, +} + +#[async_trait] +impl OutboundHandler for Handler { + fn name(&self) -> &str { + &self.opts.name + } + + fn proto(&self) -> OutboundType { + OutboundType::Tuic + } + + async fn remote_addr(&self) -> Option { + None + } + + async fn support_udp(&self) -> bool { + true + } + + async fn proxy_stream( + &self, + s: AnyStream, + _sess: &Session, + _resolver: ThreadSafeDNSResolver, + ) -> std::io::Result { + tracing::warn!("Proxy stream currently is direcrt connect"); + Ok(s) + } + + async fn connect_stream( + &self, + sess: &Session, + resolver: ThreadSafeDNSResolver, + ) -> std::io::Result { + self.do_connect_stream(sess, resolver).await.map_err(|e| { + tracing::error!("[tuic] {:?}", e); + std::io::Error::new(std::io::ErrorKind::Other, e.to_string()) + }) + } + async fn connect_datagram( + &self, + sess: &Session, + resolver: ThreadSafeDNSResolver, + ) -> std::io::Result { + self.do_connect_datagram(sess, resolver).await.map_err(|e| { + tracing::error!("[tuic] {:?}", e); + std::io::Error::new(std::io::ErrorKind::Other, e.to_string()) + }) + } +} + +impl Handler { + pub fn new(opts: HandlerOptions) -> anyhow::Result { + let mut crypto = TlsConfig::builder() + .with_safe_default_cipher_suites() + .with_safe_default_kx_groups() + .with_protocol_versions(&[&rustls::version::TLS13]) + .unwrap() + .with_root_certificates(GLOBAL_ROOT_STORE.clone()) + .with_no_client_auth(); + // aborted by peer: the cryptographic handshake failed: error 120: peer doesn't support any known protocol + crypto.alpn_protocols = vec!["h3".to_string()] + .into_iter() + .map(|alpn| alpn.into_bytes()) + .collect(); + crypto.enable_early_data = true; + crypto.enable_sni = !opts.disable_sni; + let mut quinn_config = QuinnConfig::new(Arc::new(crypto)); + let mut quinn_transport_config = QuinnTransportConfig::default(); + quinn_transport_config + .max_concurrent_bidi_streams(VarInt::from_u32(32)) + .max_concurrent_uni_streams(VarInt::from_u32(32)) + .send_window(16777216) + .stream_receive_window(VarInt::from_u32(8388608)) + .max_idle_timeout(None) + .congestion_controller_factory(Arc::new(CubicConfig::default())); + quinn_config.transport_config(Arc::new(quinn_transport_config)); + // Try to create an IPv4 socket as the placeholder first, if it fails, try IPv6. + let socket = UdpSocket::bind(SocketAddr::from((Ipv4Addr::UNSPECIFIED, 0))) + .or_else(|err| { + UdpSocket::bind(SocketAddr::from((Ipv6Addr::UNSPECIFIED, 0))).map_err(|_| err) + }) + .map_err(|err| anyhow!("failed to create endpoint UDP socket {}", err))?; + + let mut endpoint = QuinnEndpoint::new( + EndpointConfig::default(), + None, + socket, + Arc::new(TokioRuntime), + )?; + endpoint.set_default_client_config(quinn_config); + let endpoint = TuicEndpoint { + ep: endpoint, + server: ServerAddr::new(opts.server.clone(), opts.port, None), + uuid: opts.uuid, + password: Arc::from(opts.password.clone().into_bytes().into_boxed_slice()), + udp_relay_mode: types::UdpRelayMode::Native, + zero_rtt_handshake: false, + heartbeat: Duration::from_secs(3), + gc_interval: Duration::from_secs(3), + gc_lifetime: Duration::from_secs(15), + }; + Ok(Arc::new(Handler { + opts, + ep: endpoint, + conn: AsyncMutex::new(None), + })) + } + async fn get_conn(&self) -> Result { + let fut = async { + let mut guard = self.conn.lock().await; + if guard.is_none() { + // init + *guard = Some(self.ep.connect().await?); + } + let conn = guard.take().unwrap(); + let conn = if conn.is_closed() { + // reconnect + self.ep.connect().await? + } else { + conn + }; + // TODO TuicConnection is huge, is it necessary to clone it? + *guard = Some(conn.clone()); + Ok(conn) + }; + tokio::time::timeout(Duration::from_secs(3), fut).await? + } + + async fn do_connect_stream( + &self, + sess: &Session, + _resolver: ThreadSafeDNSResolver, + ) -> Result { + let conn = self.get_conn().await?; + let dest = match sess.destination.clone() { + SocksAddr::Ip(addr) => tuic::Address::SocketAddress(addr), + SocksAddr::Domain(domain, port) => tuic::Address::DomainAddress(domain, port), + }; + let relay = conn.connect(dest).await?.compat(); + + let s = ChainedStreamWrapper::new(relay); + s.append_to_chain(self.name()).await; + Ok(Box::new(s)) + } + + async fn do_connect_datagram( + &self, + sess: &Session, + _resolver: ThreadSafeDNSResolver, + ) -> std::io::Result { + todo!() + } +} diff --git a/clash_lib/src/proxy/tuic/types.rs b/clash_lib/src/proxy/tuic/types.rs new file mode 100644 index 000000000..d1637a527 --- /dev/null +++ b/clash_lib/src/proxy/tuic/types.rs @@ -0,0 +1,222 @@ +use anyhow::Result; +use quinn::Connection as QuinnConnection; +use quinn::{Endpoint as QuinnEndpoint, ZeroRttAccepted}; +use register_count::Counter; +use std::{ + net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, UdpSocket}, + str::FromStr, + sync::{atomic::AtomicU32, Arc}, + time::Duration, +}; +use tuic_quinn::Connection as InnerConnection; +use uuid::Uuid; + +pub struct TuicEndpoint { + pub ep: QuinnEndpoint, + pub server: ServerAddr, + pub uuid: Uuid, + pub password: Arc<[u8]>, + pub udp_relay_mode: UdpRelayMode, + pub zero_rtt_handshake: bool, + pub heartbeat: Duration, + pub gc_interval: Duration, + pub gc_lifetime: Duration, +} +impl TuicEndpoint { + pub async fn connect(&self) -> Result { + tracing::trace!("Connect Tuic"); + let mut last_err = None; + + for addr in self.server.resolve().await? { + let connect_to = async { + let match_ipv4 = + addr.is_ipv4() && self.ep.local_addr().map_or(false, |addr| addr.is_ipv4()); + let match_ipv6 = + addr.is_ipv6() && self.ep.local_addr().map_or(false, |addr| addr.is_ipv6()); + + if !match_ipv4 && !match_ipv6 { + let bind_addr = if addr.is_ipv4() { + SocketAddr::from((Ipv4Addr::UNSPECIFIED, 0)) + } else { + SocketAddr::from((Ipv6Addr::UNSPECIFIED, 0)) + }; + + self.ep + .rebind(UdpSocket::bind(bind_addr).map_err(|err| { + anyhow!("failed to create endpoint UDP socket {}", err) + })?) + .map_err(|err| anyhow!("failed to rebind endpoint UDP socket {}", err))?; + } + + tracing::trace!("Connect to {} {}", addr, self.server.server_name()); + let conn = self.ep.connect(addr, self.server.server_name())?; + let (conn, zero_rtt_accepted) = if self.zero_rtt_handshake { + match conn.into_0rtt() { + Ok((conn, zero_rtt_accepted)) => (conn, Some(zero_rtt_accepted)), + Err(conn) => (conn.await?, None), + } + } else { + (conn.await?, None) + }; + + Ok((conn, zero_rtt_accepted)) + }; + + match connect_to.await { + Ok((conn, zero_rtt_accepted)) => { + return Ok(TuicConnection::new( + conn, + zero_rtt_accepted, + self.udp_relay_mode, + self.uuid, + self.password.clone(), + self.heartbeat, + self.gc_interval, + self.gc_lifetime, + )); + } + Err(err) => last_err = Some(err), + } + } + Err(last_err.unwrap_or(anyhow!("dns resolve"))) + } +} + +#[derive(Clone)] +pub struct TuicConnection { + pub conn: QuinnConnection, + pub inner: InnerConnection, + pub uuid: Uuid, + pub password: Arc<[u8]>, + pub udp_relay_mode: UdpRelayMode, + pub remote_uni_stream_cnt: Counter, + pub remote_bi_stream_cnt: Counter, + pub max_concurrent_uni_streams: Arc, + pub max_concurrent_bi_streams: Arc, +} + +impl TuicConnection { + pub fn is_closed(&self) -> bool { + self.conn.close_reason().is_some() + } + fn new( + conn: QuinnConnection, + zero_rtt_accepted: Option, + udp_relay_mode: UdpRelayMode, + uuid: Uuid, + password: Arc<[u8]>, + heartbeat: Duration, + gc_interval: Duration, + gc_lifetime: Duration, + ) -> Self { + let conn = Self { + conn: conn.clone(), + inner: InnerConnection::::new(conn), + uuid, + password, + udp_relay_mode, + remote_uni_stream_cnt: Counter::new(), + remote_bi_stream_cnt: Counter::new(), + max_concurrent_uni_streams: Arc::new(AtomicU32::new(32)), + max_concurrent_bi_streams: Arc::new(AtomicU32::new(32)), + }; + + tokio::spawn( + conn.clone() + .init(zero_rtt_accepted, heartbeat, gc_interval, gc_lifetime), + ); + + conn + } + async fn init( + self, + zero_rtt_accepted: Option, + heartbeat: Duration, + gc_interval: Duration, + gc_lifetime: Duration, + ) { + tracing::info!("[relay] connection established"); + + tokio::spawn(self.clone().authenticate(zero_rtt_accepted)); + tokio::spawn(self.clone().heartbeat(heartbeat)); + tokio::spawn(self.clone().collect_garbage(gc_interval, gc_lifetime)); + + let err = loop { + tokio::select! { + res = self.accept_uni_stream() => match res { + Ok((recv, reg)) => tokio::spawn(self.clone().handle_uni_stream(recv, reg)), + Err(err) => break err, + }, + res = self.accept_bi_stream() => match res { + Ok((send, recv, reg)) => tokio::spawn(self.clone().handle_bi_stream(send, recv, reg)), + Err(err) => break err, + }, + res = self.accept_datagram() => match res { + Ok(dg) => tokio::spawn(self.clone().handle_datagram(dg)), + Err(err) => break err, + }, + }; + }; + + tracing::warn!("[relay] connection error: {err}"); + } + + async fn collect_garbage(self, gc_interval: Duration, gc_lifetime: Duration) { + loop { + tokio::time::sleep(gc_interval).await; + + if self.is_closed() { + break; + } + + tracing::debug!("[relay] packet fragment garbage collecting event"); + self.inner.collect_garbage(gc_lifetime); + } + } +} + +pub struct ServerAddr { + domain: String, + port: u16, + ip: Option, +} +impl ServerAddr { + pub fn new(domain: String, port: u16, ip: Option) -> Self { + Self { domain, port, ip } + } + + pub fn server_name(&self) -> &str { + &self.domain + } + // TODO change to clash dns? + pub async fn resolve(&self) -> Result> { + if let Some(ip) = self.ip { + Ok(vec![SocketAddr::from((ip, self.port))].into_iter()) + } else { + Ok(tokio::net::lookup_host((self.domain.as_str(), self.port)) + .await? + .collect::>() + .into_iter()) + } + } +} + +#[derive(Clone, Copy)] +pub enum UdpRelayMode { + Native, + Quic, +} + +impl FromStr for UdpRelayMode { + type Err = &'static str; + + fn from_str(s: &str) -> Result { + if s.eq_ignore_ascii_case("native") { + Ok(Self::Native) + } else if s.eq_ignore_ascii_case("quic") { + Ok(Self::Quic) + } else { + Err("invalid UDP relay mode") + } + } +} From 5644b4d5b1798e465f3b2f9275ea4dc35a884cdf Mon Sep 17 00:00:00 2001 From: iHsin Date: Thu, 21 Mar 2024 16:25:56 +0800 Subject: [PATCH 06/17] feat(tuic): allow more configuration and remove hardcode vars --- clash_lib/src/config/internal/proxy.rs | 22 ++++++++++++ clash_lib/src/proxy/converters/tuic.rs | 33 ++++++++++++++++-- clash_lib/src/proxy/tuic/mod.rs | 46 +++++++++++++++++--------- clash_lib/src/proxy/tuic/types.rs | 33 ++++++++++++++++-- 4 files changed, 114 insertions(+), 20 deletions(-) diff --git a/clash_lib/src/config/internal/proxy.rs b/clash_lib/src/config/internal/proxy.rs index 5ecb47b49..233b9a37c 100644 --- a/clash_lib/src/config/internal/proxy.rs +++ b/clash_lib/src/config/internal/proxy.rs @@ -230,7 +230,29 @@ pub struct OutboundTuic { pub port: u16, pub uuid: Uuid, pub password: String, + /// override field 'server' dns record, not used for now + pub ip: Option, + pub heartbeat_interval: Option, + /// h3 + pub alpn: Option>, + pub disable_sni: Option, + pub reduct_rtt: Option, + /// millis + pub request_timeout: Option, pub udp_relay_mode: Option, + pub congestion_controller: Option, + /// bytes + pub max_udp_relay_packet_size: Option, + pub fast_open: Option, + pub skip_cert_verify: Option, + pub max_open_stream: Option, + pub sni: Option, + /// millis + pub gc_interval: Option, + /// millis + pub gc_lifetime: Option, + pub send_window: Option, + pub receive_window: Option, } #[derive(serde::Serialize, serde::Deserialize, Debug, Clone)] diff --git a/clash_lib/src/proxy/converters/tuic.rs b/clash_lib/src/proxy/converters/tuic.rs index 77317afe3..4ef02cf33 100644 --- a/clash_lib/src/proxy/converters/tuic.rs +++ b/clash_lib/src/proxy/converters/tuic.rs @@ -1,7 +1,11 @@ +use std::time::Duration; + +use quinn::VarInt; + use crate::{ config::internal::proxy::OutboundTuic, proxy::{ - tuic::{Handler, HandlerOptions}, + tuic::{types::CongestionControl, Handler, HandlerOptions}, AnyOutboundHandler, }, }; @@ -25,7 +29,32 @@ impl TryFrom<&OutboundTuic> for AnyOutboundHandler { uuid: s.uuid.to_owned(), password: s.password.to_owned(), udp_relay_mode: s.udp_relay_mode.to_owned().unwrap_or("native".to_string()), - disable_sni: false, + disable_sni: s.disable_sni.unwrap_or(false), + alpn: s + .alpn + .clone() + .map(|v| v.into_iter().map(|alpn| alpn.into_bytes()).collect()) + .unwrap_or_default(), + heartbeat_interval: Duration::from_millis(s.heartbeat_interval.unwrap_or(3000)), + reduct_rtt: s.reduct_rtt.unwrap_or(false), + request_timeout: Duration::from_millis(s.request_timeout.unwrap_or(8000)), + congestion_controller: s + .congestion_controller + .clone() + .map(|v| CongestionControl::from(v.as_str())) + .unwrap_or_default(), + max_udp_relay_packet_size: s.max_udp_relay_packet_size.unwrap_or(1500), + max_open_stream: VarInt::from_u64(s.max_open_stream.unwrap_or(32)) + .unwrap_or(VarInt::MAX), + ip: s.ip.clone(), + fast_open: s.fast_open.clone(), + skip_cert_verify: s.skip_cert_verify.unwrap_or(false), + sni: s.sni.clone(), + gc_interval: Duration::from_millis(s.gc_interval.unwrap_or(3000)), + gc_lifetime: Duration::from_millis(s.gc_lifetime.unwrap_or(15000)), + send_window: s.send_window.unwrap_or(8 * 1024 * 1024 * 2), + receive_window: VarInt::from_u64(s.receive_window.unwrap_or(8 * 1024 * 1024)) + .unwrap_or(VarInt::MAX), }) .map_err(|e| { // TODO find a better way diff --git a/clash_lib/src/proxy/tuic/mod.rs b/clash_lib/src/proxy/tuic/mod.rs index 0b826a92e..c1057708e 100644 --- a/clash_lib/src/proxy/tuic/mod.rs +++ b/clash_lib/src/proxy/tuic/mod.rs @@ -1,6 +1,6 @@ mod handle_stream; mod handle_task; -mod types; +pub(crate) mod types; use std::{ net::{Ipv4Addr, Ipv6Addr, SocketAddr, UdpSocket}, @@ -34,7 +34,7 @@ use tokio::sync::Mutex as AsyncMutex; use rustls::client::ClientConfig as TlsConfig; -use self::types::TuicConnection; +use self::types::{CongestionControl, TuicConnection}; use super::{AnyOutboundHandler, AnyStream, OutboundHandler, OutboundType}; @@ -47,6 +47,23 @@ pub struct HandlerOptions { pub password: String, pub udp_relay_mode: String, pub disable_sni: bool, + pub alpn: Vec>, + pub heartbeat_interval: Duration, + pub reduct_rtt: bool, + pub request_timeout: Duration, + pub congestion_controller: CongestionControl, + pub max_udp_relay_packet_size: u64, + pub max_open_stream: VarInt, + pub gc_interval: Duration, + pub gc_lifetime: Duration, + pub send_window: u64, + pub receive_window: VarInt, + + /// not used + pub ip: Option, + pub fast_open: Option, + pub skip_cert_verify: bool, + pub sni: Option, } pub struct Handler { @@ -115,19 +132,16 @@ impl Handler { .with_root_certificates(GLOBAL_ROOT_STORE.clone()) .with_no_client_auth(); // aborted by peer: the cryptographic handshake failed: error 120: peer doesn't support any known protocol - crypto.alpn_protocols = vec!["h3".to_string()] - .into_iter() - .map(|alpn| alpn.into_bytes()) - .collect(); + crypto.alpn_protocols = opts.alpn.clone(); crypto.enable_early_data = true; crypto.enable_sni = !opts.disable_sni; let mut quinn_config = QuinnConfig::new(Arc::new(crypto)); let mut quinn_transport_config = QuinnTransportConfig::default(); quinn_transport_config - .max_concurrent_bidi_streams(VarInt::from_u32(32)) - .max_concurrent_uni_streams(VarInt::from_u32(32)) - .send_window(16777216) - .stream_receive_window(VarInt::from_u32(8388608)) + .max_concurrent_bidi_streams(opts.max_open_stream) + .max_concurrent_uni_streams(opts.max_open_stream) + .send_window(opts.send_window) + .stream_receive_window(opts.receive_window) .max_idle_timeout(None) .congestion_controller_factory(Arc::new(CubicConfig::default())); quinn_config.transport_config(Arc::new(quinn_transport_config)); @@ -151,10 +165,10 @@ impl Handler { uuid: opts.uuid, password: Arc::from(opts.password.clone().into_bytes().into_boxed_slice()), udp_relay_mode: types::UdpRelayMode::Native, - zero_rtt_handshake: false, - heartbeat: Duration::from_secs(3), - gc_interval: Duration::from_secs(3), - gc_lifetime: Duration::from_secs(15), + zero_rtt_handshake: opts.reduct_rtt, + heartbeat: opts.heartbeat_interval, + gc_interval: opts.gc_interval, + gc_lifetime: opts.gc_lifetime, }; Ok(Arc::new(Handler { opts, @@ -180,7 +194,7 @@ impl Handler { *guard = Some(conn.clone()); Ok(conn) }; - tokio::time::timeout(Duration::from_secs(3), fut).await? + tokio::time::timeout(self.opts.request_timeout, fut).await? } async fn do_connect_stream( @@ -202,7 +216,7 @@ impl Handler { async fn do_connect_datagram( &self, - sess: &Session, + _sess: &Session, _resolver: ThreadSafeDNSResolver, ) -> std::io::Result { todo!() diff --git a/clash_lib/src/proxy/tuic/types.rs b/clash_lib/src/proxy/tuic/types.rs index d1637a527..ac95c52f4 100644 --- a/clash_lib/src/proxy/tuic/types.rs +++ b/clash_lib/src/proxy/tuic/types.rs @@ -162,8 +162,9 @@ impl TuicConnection { } async fn collect_garbage(self, gc_interval: Duration, gc_lifetime: Duration) { + let mut interval = tokio::time::interval(gc_interval); loop { - tokio::time::sleep(gc_interval).await; + interval.tick().await; if self.is_closed() { break; @@ -201,7 +202,7 @@ impl ServerAddr { } } -#[derive(Clone, Copy)] +#[derive(Debug, Clone, Copy)] pub enum UdpRelayMode { Native, Quic, @@ -220,3 +221,31 @@ impl FromStr for UdpRelayMode { } } } + +#[derive(Debug, Clone, Copy)] +pub enum CongestionControl { + Cubic, + NewReno, + Bbr, +} +impl From<&str> for CongestionControl { + fn from(s: &str) -> Self { + if s.eq_ignore_ascii_case("cubic") { + Self::Cubic + } else if s.eq_ignore_ascii_case("new_reno") || s.eq_ignore_ascii_case("newreno") { + Self::NewReno + } else if s.eq_ignore_ascii_case("bbr") { + Self::Bbr + } else { + tracing::warn!("[tuic] todo"); + // Err("invalid congestion control") + Self::default() + } + } +} + +impl Default for CongestionControl { + fn default() -> Self { + Self::Cubic + } +} From 23b26fd0e84fa6920a598f73d79292c7bb8b0073 Mon Sep 17 00:00:00 2001 From: iHsin Date: Thu, 21 Mar 2024 18:05:53 +0800 Subject: [PATCH 07/17] chore: clean logging and error handling --- clash_lib/src/proxy/converters/tuic.rs | 6 +---- clash_lib/src/proxy/tuic/handle_task.rs | 32 ++++++++++++------------- clash_lib/src/proxy/tuic/mod.rs | 17 +++++++------ clash_lib/src/proxy/tuic/types.rs | 11 ++++----- 4 files changed, 30 insertions(+), 36 deletions(-) diff --git a/clash_lib/src/proxy/converters/tuic.rs b/clash_lib/src/proxy/converters/tuic.rs index 4ef02cf33..74036404f 100644 --- a/clash_lib/src/proxy/converters/tuic.rs +++ b/clash_lib/src/proxy/converters/tuic.rs @@ -47,7 +47,7 @@ impl TryFrom<&OutboundTuic> for AnyOutboundHandler { max_open_stream: VarInt::from_u64(s.max_open_stream.unwrap_or(32)) .unwrap_or(VarInt::MAX), ip: s.ip.clone(), - fast_open: s.fast_open.clone(), + fast_open: s.fast_open, skip_cert_verify: s.skip_cert_verify.unwrap_or(false), sni: s.sni.clone(), gc_interval: Duration::from_millis(s.gc_interval.unwrap_or(3000)), @@ -56,9 +56,5 @@ impl TryFrom<&OutboundTuic> for AnyOutboundHandler { receive_window: VarInt::from_u64(s.receive_window.unwrap_or(8 * 1024 * 1024)) .unwrap_or(VarInt::MAX), }) - .map_err(|e| { - // TODO find a better way - crate::Error::Operation(e.to_string()) - }) } } diff --git a/clash_lib/src/proxy/tuic/handle_task.rs b/clash_lib/src/proxy/tuic/handle_task.rs index e6deace95..ee3f92afd 100644 --- a/clash_lib/src/proxy/tuic/handle_task.rs +++ b/clash_lib/src/proxy/tuic/handle_task.rs @@ -13,34 +13,34 @@ impl TuicConnection { pub async fn authenticate(self, zero_rtt_accepted: Option) { if let Some(zero_rtt_accepted) = zero_rtt_accepted { tracing::debug!( - "[relay] [authenticate] waiting for connection to be fully established" + "[authenticate] waiting for connection to be fully established" ); zero_rtt_accepted.await; } - tracing::debug!("[relay] [authenticate] sending authentication"); + tracing::debug!("[authenticate] sending authentication"); match self .inner .authenticate(self.uuid, self.password.clone()) .await { - Ok(()) => tracing::info!("[relay] [authenticate] {uuid}", uuid = self.uuid), + Ok(()) => tracing::info!("[authenticate] {uuid}", uuid = self.uuid), Err(err) => { - tracing::warn!("[relay] [authenticate] authentication sending error: {err}") + tracing::warn!("[authenticate] authentication sending error: {err}") } } } pub async fn connect(&self, addr: Address) -> Result { let addr_display = addr.to_string(); - tracing::info!("[relay] [connect] {addr_display}"); + tracing::info!("[connect] {addr_display}"); match self.inner.connect(addr).await { Ok(conn) => Ok(conn), Err(err) => { tracing::warn!( - "[relay] [connect] failed initializing relay to {addr_display}: {err}" + "[connect] failed initializing relay to {addr_display}: {err}" ); Err(anyhow!(err)) } @@ -52,22 +52,22 @@ impl TuicConnection { match self.udp_relay_mode { UdpRelayMode::Native => { - tracing::info!("[relay] [packet] [{assoc_id:#06x}] [to-native] to {addr_display}"); + tracing::info!("[packet] [{assoc_id:#06x}] [to-native] to {addr_display}"); match self.inner.packet_native(pkt, addr, assoc_id) { Ok(()) => Ok(()), Err(err) => { - tracing::warn!("[relay] [packet] [{assoc_id:#06x}] [to-native] to {addr_display}: {err}"); + tracing::warn!("[packet] [{assoc_id:#06x}] [to-native] to {addr_display}: {err}"); Err(anyhow!(err)) } } } UdpRelayMode::Quic => { - tracing::info!("[relay] [packet] [{assoc_id:#06x}] [to-quic] {addr_display}"); + tracing::info!("[packet] [{assoc_id:#06x}] [to-quic] {addr_display}"); match self.inner.packet_quic(pkt, addr, assoc_id).await { Ok(()) => Ok(()), Err(err) => { tracing::warn!( - "[relay] [packet] [{assoc_id:#06x}] [to-quic] to {addr_display}: {err}" + "[packet] [{assoc_id:#06x}] [to-quic] to {addr_display}: {err}" ); Err(anyhow!(err)) } @@ -77,11 +77,11 @@ impl TuicConnection { } pub async fn dissociate(&self, assoc_id: u16) -> anyhow::Result<()> { - tracing::info!("[relay] [dissociate] [{assoc_id:#06x}]"); + tracing::info!("[dissociate] [{assoc_id:#06x}]"); match self.inner.dissociate(assoc_id).await { Ok(()) => Ok(()), Err(err) => { - tracing::warn!("[relay] [dissociate] [{assoc_id:#06x}] {err}"); + tracing::warn!("[dissociate] [{assoc_id:#06x}] {err}"); Err(anyhow!(err)) } } @@ -100,8 +100,8 @@ impl TuicConnection { } match self.inner.heartbeat().await { - Ok(()) => tracing::debug!("[relay] [heartbeat]"), - Err(err) => tracing::warn!("[relay] [heartbeat] {err}"), + Ok(()) => tracing::debug!("[heartbeat]"), + Err(err) => tracing::warn!("[heartbeat] {err}"), } } } @@ -119,11 +119,11 @@ impl TuicConnection { }; tracing::info!( - "[relay] [packet] [{assoc_id:#06x}] [from-{mode}] [{pkt_id:#06x}] fragment {frag_id}/{frag_total}", + "[packet] [{assoc_id:#06x}] [from-{mode}] [{pkt_id:#06x}] fragment {frag_id}/{frag_total}", frag_id = pkt.frag_id() + 1, frag_total = pkt.frag_total(), ); - todo!("relay not yet impl") + todo!() // match pkt.accept().await { // Ok(Some((pkt, addr, _))) => { // tracing::info!("[relay] [packet] [{assoc_id:#06x}] [from-{mode}] [{pkt_id:#06x}] from {addr}"); diff --git a/clash_lib/src/proxy/tuic/mod.rs b/clash_lib/src/proxy/tuic/mod.rs index c1057708e..cd14efa1f 100644 --- a/clash_lib/src/proxy/tuic/mod.rs +++ b/clash_lib/src/proxy/tuic/mod.rs @@ -106,7 +106,7 @@ impl OutboundHandler for Handler { resolver: ThreadSafeDNSResolver, ) -> std::io::Result { self.do_connect_stream(sess, resolver).await.map_err(|e| { - tracing::error!("[tuic] {:?}", e); + tracing::error!("{:?}", e); std::io::Error::new(std::io::ErrorKind::Other, e.to_string()) }) } @@ -116,14 +116,14 @@ impl OutboundHandler for Handler { resolver: ThreadSafeDNSResolver, ) -> std::io::Result { self.do_connect_datagram(sess, resolver).await.map_err(|e| { - tracing::error!("[tuic] {:?}", e); + tracing::error!("{:?}", e); std::io::Error::new(std::io::ErrorKind::Other, e.to_string()) }) } } impl Handler { - pub fn new(opts: HandlerOptions) -> anyhow::Result { + pub fn new(opts: HandlerOptions) -> Result { let mut crypto = TlsConfig::builder() .with_safe_default_cipher_suites() .with_safe_default_kx_groups() @@ -146,11 +146,10 @@ impl Handler { .congestion_controller_factory(Arc::new(CubicConfig::default())); quinn_config.transport_config(Arc::new(quinn_transport_config)); // Try to create an IPv4 socket as the placeholder first, if it fails, try IPv6. - let socket = UdpSocket::bind(SocketAddr::from((Ipv4Addr::UNSPECIFIED, 0))) - .or_else(|err| { + let socket = + UdpSocket::bind(SocketAddr::from((Ipv4Addr::UNSPECIFIED, 0))).or_else(|err| { UdpSocket::bind(SocketAddr::from((Ipv6Addr::UNSPECIFIED, 0))).map_err(|_| err) - }) - .map_err(|err| anyhow!("failed to create endpoint UDP socket {}", err))?; + })?; let mut endpoint = QuinnEndpoint::new( EndpointConfig::default(), @@ -170,7 +169,7 @@ impl Handler { gc_interval: opts.gc_interval, gc_lifetime: opts.gc_lifetime, }; - Ok(Arc::new(Handler { + Ok(Arc::new(Self { opts, ep: endpoint, conn: AsyncMutex::new(None), @@ -190,7 +189,7 @@ impl Handler { } else { conn }; - // TODO TuicConnection is huge, is it necessary to clone it? + // TODO TuicConnection is huge, is it necessary to clone it? If it is, should we use Arc ? *guard = Some(conn.clone()); Ok(conn) }; diff --git a/clash_lib/src/proxy/tuic/types.rs b/clash_lib/src/proxy/tuic/types.rs index ac95c52f4..e5e4e1c22 100644 --- a/clash_lib/src/proxy/tuic/types.rs +++ b/clash_lib/src/proxy/tuic/types.rs @@ -24,7 +24,6 @@ pub struct TuicEndpoint { } impl TuicEndpoint { pub async fn connect(&self) -> Result { - tracing::trace!("Connect Tuic"); let mut last_err = None; for addr in self.server.resolve().await? { @@ -117,6 +116,7 @@ impl TuicConnection { udp_relay_mode, remote_uni_stream_cnt: Counter::new(), remote_bi_stream_cnt: Counter::new(), + // TODO: seems tuic dynamicly adjust the size of max concurrent streams, is it necessary to configure the stream size? max_concurrent_uni_streams: Arc::new(AtomicU32::new(32)), max_concurrent_bi_streams: Arc::new(AtomicU32::new(32)), }; @@ -135,7 +135,7 @@ impl TuicConnection { gc_interval: Duration, gc_lifetime: Duration, ) { - tracing::info!("[relay] connection established"); + tracing::info!("connection established"); tokio::spawn(self.clone().authenticate(zero_rtt_accepted)); tokio::spawn(self.clone().heartbeat(heartbeat)); @@ -158,7 +158,7 @@ impl TuicConnection { }; }; - tracing::warn!("[relay] connection error: {err}"); + tracing::warn!("connection error: {err}"); } async fn collect_garbage(self, gc_interval: Duration, gc_lifetime: Duration) { @@ -170,7 +170,7 @@ impl TuicConnection { break; } - tracing::debug!("[relay] packet fragment garbage collecting event"); + tracing::debug!("[gc] packet fragment garbage collecting event"); self.inner.collect_garbage(gc_lifetime); } } @@ -237,8 +237,7 @@ impl From<&str> for CongestionControl { } else if s.eq_ignore_ascii_case("bbr") { Self::Bbr } else { - tracing::warn!("[tuic] todo"); - // Err("invalid congestion control") + tracing::warn!("Unknown congestion controller {s}. Use default controller"); Self::default() } } From 570474d7774314e2ebd2896964a9680210242bf0 Mon Sep 17 00:00:00 2001 From: iHsin Date: Fri, 22 Mar 2024 22:42:35 +0800 Subject: [PATCH 08/17] feat(tuic): support UDP relay --- clash_lib/src/proxy/tuic/compat.rs | 41 +++++++++ clash_lib/src/proxy/tuic/handle_task.rs | 57 +++++-------- clash_lib/src/proxy/tuic/mod.rs | 109 ++++++++++++++++++++---- clash_lib/src/proxy/tuic/types.rs | 9 +- 4 files changed, 163 insertions(+), 53 deletions(-) create mode 100644 clash_lib/src/proxy/tuic/compat.rs diff --git a/clash_lib/src/proxy/tuic/compat.rs b/clash_lib/src/proxy/tuic/compat.rs new file mode 100644 index 000000000..997796060 --- /dev/null +++ b/clash_lib/src/proxy/tuic/compat.rs @@ -0,0 +1,41 @@ +use std::{ + pin::Pin, + task::{Context, Poll}, +}; + +use futures::{Sink, SinkExt, Stream}; + +use crate::{common::errors::new_io_error, proxy::datagram::UdpPacket}; + +use super::TuicDatagramOutbound; + +impl Sink for TuicDatagramOutbound { + type Error = std::io::Error; + fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.send_tx + .poll_ready_unpin(cx) + .map_err(|v| new_io_error(&format!("{v:?}"))) + } + fn start_send(mut self: Pin<&mut Self>, item: UdpPacket) -> Result<(), Self::Error> { + self.send_tx + .start_send_unpin(item) + .map_err(|v| new_io_error(&format!("{v:?}"))) + } + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.send_tx + .poll_flush_unpin(cx) + .map_err(|v| new_io_error(&format!("{v:?}"))) + } + fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.send_tx + .poll_close_unpin(cx) + .map_err(|v| new_io_error(&format!("{v:?}"))) + } +} + +impl Stream for TuicDatagramOutbound { + type Item = UdpPacket; + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.recv_rx.poll_recv(cx) + } +} diff --git a/clash_lib/src/proxy/tuic/handle_task.rs b/clash_lib/src/proxy/tuic/handle_task.rs index ee3f92afd..f3b68b913 100644 --- a/clash_lib/src/proxy/tuic/handle_task.rs +++ b/clash_lib/src/proxy/tuic/handle_task.rs @@ -119,42 +119,31 @@ impl TuicConnection { }; tracing::info!( - "[packet] [{assoc_id:#06x}] [from-{mode}] [{pkt_id:#06x}] fragment {frag_id}/{frag_total}", + "[udp] [{assoc_id:#06x}] [from-{mode}] [{pkt_id:#06x}] fragment {frag_id}/{frag_total}", frag_id = pkt.frag_id() + 1, frag_total = pkt.frag_total(), ); - todo!() - // match pkt.accept().await { - // Ok(Some((pkt, addr, _))) => { - // tracing::info!("[relay] [packet] [{assoc_id:#06x}] [from-{mode}] [{pkt_id:#06x}] from {addr}"); - - // let addr = match addr { - // Address::None => unreachable!(), - // Address::DomainAddress(domain, port) => { - // Socks5Address::DomainAddress(domain, port) - // } - // Address::SocketAddress(addr) => Socks5Address::SocketAddress(addr), - // }; - - // let session = SOCKS5_UDP_SESSIONS - // .get() - // .unwrap() - // .lock() - // .get(&assoc_id) - // .cloned(); - - // if let Some(session) = session { - // if let Err(err) = session.send(pkt, addr).await { - // tracing::warn!( - // "[relay] [packet] [{assoc_id:#06x}] [from-native] [{pkt_id:#06x}] failed sending packet to socks5 client: {err}", - // ); - // } - // } else { - // tracing::warn!("[relay] [packet] [{assoc_id:#06x}] [from-native] [{pkt_id:#06x}] unable to find socks5 associate session"); - // } - // } - // Ok(None) => {} - // Err(err) => tracing::warn!("[relay] [packet] [{assoc_id:#06x}] [from-native] [{pkt_id:#06x}] packet receiving error: {err}"), - // } + match pkt.accept().await { + Ok(Some((data, remote_addr, _))) => { + tracing::info!("[udp] [{assoc_id:#06x}] [from-{mode}] [{pkt_id:#06x}] from {remote_addr}"); + let (session, local_addr) = match self.udp_sessions.read().await.get(&assoc_id) { + Some(v) => (v.incoming.clone(), v.local_addr.clone()), + None => { + tracing::error!("[udp] [{assoc_id:#06x}] [from-{mode}] [{pkt_id:#06x}] unable to find udp session"); + return; + }, + }; + let remote_addr = match remote_addr { + Address::None => unreachable!(), + Address::DomainAddress(domain, port) => ClashSocksAddr::Domain(domain, port), + Address::SocketAddress(socket) => ClashSocksAddr::Ip(socket), + }; + if let Err(err) = session.send(UdpPacket::new(data.into(), remote_addr, local_addr)).await { + tracing::error!("[udp] [{assoc_id:#06x}] [from-{mode}] [{pkt_id:#06x}] failed sending packet: {err}") + }; + }, + Ok(None) => {} + Err(err) => tracing::error!("[udp] [{assoc_id:#06x}] [from-{mode}] [{pkt_id:#06x}] packet receiving error: {err}"), + } } } diff --git a/clash_lib/src/proxy/tuic/mod.rs b/clash_lib/src/proxy/tuic/mod.rs index cd14efa1f..f0a5e341a 100644 --- a/clash_lib/src/proxy/tuic/mod.rs +++ b/clash_lib/src/proxy/tuic/mod.rs @@ -1,23 +1,29 @@ +mod compat; mod handle_stream; mod handle_task; pub(crate) mod types; -use std::{ - net::{Ipv4Addr, Ipv6Addr, SocketAddr, UdpSocket}, - sync::Arc, - time::Duration, -}; - +use crate::proxy::tuic::types::SocketAdderTrans; use anyhow::Result; use axum::async_trait; use quinn::{EndpointConfig, TokioRuntime}; +use std::net::SocketAddr; +use std::{ + net::{Ipv4Addr, Ipv6Addr, UdpSocket}, + sync::{ + atomic::{AtomicU16, Ordering}, + Arc, + }, + time::Duration, +}; use tokio_util::compat::FuturesAsyncReadCompatExt; use uuid::Uuid; use crate::{ app::{ dispatcher::{ - BoxedChainedDatagram, BoxedChainedStream, ChainedStream, ChainedStreamWrapper, + BoxedChainedDatagram, BoxedChainedStream, ChainedDatagram, ChainedDatagramWrapper, + ChainedStream, ChainedStreamWrapper, }, dns::ThreadSafeDNSResolver, }, @@ -26,6 +32,7 @@ use crate::{ session::{Session, SocksAddr}, }; +use crate::session::SocksAddr as ClashSocksAddr; use quinn::ClientConfig as QuinnConfig; use quinn::Endpoint as QuinnEndpoint; use quinn::TransportConfig as QuinnTransportConfig; @@ -34,9 +41,12 @@ use tokio::sync::Mutex as AsyncMutex; use rustls::client::ClientConfig as TlsConfig; -use self::types::{CongestionControl, TuicConnection}; +use self::types::{CongestionControl, TuicConnection, UdpSession}; -use super::{AnyOutboundHandler, AnyStream, OutboundHandler, OutboundType}; +use super::{ + datagram::UdpPacket, AnyOutboundDatagram, AnyOutboundHandler, AnyStream, OutboundHandler, + OutboundType, +}; #[derive(Debug, Clone)] pub struct HandlerOptions { @@ -70,6 +80,7 @@ pub struct Handler { opts: HandlerOptions, ep: TuicEndpoint, conn: AsyncMutex>, + next_assoc_id: AtomicU16 } #[async_trait] @@ -123,6 +134,7 @@ impl OutboundHandler for Handler { } impl Handler { + #[allow(clippy::new_ret_no_self)] pub fn new(opts: HandlerOptions) -> Result { let mut crypto = TlsConfig::builder() .with_safe_default_cipher_suites() @@ -173,6 +185,7 @@ impl Handler { opts, ep: endpoint, conn: AsyncMutex::new(None), + next_assoc_id: AtomicU16::new(0) })) } async fn get_conn(&self) -> Result { @@ -202,22 +215,82 @@ impl Handler { _resolver: ThreadSafeDNSResolver, ) -> Result { let conn = self.get_conn().await?; - let dest = match sess.destination.clone() { - SocksAddr::Ip(addr) => tuic::Address::SocketAddress(addr), - SocksAddr::Domain(domain, port) => tuic::Address::DomainAddress(domain, port), - }; - let relay = conn.connect(dest).await?.compat(); + let dest = sess.destination.clone().into_tuic(); + let tuic_tcp = conn.connect_tcp(dest).await?.compat(); - let s = ChainedStreamWrapper::new(relay); + let s = ChainedStreamWrapper::new(tuic_tcp); s.append_to_chain(self.name()).await; Ok(Box::new(s)) } async fn do_connect_datagram( &self, - _sess: &Session, + sess: &Session, _resolver: ThreadSafeDNSResolver, - ) -> std::io::Result { - todo!() + ) -> Result { + let conn = self.get_conn().await?; + + let assos_id = self.next_assoc_id.fetch_add(1, Ordering::Relaxed); + let quic_udp = TuicDatagramOutbound::new(assos_id, conn, sess.source.into()); + let s = ChainedDatagramWrapper::new(quic_udp); + s.append_to_chain(self.name()).await; + Ok(Box::new(s)) + } +} + +struct TuicDatagramOutbound { + assoc_id: u16, + // conn: TuicConnection, + // dest: tuic::Address, + handle: tokio::task::JoinHandle>, + send_tx: tokio_util::sync::PollSender, + recv_rx: tokio::sync::mpsc::Receiver, +} + +impl TuicDatagramOutbound { + pub fn new( + assoc_id: u16, + conn: TuicConnection, + local_addr: ClashSocksAddr, + ) -> AnyOutboundDatagram { + // TODO not sure about the size of buffer + let (send_tx, send_rx) = tokio::sync::mpsc::channel::(32); + let (recv_tx, recv_rx) = tokio::sync::mpsc::channel::(32); + let udp_sessions = conn.udp_sessions.clone(); + let handle = tokio::spawn(async move { + // capture vars + let (mut send_rx, recv_tx) = (send_rx, recv_tx); + udp_sessions.write().await.insert( + assoc_id, + UdpSession { + incoming: recv_tx, + local_addr, + }, + ); + loop { + if let Some(next_send) = send_rx.recv().await { + conn.outgoing_udp( + next_send.data.into(), + next_send.dst_addr.into_tuic(), + assoc_id, + ) + .await?; + } else { + break; + }; + } + tracing::info!("[close] [udp] no more outgoing udp packet from [{assoc_id:#06x}]"); + udp_sessions.write().await.remove(&assoc_id); + anyhow::Ok(()) + }); + let s = Self { + assoc_id, + // conn, + // dest, + handle, + send_tx: tokio_util::sync::PollSender::new(send_tx), + recv_rx, + }; + Box::new(s) } } diff --git a/clash_lib/src/proxy/tuic/types.rs b/clash_lib/src/proxy/tuic/types.rs index e5e4e1c22..70f83fd20 100644 --- a/clash_lib/src/proxy/tuic/types.rs +++ b/clash_lib/src/proxy/tuic/types.rs @@ -87,11 +87,17 @@ pub struct TuicConnection { pub inner: InnerConnection, pub uuid: Uuid, pub password: Arc<[u8]>, - pub udp_relay_mode: UdpRelayMode, pub remote_uni_stream_cnt: Counter, pub remote_bi_stream_cnt: Counter, pub max_concurrent_uni_streams: Arc, pub max_concurrent_bi_streams: Arc, + pub udp_relay_mode: UdpRelayMode, + pub udp_sessions: Arc>>, +} + +pub struct UdpSession { + pub incoming: tokio::sync::mpsc::Sender, + pub local_addr: ClashSocksAddr, } impl TuicConnection { @@ -119,6 +125,7 @@ impl TuicConnection { // TODO: seems tuic dynamicly adjust the size of max concurrent streams, is it necessary to configure the stream size? max_concurrent_uni_streams: Arc::new(AtomicU32::new(32)), max_concurrent_bi_streams: Arc::new(AtomicU32::new(32)), + udp_sessions: Arc::new(AsyncRwLock::new(HashMap::new())), }; tokio::spawn( From c19dd067d3ff32dc87c6fe12960c943539907ebf Mon Sep 17 00:00:00 2001 From: iHsin Date: Fri, 22 Mar 2024 22:43:07 +0800 Subject: [PATCH 09/17] chore(tuic): rename symbols --- clash_lib/src/proxy/tuic/handle_stream.rs | 4 +- clash_lib/src/proxy/tuic/handle_task.rs | 50 +++++++++++++---------- clash_lib/src/proxy/tuic/types.rs | 25 ++++++++++-- 3 files changed, 51 insertions(+), 28 deletions(-) diff --git a/clash_lib/src/proxy/tuic/handle_stream.rs b/clash_lib/src/proxy/tuic/handle_stream.rs index 51512b052..3d38ee58b 100644 --- a/clash_lib/src/proxy/tuic/handle_stream.rs +++ b/clash_lib/src/proxy/tuic/handle_stream.rs @@ -53,7 +53,7 @@ impl TuicConnection { Err(err) => Err(anyhow!(err)), Ok(Task::Packet(pkt)) => match self.udp_relay_mode { UdpRelayMode::Quic => { - Self::handle_packet(pkt).await; + self.incoming_udp(pkt).await; Ok(()) } UdpRelayMode::Native => Err(anyhow!("wrong packet source")), @@ -86,7 +86,7 @@ impl TuicConnection { Err(err) => Err(anyhow!(err)), Ok(Task::Packet(pkt)) => match self.udp_relay_mode { UdpRelayMode::Native => { - Self::handle_packet(pkt).await; + self.incoming_udp(pkt).await; Ok(()) } UdpRelayMode::Quic => Err(anyhow!("wrong packet source")), diff --git a/clash_lib/src/proxy/tuic/handle_task.rs b/clash_lib/src/proxy/tuic/handle_task.rs index f3b68b913..78297f92a 100644 --- a/clash_lib/src/proxy/tuic/handle_task.rs +++ b/clash_lib/src/proxy/tuic/handle_task.rs @@ -7,67 +7,73 @@ use tokio::time; use tuic::Address; use tuic_quinn::{Connect, Packet}; +use crate::session::SocksAddr as ClashSocksAddr; +use crate::proxy::datagram::UdpPacket; + use super::types::{TuicConnection, UdpRelayMode}; impl TuicConnection { - pub async fn authenticate(self, zero_rtt_accepted: Option) { + pub async fn tuic_auth(self, zero_rtt_accepted: Option) { if let Some(zero_rtt_accepted) = zero_rtt_accepted { - tracing::debug!( - "[authenticate] waiting for connection to be fully established" - ); + tracing::debug!("[auth] waiting for connection to be fully established"); zero_rtt_accepted.await; } - tracing::debug!("[authenticate] sending authentication"); + tracing::debug!("[auth] sending authentication"); match self .inner .authenticate(self.uuid, self.password.clone()) .await { - Ok(()) => tracing::info!("[authenticate] {uuid}", uuid = self.uuid), + Ok(()) => tracing::info!("[auth] {uuid}", uuid = self.uuid), Err(err) => { - tracing::warn!("[authenticate] authentication sending error: {err}") + tracing::warn!("[auth] authentication sending error: {err}") } } } - pub async fn connect(&self, addr: Address) -> Result { + pub async fn connect_tcp(&self, addr: Address) -> Result { let addr_display = addr.to_string(); - tracing::info!("[connect] {addr_display}"); + tracing::info!("[tcp] {addr_display}"); match self.inner.connect(addr).await { Ok(conn) => Ok(conn), Err(err) => { - tracing::warn!( - "[connect] failed initializing relay to {addr_display}: {err}" - ); + tracing::warn!("[tcp] failed initializing relay to {addr_display}: {err}"); Err(anyhow!(err)) } } } - pub async fn packet(&self, pkt: Bytes, addr: Address, assoc_id: u16) -> anyhow::Result<()> { + pub async fn outgoing_udp( + &self, + pkt: Bytes, + addr: Address, + assoc_id: u16, + ) -> anyhow::Result<()> { let addr_display = addr.to_string(); match self.udp_relay_mode { UdpRelayMode::Native => { - tracing::info!("[packet] [{assoc_id:#06x}] [to-native] to {addr_display}"); + tracing::info!("[udp] [{assoc_id:#06x}] [to-native] to {addr_display}"); match self.inner.packet_native(pkt, addr, assoc_id) { Ok(()) => Ok(()), Err(err) => { - tracing::warn!("[packet] [{assoc_id:#06x}] [to-native] to {addr_display}: {err}"); + tracing::warn!( + "[udp] [{assoc_id:#06x}] [to-native] to {addr_display}: {err}" + ); Err(anyhow!(err)) } } } UdpRelayMode::Quic => { - tracing::info!("[packet] [{assoc_id:#06x}] [to-quic] {addr_display}"); + tracing::info!("[udp] [{assoc_id:#06x}] [to-quic] {addr_display}"); match self.inner.packet_quic(pkt, addr, assoc_id).await { Ok(()) => Ok(()), Err(err) => { tracing::warn!( - "[packet] [{assoc_id:#06x}] [to-quic] to {addr_display}: {err}" + "[udp] [{assoc_id:#06x}] [to-quic] to {addr_display}: {err}" ); Err(anyhow!(err)) } @@ -77,11 +83,11 @@ impl TuicConnection { } pub async fn dissociate(&self, assoc_id: u16) -> anyhow::Result<()> { - tracing::info!("[dissociate] [{assoc_id:#06x}]"); + tracing::info!("[udp] [dissociate] [{assoc_id:#06x}]"); match self.inner.dissociate(assoc_id).await { Ok(()) => Ok(()), Err(err) => { - tracing::warn!("[dissociate] [{assoc_id:#06x}] {err}"); + tracing::warn!("[udp] [dissociate] [{assoc_id:#06x}] {err}"); Err(anyhow!(err)) } } @@ -100,13 +106,13 @@ impl TuicConnection { } match self.inner.heartbeat().await { - Ok(()) => tracing::debug!("[heartbeat]"), - Err(err) => tracing::warn!("[heartbeat] {err}"), + Ok(()) => tracing::trace!("[heartbeat]"), + Err(err) => tracing::error!("[heartbeat] {err}"), } } } - pub async fn handle_packet(pkt: Packet) { + pub async fn incoming_udp(&self, pkt: Packet) { let assoc_id = pkt.assoc_id(); let pkt_id = pkt.pkt_id(); diff --git a/clash_lib/src/proxy/tuic/types.rs b/clash_lib/src/proxy/tuic/types.rs index 70f83fd20..d70e183cd 100644 --- a/clash_lib/src/proxy/tuic/types.rs +++ b/clash_lib/src/proxy/tuic/types.rs @@ -1,16 +1,21 @@ +use crate::session::SocksAddr as ClashSocksAddr; use anyhow::Result; use quinn::Connection as QuinnConnection; use quinn::{Endpoint as QuinnEndpoint, ZeroRttAccepted}; use register_count::Counter; +use std::collections::HashMap; use std::{ net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, UdpSocket}, str::FromStr, sync::{atomic::AtomicU32, Arc}, time::Duration, }; +use tokio::sync::RwLock as AsyncRwLock; use tuic_quinn::Connection as InnerConnection; use uuid::Uuid; +use crate::proxy::datagram::UdpPacket; + pub struct TuicEndpoint { pub ep: QuinnEndpoint, pub server: ServerAddr, @@ -144,7 +149,8 @@ impl TuicConnection { ) { tracing::info!("connection established"); - tokio::spawn(self.clone().authenticate(zero_rtt_accepted)); + // TODO reduct spawn + tokio::spawn(self.clone().tuic_auth(zero_rtt_accepted)); tokio::spawn(self.clone().heartbeat(heartbeat)); tokio::spawn(self.clone().collect_garbage(gc_interval, gc_lifetime)); @@ -172,12 +178,10 @@ impl TuicConnection { let mut interval = tokio::time::interval(gc_interval); loop { interval.tick().await; - if self.is_closed() { break; } - - tracing::debug!("[gc] packet fragment garbage collecting event"); + tracing::trace!("[gc]"); self.inner.collect_garbage(gc_lifetime); } } @@ -255,3 +259,16 @@ impl Default for CongestionControl { Self::Cubic } } + +pub trait SocketAdderTrans { + fn into_tuic(self) -> tuic::Address; +} +impl SocketAdderTrans for crate::session::SocksAddr { + fn into_tuic(self) -> tuic::Address { + use crate::session::SocksAddr; + match self { + SocksAddr::Ip(addr) => tuic::Address::SocketAddress(addr), + SocksAddr::Domain(domain, port) => tuic::Address::DomainAddress(domain, port), + } + } +} From 3d506114757ef84611fdbff4a78e96102810bc3f Mon Sep 17 00:00:00 2001 From: iHsin Date: Sat, 23 Mar 2024 00:25:14 +0800 Subject: [PATCH 10/17] build: change tuic into git deps --- Cargo.lock | 2 ++ clash_lib/Cargo.toml | 4 ++-- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 1ebd1fcde..65a6991db 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6578,6 +6578,7 @@ checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" [[package]] name = "tuic" version = "5.0.0" +source = "git+https://github.com/Itsusinn/tuic.git?rev=160cf9b#160cf9bfb46e434cbab59379c762e7454bfdc14f" dependencies = [ "bytes", "futures-util", @@ -6590,6 +6591,7 @@ dependencies = [ [[package]] name = "tuic-quinn" version = "0.1.0" +source = "git+https://github.com/Itsusinn/tuic.git?rev=160cf9b#160cf9bfb46e434cbab59379c762e7454bfdc14f" dependencies = [ "bytes", "futures-util", diff --git a/clash_lib/Cargo.toml b/clash_lib/Cargo.toml index 3ad93e7dc..c10f0bc19 100644 --- a/clash_lib/Cargo.toml +++ b/clash_lib/Cargo.toml @@ -109,8 +109,8 @@ arti-client = { version = "0.14.0", default-features = false, features = ["tokio tor-rtcompat = { version = "0.10.0" } # tuic -tuic = { path = "/home/itsusinn/Workspace/Code/tuic/tuic" } -tuic-quinn = { path = "/home/itsusinn/Workspace/Code/tuic/tuic-quinn" } +tuic = { rev = "160cf9b", git = "https://github.com/Itsusinn/tuic.git" } +tuic-quinn = { rev = "160cf9b", git = "https://github.com/Itsusinn/tuic.git" } quinn = { version = "0.10", default-features = false, features = ["futures-io", "runtime-tokio", "tls-rustls"] } register-count = "0.1.0" From da9f5c018c1c45a4a57e12fbeaefbd27c9d8f372 Mon Sep 17 00:00:00 2001 From: iHsin Date: Sun, 24 Mar 2024 15:21:03 +0800 Subject: [PATCH 11/17] style: cargo fmt --- clash_lib/src/proxy/tuic/handle_task.rs | 2 +- clash_lib/src/proxy/tuic/mod.rs | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/clash_lib/src/proxy/tuic/handle_task.rs b/clash_lib/src/proxy/tuic/handle_task.rs index 78297f92a..41ec4b91a 100644 --- a/clash_lib/src/proxy/tuic/handle_task.rs +++ b/clash_lib/src/proxy/tuic/handle_task.rs @@ -7,8 +7,8 @@ use tokio::time; use tuic::Address; use tuic_quinn::{Connect, Packet}; -use crate::session::SocksAddr as ClashSocksAddr; use crate::proxy::datagram::UdpPacket; +use crate::session::SocksAddr as ClashSocksAddr; use super::types::{TuicConnection, UdpRelayMode}; diff --git a/clash_lib/src/proxy/tuic/mod.rs b/clash_lib/src/proxy/tuic/mod.rs index f0a5e341a..489d8c820 100644 --- a/clash_lib/src/proxy/tuic/mod.rs +++ b/clash_lib/src/proxy/tuic/mod.rs @@ -80,7 +80,7 @@ pub struct Handler { opts: HandlerOptions, ep: TuicEndpoint, conn: AsyncMutex>, - next_assoc_id: AtomicU16 + next_assoc_id: AtomicU16, } #[async_trait] @@ -185,7 +185,7 @@ impl Handler { opts, ep: endpoint, conn: AsyncMutex::new(None), - next_assoc_id: AtomicU16::new(0) + next_assoc_id: AtomicU16::new(0), })) } async fn get_conn(&self) -> Result { From 270ae0f99a1ef9db86f8596da66bfd749fd2acae Mon Sep 17 00:00:00 2001 From: iHsin Date: Sun, 24 Mar 2024 16:35:20 +0800 Subject: [PATCH 12/17] build: update tuic --- Cargo.lock | 4 ++-- clash_lib/Cargo.toml | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 65a6991db..b0708060e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6578,7 +6578,7 @@ checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" [[package]] name = "tuic" version = "5.0.0" -source = "git+https://github.com/Itsusinn/tuic.git?rev=160cf9b#160cf9bfb46e434cbab59379c762e7454bfdc14f" +source = "git+https://github.com/Itsusinn/tuic.git?rev=82fab62#82fab626d6344f69f26ce4af7c87916f640ee3d3" dependencies = [ "bytes", "futures-util", @@ -6591,7 +6591,7 @@ dependencies = [ [[package]] name = "tuic-quinn" version = "0.1.0" -source = "git+https://github.com/Itsusinn/tuic.git?rev=160cf9b#160cf9bfb46e434cbab59379c762e7454bfdc14f" +source = "git+https://github.com/Itsusinn/tuic.git?rev=82fab62#82fab626d6344f69f26ce4af7c87916f640ee3d3" dependencies = [ "bytes", "futures-util", diff --git a/clash_lib/Cargo.toml b/clash_lib/Cargo.toml index c10f0bc19..b4ce820ac 100644 --- a/clash_lib/Cargo.toml +++ b/clash_lib/Cargo.toml @@ -109,8 +109,8 @@ arti-client = { version = "0.14.0", default-features = false, features = ["tokio tor-rtcompat = { version = "0.10.0" } # tuic -tuic = { rev = "160cf9b", git = "https://github.com/Itsusinn/tuic.git" } -tuic-quinn = { rev = "160cf9b", git = "https://github.com/Itsusinn/tuic.git" } +tuic = { rev = "82fab62", git = "https://github.com/Itsusinn/tuic.git" } +tuic-quinn = { rev = "82fab62", git = "https://github.com/Itsusinn/tuic.git" } quinn = { version = "0.10", default-features = false, features = ["futures-io", "runtime-tokio", "tls-rustls"] } register-count = "0.1.0" From 18677c07d0808b9afe8e75a217c52540d516533d Mon Sep 17 00:00:00 2001 From: iHsin Date: Sun, 24 Mar 2024 17:12:33 +0800 Subject: [PATCH 13/17] refactor: reduce spawn task --- clash_lib/src/proxy/tuic/handle_task.rs | 88 ++++++++++++++++--------- clash_lib/src/proxy/tuic/mod.rs | 8 +-- clash_lib/src/proxy/tuic/types.rs | 27 +++----- 3 files changed, 68 insertions(+), 55 deletions(-) diff --git a/clash_lib/src/proxy/tuic/handle_task.rs b/clash_lib/src/proxy/tuic/handle_task.rs index 41ec4b91a..adb590997 100644 --- a/clash_lib/src/proxy/tuic/handle_task.rs +++ b/clash_lib/src/proxy/tuic/handle_task.rs @@ -1,9 +1,9 @@ +use std::time::Duration; + use bytes::Bytes; use quinn::ZeroRttAccepted; use anyhow::Result; -use std::time::Duration; -use tokio::time; use tuic::Address; use tuic_quinn::{Connect, Packet}; @@ -82,36 +82,6 @@ impl TuicConnection { } } - pub async fn dissociate(&self, assoc_id: u16) -> anyhow::Result<()> { - tracing::info!("[udp] [dissociate] [{assoc_id:#06x}]"); - match self.inner.dissociate(assoc_id).await { - Ok(()) => Ok(()), - Err(err) => { - tracing::warn!("[udp] [dissociate] [{assoc_id:#06x}] {err}"); - Err(anyhow!(err)) - } - } - } - - pub async fn heartbeat(self, heartbeat: Duration) { - loop { - time::sleep(heartbeat).await; - - if self.is_closed() { - break; - } - - if self.inner.task_connect_count() + self.inner.task_associate_count() == 0 { - continue; - } - - match self.inner.heartbeat().await { - Ok(()) => tracing::trace!("[heartbeat]"), - Err(err) => tracing::error!("[heartbeat] {err}"), - } - } - } - pub async fn incoming_udp(&self, pkt: Packet) { let assoc_id = pkt.assoc_id(); let pkt_id = pkt.pkt_id(); @@ -152,4 +122,58 @@ impl TuicConnection { Err(err) => tracing::error!("[udp] [{assoc_id:#06x}] [from-{mode}] [{pkt_id:#06x}] packet receiving error: {err}"), } } + + pub async fn dissociate(&self, assoc_id: u16) -> Result<()> { + tracing::info!("[udp] [dissociate] [{assoc_id:#06x}]"); + match self.inner.dissociate(assoc_id).await { + Ok(()) => Ok(()), + Err(err) => { + tracing::warn!("[udp] [dissociate] [{assoc_id:#06x}] {err}"); + Err(err)? + } + } + } + + async fn heartbeat(&self) -> Result<()> { + self.check_open()?; + if self.inner.task_connect_count() + self.inner.task_associate_count() == 0 { + return Ok(()); + } + + match self.inner.heartbeat().await { + Ok(()) => tracing::trace!("[heartbeat]"), + Err(err) => tracing::error!("[heartbeat] {err}"), + } + Ok(()) + } + + fn collect_garbage(&self, gc_lifetime: Duration) -> Result<()> { + self.check_open()?; + tracing::trace!("[gc]"); + self.inner.collect_garbage(gc_lifetime); + Ok(()) + } + /// Tasks triggered by timer + /// Won't return unless occurs error + pub async fn cyclical_tasks( + self, + heartbeat_interval: Duration, + gc_interval: Duration, + gc_lifetime: Duration, + ) -> anyhow::Error { + let mut heartbeat_interval = tokio::time::interval(heartbeat_interval); + let mut gc_interval = tokio::time::interval(gc_interval); + loop { + tokio::select! { + _ = heartbeat_interval.tick() => match self.heartbeat().await { + Ok(_) => { }, + Err(err) => break err, + }, + _ = gc_interval.tick() => match self.collect_garbage(gc_lifetime) { + Ok(_) => { }, + Err(err) => break err, + }, + } + } + } } diff --git a/clash_lib/src/proxy/tuic/mod.rs b/clash_lib/src/proxy/tuic/mod.rs index 489d8c820..7389b2adb 100644 --- a/clash_lib/src/proxy/tuic/mod.rs +++ b/clash_lib/src/proxy/tuic/mod.rs @@ -143,7 +143,7 @@ impl Handler { .unwrap() .with_root_certificates(GLOBAL_ROOT_STORE.clone()) .with_no_client_auth(); - // aborted by peer: the cryptographic handshake failed: error 120: peer doesn't support any known protocol + // TODO(error-handling) if alpn not match the following error will be throw: aborted by peer: the cryptographic handshake failed: error 120: peer doesn't support any known protocol crypto.alpn_protocols = opts.alpn.clone(); crypto.enable_early_data = true; crypto.enable_sni = !opts.disable_sni; @@ -196,7 +196,7 @@ impl Handler { *guard = Some(self.ep.connect().await?); } let conn = guard.take().unwrap(); - let conn = if conn.is_closed() { + let conn = if conn.check_open().is_err() { // reconnect self.ep.connect().await? } else { @@ -240,8 +240,6 @@ impl Handler { struct TuicDatagramOutbound { assoc_id: u16, - // conn: TuicConnection, - // dest: tuic::Address, handle: tokio::task::JoinHandle>, send_tx: tokio_util::sync::PollSender, recv_rx: tokio::sync::mpsc::Receiver, @@ -285,8 +283,6 @@ impl TuicDatagramOutbound { }); let s = Self { assoc_id, - // conn, - // dest, handle, send_tx: tokio_util::sync::PollSender::new(send_tx), recv_rx, diff --git a/clash_lib/src/proxy/tuic/types.rs b/clash_lib/src/proxy/tuic/types.rs index d70e183cd..d8052771f 100644 --- a/clash_lib/src/proxy/tuic/types.rs +++ b/clash_lib/src/proxy/tuic/types.rs @@ -106,8 +106,11 @@ pub struct UdpSession { } impl TuicConnection { - pub fn is_closed(&self) -> bool { - self.conn.close_reason().is_some() + pub fn check_open(&self) -> Result<()> { + match self.conn.close_reason() { + Some(err) => Err(err)?, + None => Ok(()), + } } fn new( conn: QuinnConnection, @@ -149,10 +152,12 @@ impl TuicConnection { ) { tracing::info!("connection established"); - // TODO reduct spawn + // TODO check the cancellation safety of tuic_auth tokio::spawn(self.clone().tuic_auth(zero_rtt_accepted)); - tokio::spawn(self.clone().heartbeat(heartbeat)); - tokio::spawn(self.clone().collect_garbage(gc_interval, gc_lifetime)); + tokio::spawn( + self.clone() + .cyclical_tasks(heartbeat, gc_interval, gc_lifetime), + ); let err = loop { tokio::select! { @@ -173,18 +178,6 @@ impl TuicConnection { tracing::warn!("connection error: {err}"); } - - async fn collect_garbage(self, gc_interval: Duration, gc_lifetime: Duration) { - let mut interval = tokio::time::interval(gc_interval); - loop { - interval.tick().await; - if self.is_closed() { - break; - } - tracing::trace!("[gc]"); - self.inner.collect_garbage(gc_lifetime); - } - } } pub struct ServerAddr { From 264e9691209e521c494c557be95127c9b4ab4b15 Mon Sep 17 00:00:00 2001 From: iHsin Date: Sun, 24 Mar 2024 17:53:22 +0800 Subject: [PATCH 14/17] refactor: use Arc --- clash_lib/src/proxy/tuic/handle_stream.rs | 26 ++++++++++++++--------- clash_lib/src/proxy/tuic/handle_task.rs | 6 +++--- clash_lib/src/proxy/tuic/mod.rs | 7 +++--- clash_lib/src/proxy/tuic/types.rs | 8 +++---- 4 files changed, 26 insertions(+), 21 deletions(-) diff --git a/clash_lib/src/proxy/tuic/handle_stream.rs b/clash_lib/src/proxy/tuic/handle_stream.rs index 3d38ee58b..6da1469dd 100644 --- a/clash_lib/src/proxy/tuic/handle_stream.rs +++ b/clash_lib/src/proxy/tuic/handle_stream.rs @@ -1,4 +1,5 @@ use std::sync::atomic::Ordering; +use std::sync::Arc; use bytes::Bytes; use quinn::{RecvStream, SendStream, VarInt}; @@ -46,7 +47,7 @@ impl TuicConnection { Ok(self.conn.read_datagram().await?) } - pub async fn handle_uni_stream(self, recv: RecvStream, _reg: Register) { + pub async fn handle_uni_stream(self: Arc, recv: RecvStream, _reg: Register) { tracing::debug!("[relay] incoming unidirectional stream"); let res = match self.inner.accept_uni_stream(recv).await { @@ -66,20 +67,23 @@ impl TuicConnection { } } - pub async fn handle_bi_stream(self, send: SendStream, recv: RecvStream, _reg: Register) { + pub async fn handle_bi_stream( + self: Arc, + send: SendStream, + recv: RecvStream, + _reg: Register, + ) { tracing::debug!("[relay] incoming bidirectional stream"); - let res = match self.inner.accept_bi_stream(send, recv).await { - Err(err) => Err::<(), _>(anyhow!(err)), - _ => unreachable!(), // already filtered in `tuic_quinn` + let err = match self.inner.accept_bi_stream(send, recv).await { + Err(err) => anyhow!(err), + _ => anyhow!("A client shouldn't receive bi stream"), }; - if let Err(err) = res { - tracing::warn!("[relay] incoming bidirectional stream error: {err}"); - } + tracing::warn!("[relay] incoming bidirectional stream error: {err}"); } - pub async fn handle_datagram(self, dg: Bytes) { + pub async fn handle_datagram(self: Arc, dg: Bytes) { tracing::debug!("[relay] incoming datagram"); let res = match self.inner.accept_datagram(dg) { @@ -91,7 +95,9 @@ impl TuicConnection { } UdpRelayMode::Quic => Err(anyhow!("wrong packet source")), }, - _ => unreachable!(), // already filtered in `tuic_quinn` + _ => Err(anyhow!( + "Datagram shouldn't receive any data expect UDP packet" + )), }; if let Err(err) = res { diff --git a/clash_lib/src/proxy/tuic/handle_task.rs b/clash_lib/src/proxy/tuic/handle_task.rs index adb590997..340dc027f 100644 --- a/clash_lib/src/proxy/tuic/handle_task.rs +++ b/clash_lib/src/proxy/tuic/handle_task.rs @@ -1,4 +1,4 @@ -use std::time::Duration; +use std::{sync::Arc, time::Duration}; use bytes::Bytes; use quinn::ZeroRttAccepted; @@ -13,7 +13,7 @@ use crate::session::SocksAddr as ClashSocksAddr; use super::types::{TuicConnection, UdpRelayMode}; impl TuicConnection { - pub async fn tuic_auth(self, zero_rtt_accepted: Option) { + pub async fn tuic_auth(self: Arc, zero_rtt_accepted: Option) { if let Some(zero_rtt_accepted) = zero_rtt_accepted { tracing::debug!("[auth] waiting for connection to be fully established"); zero_rtt_accepted.await; @@ -156,7 +156,7 @@ impl TuicConnection { /// Tasks triggered by timer /// Won't return unless occurs error pub async fn cyclical_tasks( - self, + self: Arc, heartbeat_interval: Duration, gc_interval: Duration, gc_lifetime: Duration, diff --git a/clash_lib/src/proxy/tuic/mod.rs b/clash_lib/src/proxy/tuic/mod.rs index 7389b2adb..03b8f4531 100644 --- a/clash_lib/src/proxy/tuic/mod.rs +++ b/clash_lib/src/proxy/tuic/mod.rs @@ -79,7 +79,7 @@ pub struct HandlerOptions { pub struct Handler { opts: HandlerOptions, ep: TuicEndpoint, - conn: AsyncMutex>, + conn: AsyncMutex>>, next_assoc_id: AtomicU16, } @@ -188,7 +188,7 @@ impl Handler { next_assoc_id: AtomicU16::new(0), })) } - async fn get_conn(&self) -> Result { + async fn get_conn(&self) -> Result> { let fut = async { let mut guard = self.conn.lock().await; if guard.is_none() { @@ -202,7 +202,6 @@ impl Handler { } else { conn }; - // TODO TuicConnection is huge, is it necessary to clone it? If it is, should we use Arc ? *guard = Some(conn.clone()); Ok(conn) }; @@ -248,7 +247,7 @@ struct TuicDatagramOutbound { impl TuicDatagramOutbound { pub fn new( assoc_id: u16, - conn: TuicConnection, + conn: Arc, local_addr: ClashSocksAddr, ) -> AnyOutboundDatagram { // TODO not sure about the size of buffer diff --git a/clash_lib/src/proxy/tuic/types.rs b/clash_lib/src/proxy/tuic/types.rs index d8052771f..5c73b6841 100644 --- a/clash_lib/src/proxy/tuic/types.rs +++ b/clash_lib/src/proxy/tuic/types.rs @@ -28,7 +28,7 @@ pub struct TuicEndpoint { pub gc_lifetime: Duration, } impl TuicEndpoint { - pub async fn connect(&self) -> Result { + pub async fn connect(&self) -> Result> { let mut last_err = None; for addr in self.server.resolve().await? { @@ -121,7 +121,7 @@ impl TuicConnection { heartbeat: Duration, gc_interval: Duration, gc_lifetime: Duration, - ) -> Self { + ) -> Arc { let conn = Self { conn: conn.clone(), inner: InnerConnection::::new(conn), @@ -135,7 +135,7 @@ impl TuicConnection { max_concurrent_bi_streams: Arc::new(AtomicU32::new(32)), udp_sessions: Arc::new(AsyncRwLock::new(HashMap::new())), }; - + let conn = Arc::new(conn); tokio::spawn( conn.clone() .init(zero_rtt_accepted, heartbeat, gc_interval, gc_lifetime), @@ -144,7 +144,7 @@ impl TuicConnection { conn } async fn init( - self, + self: Arc, zero_rtt_accepted: Option, heartbeat: Duration, gc_interval: Duration, From a59836c0c0f097e7322e74c17f957172abc77494 Mon Sep 17 00:00:00 2001 From: iHsin Date: Sun, 24 Mar 2024 18:35:41 +0800 Subject: [PATCH 15/17] style: make clippy happy --- clash_lib/src/proxy/tuic/mod.rs | 19 ++++++++----------- clash_lib/src/proxy/tuic/types.rs | 1 + 2 files changed, 9 insertions(+), 11 deletions(-) diff --git a/clash_lib/src/proxy/tuic/mod.rs b/clash_lib/src/proxy/tuic/mod.rs index 03b8f4531..83e04b5b7 100644 --- a/clash_lib/src/proxy/tuic/mod.rs +++ b/clash_lib/src/proxy/tuic/mod.rs @@ -245,6 +245,7 @@ struct TuicDatagramOutbound { } impl TuicDatagramOutbound { + #[allow(clippy::new_ret_no_self)] pub fn new( assoc_id: u16, conn: Arc, @@ -264,17 +265,13 @@ impl TuicDatagramOutbound { local_addr, }, ); - loop { - if let Some(next_send) = send_rx.recv().await { - conn.outgoing_udp( - next_send.data.into(), - next_send.dst_addr.into_tuic(), - assoc_id, - ) - .await?; - } else { - break; - }; + while let Some(next_send) = send_rx.recv().await { + conn.outgoing_udp( + next_send.data.into(), + next_send.dst_addr.into_tuic(), + assoc_id, + ) + .await?; } tracing::info!("[close] [udp] no more outgoing udp packet from [{assoc_id:#06x}]"); udp_sessions.write().await.remove(&assoc_id); diff --git a/clash_lib/src/proxy/tuic/types.rs b/clash_lib/src/proxy/tuic/types.rs index 5c73b6841..84d8a5692 100644 --- a/clash_lib/src/proxy/tuic/types.rs +++ b/clash_lib/src/proxy/tuic/types.rs @@ -112,6 +112,7 @@ impl TuicConnection { None => Ok(()), } } + #[allow(clippy::too_many_arguments)] fn new( conn: QuinnConnection, zero_rtt_accepted: Option, From 54e03d40ec44a3f514b14b9068953409c4795daa Mon Sep 17 00:00:00 2001 From: iHsin Date: Sun, 24 Mar 2024 18:52:22 +0800 Subject: [PATCH 16/17] fix: handle UDP dissociate --- clash_lib/src/proxy/tuic/mod.rs | 27 +++++++++++++++------------ 1 file changed, 15 insertions(+), 12 deletions(-) diff --git a/clash_lib/src/proxy/tuic/mod.rs b/clash_lib/src/proxy/tuic/mod.rs index 83e04b5b7..ed2699563 100644 --- a/clash_lib/src/proxy/tuic/mod.rs +++ b/clash_lib/src/proxy/tuic/mod.rs @@ -237,9 +237,8 @@ impl Handler { } } +#[derive(Debug)] struct TuicDatagramOutbound { - assoc_id: u16, - handle: tokio::task::JoinHandle>, send_tx: tokio_util::sync::PollSender, recv_rx: tokio::sync::mpsc::Receiver, } @@ -255,7 +254,7 @@ impl TuicDatagramOutbound { let (send_tx, send_rx) = tokio::sync::mpsc::channel::(32); let (recv_tx, recv_rx) = tokio::sync::mpsc::channel::(32); let udp_sessions = conn.udp_sessions.clone(); - let handle = tokio::spawn(async move { + tokio::spawn(async move { // capture vars let (mut send_rx, recv_tx) = (send_rx, recv_tx); udp_sessions.write().await.insert( @@ -266,20 +265,24 @@ impl TuicDatagramOutbound { }, ); while let Some(next_send) = send_rx.recv().await { - conn.outgoing_udp( - next_send.data.into(), - next_send.dst_addr.into_tuic(), - assoc_id, - ) - .await?; + let res = conn + .outgoing_udp( + next_send.data.into(), + next_send.dst_addr.into_tuic(), + assoc_id, + ) + .await; + if res.is_err() { + break; + } } - tracing::info!("[close] [udp] no more outgoing udp packet from [{assoc_id:#06x}]"); + // TuicDatagramOutbound dropped or outgoing_udp occurs error + tracing::info!("[udp] [dissociate] closing UDP session [{assoc_id:#06x}]"); + _ = conn.dissociate(assoc_id).await; udp_sessions.write().await.remove(&assoc_id); anyhow::Ok(()) }); let s = Self { - assoc_id, - handle, send_tx: tokio_util::sync::PollSender::new(send_tx), recv_rx, }; From 7a777c5d98310a66a3099ef1ce0be729c597020e Mon Sep 17 00:00:00 2001 From: iHsin Date: Sun, 24 Mar 2024 19:11:05 +0800 Subject: [PATCH 17/17] refactor: remove redundant fast open and fix typo --- clash_lib/src/config/internal/proxy.rs | 2 +- clash_lib/src/proxy/converters/tuic.rs | 3 +-- clash_lib/src/proxy/tuic/mod.rs | 5 ++--- 3 files changed, 4 insertions(+), 6 deletions(-) diff --git a/clash_lib/src/config/internal/proxy.rs b/clash_lib/src/config/internal/proxy.rs index 233b9a37c..918e48cff 100644 --- a/clash_lib/src/config/internal/proxy.rs +++ b/clash_lib/src/config/internal/proxy.rs @@ -236,7 +236,7 @@ pub struct OutboundTuic { /// h3 pub alpn: Option>, pub disable_sni: Option, - pub reduct_rtt: Option, + pub reduce_rtt: Option, /// millis pub request_timeout: Option, pub udp_relay_mode: Option, diff --git a/clash_lib/src/proxy/converters/tuic.rs b/clash_lib/src/proxy/converters/tuic.rs index 74036404f..5dd43c546 100644 --- a/clash_lib/src/proxy/converters/tuic.rs +++ b/clash_lib/src/proxy/converters/tuic.rs @@ -36,7 +36,7 @@ impl TryFrom<&OutboundTuic> for AnyOutboundHandler { .map(|v| v.into_iter().map(|alpn| alpn.into_bytes()).collect()) .unwrap_or_default(), heartbeat_interval: Duration::from_millis(s.heartbeat_interval.unwrap_or(3000)), - reduct_rtt: s.reduct_rtt.unwrap_or(false), + reduce_rtt: s.reduce_rtt.unwrap_or(false) || s.fast_open.unwrap_or(false), request_timeout: Duration::from_millis(s.request_timeout.unwrap_or(8000)), congestion_controller: s .congestion_controller @@ -47,7 +47,6 @@ impl TryFrom<&OutboundTuic> for AnyOutboundHandler { max_open_stream: VarInt::from_u64(s.max_open_stream.unwrap_or(32)) .unwrap_or(VarInt::MAX), ip: s.ip.clone(), - fast_open: s.fast_open, skip_cert_verify: s.skip_cert_verify.unwrap_or(false), sni: s.sni.clone(), gc_interval: Duration::from_millis(s.gc_interval.unwrap_or(3000)), diff --git a/clash_lib/src/proxy/tuic/mod.rs b/clash_lib/src/proxy/tuic/mod.rs index ed2699563..23327c859 100644 --- a/clash_lib/src/proxy/tuic/mod.rs +++ b/clash_lib/src/proxy/tuic/mod.rs @@ -59,7 +59,7 @@ pub struct HandlerOptions { pub disable_sni: bool, pub alpn: Vec>, pub heartbeat_interval: Duration, - pub reduct_rtt: bool, + pub reduce_rtt: bool, pub request_timeout: Duration, pub congestion_controller: CongestionControl, pub max_udp_relay_packet_size: u64, @@ -71,7 +71,6 @@ pub struct HandlerOptions { /// not used pub ip: Option, - pub fast_open: Option, pub skip_cert_verify: bool, pub sni: Option, } @@ -176,7 +175,7 @@ impl Handler { uuid: opts.uuid, password: Arc::from(opts.password.clone().into_bytes().into_boxed_slice()), udp_relay_mode: types::UdpRelayMode::Native, - zero_rtt_handshake: opts.reduct_rtt, + zero_rtt_handshake: opts.reduce_rtt, heartbeat: opts.heartbeat_interval, gc_interval: opts.gc_interval, gc_lifetime: opts.gc_lifetime,