From bfddc5214a4ebfe16775f7e81db0a5a0d9d805fe Mon Sep 17 00:00:00 2001 From: iHsin Date: Mon, 25 Mar 2024 17:00:12 +0800 Subject: [PATCH] Support TUIC (#332) * chore: ignore nix direnv files * build: add tuic related deps * feat: support parse tuic configuration * fix(logging): shutdown should be logged after receiving signal * feat: support TUIC TCP relay * feat(tuic): allow more configuration and remove hardcode vars * chore: clean logging and error handling * feat(tuic): support UDP relay * chore(tuic): rename symbols * build: change tuic into git deps * style: cargo fmt * build: update tuic * refactor: reduce spawn task * refactor: use Arc * style: make clippy happy * fix: handle UDP dissociate * refactor: remove redundant fast open and fix typo --- .gitignore | 5 + Cargo.lock | 84 +++++ clash_lib/Cargo.toml | 10 +- clash_lib/src/app/outbound/manager.rs | 4 +- .../proxy_provider/proxy_set_provider.rs | 1 + clash_lib/src/config/internal/proxy.rs | 38 +++ clash_lib/src/lib.rs | 2 +- clash_lib/src/proxy/converters/mod.rs | 1 + clash_lib/src/proxy/converters/tuic.rs | 59 ++++ clash_lib/src/proxy/mod.rs | 2 + clash_lib/src/proxy/tuic/compat.rs | 41 +++ clash_lib/src/proxy/tuic/handle_stream.rs | 107 +++++++ clash_lib/src/proxy/tuic/handle_task.rs | 179 +++++++++++ clash_lib/src/proxy/tuic/mod.rs | 290 ++++++++++++++++++ clash_lib/src/proxy/tuic/types.rs | 268 ++++++++++++++++ 15 files changed, 1087 insertions(+), 4 deletions(-) create mode 100644 clash_lib/src/proxy/converters/tuic.rs create mode 100644 clash_lib/src/proxy/tuic/compat.rs create mode 100644 clash_lib/src/proxy/tuic/handle_stream.rs create mode 100644 clash_lib/src/proxy/tuic/handle_task.rs create mode 100644 clash_lib/src/proxy/tuic/mod.rs create mode 100644 clash_lib/src/proxy/tuic/types.rs diff --git a/.gitignore b/.gitignore index d3fb7bab9..f995f49d0 100644 --- a/.gitignore +++ b/.gitignore @@ -28,8 +28,13 @@ venv/ # don't check in this real config ignore*.yaml +config.yaml cache.db Country.mmdb ruleset/ rust-project.json + +# for NixOS direnv +.envrc +shell.nix diff --git a/Cargo.lock b/Cargo.lock index ab73ffbd1..53aac37f3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1083,8 +1083,10 @@ dependencies = [ "opentelemetry_sdk", "prost", "public-suffix", + "quinn", "rand", "regex", + "register-count", "rustls 0.21.10", "rustls-pemfile", "security-framework", @@ -1111,6 +1113,8 @@ dependencies = [ "tracing-oslog", "tracing-subscriber", "tracing-timing", + "tuic", + "tuic-quinn", "tun", "url", "uuid", @@ -4137,6 +4141,54 @@ version = "1.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a1d01941d82fa2ab50be1e79e6714289dd7cde78eba4c074bc5a4374f650dfe0" +[[package]] +name = "quinn" +version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8cc2c5017e4b43d5995dcea317bc46c1e09404c0a9664d2908f7f02dfe943d75" +dependencies = [ + "bytes", + "futures-io", + "pin-project-lite", + "quinn-proto", + "quinn-udp", + "rustc-hash", + "rustls 0.21.10", + "thiserror", + "tokio", + "tracing", +] + +[[package]] +name = "quinn-proto" +version = "0.10.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "141bf7dfde2fbc246bfd3fe12f2455aa24b0fbd9af535d8c86c7bd1381ff2b1a" +dependencies = [ + "bytes", + "rand", + "ring 0.16.20", + "rustc-hash", + "rustls 0.21.10", + "slab", + "thiserror", + "tinyvec", + "tracing", +] + +[[package]] +name = "quinn-udp" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "055b4e778e8feb9f93c4e439f71dc2156ef13360b432b799e179a8c4cdf0b1d7" +dependencies = [ + "bytes", + "libc", + "socket2", + "tracing", + "windows-sys 0.48.0", +] + [[package]] name = "quote" version = "1.0.35" @@ -4294,6 +4346,12 @@ version = "0.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c08c74e62047bb2de4ff487b251e4a92e24f48745648451635cec7d591162d9f" +[[package]] +name = "register-count" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d6d8b2af7d3e6675306d6757f10b4cf0b218a9fa6a0b44d668f2132684ae4893" + [[package]] name = "resolv-conf" version = "0.7.0" @@ -6520,6 +6578,32 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" +[[package]] +name = "tuic" +version = "5.0.0" +source = "git+https://github.com/Itsusinn/tuic.git?rev=82fab62#82fab626d6344f69f26ce4af7c87916f640ee3d3" +dependencies = [ + "bytes", + "futures-util", + "parking_lot 0.12.1", + "register-count", + "thiserror", + "uuid", +] + +[[package]] +name = "tuic-quinn" +version = "0.1.0" +source = "git+https://github.com/Itsusinn/tuic.git?rev=82fab62#82fab626d6344f69f26ce4af7c87916f640ee3d3" +dependencies = [ + "bytes", + "futures-util", + "quinn", + "thiserror", + "tuic", + "uuid", +] + [[package]] name = "tun" version = "0.6.1" diff --git a/clash_lib/Cargo.toml b/clash_lib/Cargo.toml index ebd7ec2e5..b4ce820ac 100644 --- a/clash_lib/Cargo.toml +++ b/clash_lib/Cargo.toml @@ -44,7 +44,7 @@ once_cell = "1.18.0" # opentelemetry opentelemetry = "0.22" opentelemetry_sdk = { version = "0.22", features = ["rt-tokio"] } -tracing-opentelemetry = "0.23" +tracing-opentelemetry = "0.23" opentelemetry-jaeger-propagator = "0.1.0" opentelemetry-jaeger = { version = "0.21", features = ["collector_client", "hyper_collector_client", "rt-tokio"] } opentelemetry-otlp = { version = "0.15.0", features = ["http-proto"] } @@ -84,7 +84,7 @@ hickory-proto = { version = "0.24", features = ["dns-over-rustls", "dns-over-htt # DoH # ideally we should make a CryptoProvider with boringssl and get rid of rings -rustls = { version = "0.21", features=["dangerous_configuration"] } +rustls = { version = "0.21", features=["dangerous_configuration", "quic"] } rustls-pemfile = "1.0.4" webpki-roots = "0.25" dhcproto = "0.11" @@ -108,6 +108,12 @@ murmur3 = "0.5.2" arti-client = { version = "0.14.0", default-features = false, features = ["tokio", "rustls", "compression", "static-sqlite"] } tor-rtcompat = { version = "0.10.0" } +# tuic +tuic = { rev = "82fab62", git = "https://github.com/Itsusinn/tuic.git" } +tuic-quinn = { rev = "82fab62", git = "https://github.com/Itsusinn/tuic.git" } +quinn = { version = "0.10", default-features = false, features = ["futures-io", "runtime-tokio", "tls-rustls"] } +register-count = "0.1.0" + console-subscriber = { version = "0.2.0" } tracing-timing = { version = "0.6.0" } criterion = { version = "0.5", features = ["html_reports", "async_tokio"], optional = true } diff --git a/clash_lib/src/app/outbound/manager.rs b/clash_lib/src/app/outbound/manager.rs index ee048531e..14fa49a50 100644 --- a/clash_lib/src/app/outbound/manager.rs +++ b/clash_lib/src/app/outbound/manager.rs @@ -215,7 +215,9 @@ impl OutboundManager { OutboundProxyProtocol::Tor(tor) => { handlers.insert(tor.name.clone(), tor.try_into()?); } - + OutboundProxyProtocol::Tuic(tuic) => { + handlers.insert(tuic.name.clone(), tuic.try_into()?); + } p => { unimplemented!("proto {} not supported yet", p); } diff --git a/clash_lib/src/app/remote_content_manager/providers/proxy_provider/proxy_set_provider.rs b/clash_lib/src/app/remote_content_manager/providers/proxy_provider/proxy_set_provider.rs index b28ac7fdd..e918ba611 100644 --- a/clash_lib/src/app/remote_content_manager/providers/proxy_provider/proxy_set_provider.rs +++ b/clash_lib/src/app/remote_content_manager/providers/proxy_provider/proxy_set_provider.rs @@ -104,6 +104,7 @@ impl ProxySetProvider { OutboundProxyProtocol::Vmess(vm) => vm.try_into(), OutboundProxyProtocol::Wireguard(wg) => wg.try_into(), OutboundProxyProtocol::Tor(tor) => tor.try_into(), + OutboundProxyProtocol::Tuic(tuic) => tuic.try_into(), }) .collect::, _>>(); Ok(proxies?) diff --git a/clash_lib/src/config/internal/proxy.rs b/clash_lib/src/config/internal/proxy.rs index 0603c7348..918e48cff 100644 --- a/clash_lib/src/config/internal/proxy.rs +++ b/clash_lib/src/config/internal/proxy.rs @@ -6,6 +6,7 @@ use serde::Deserialize; use serde_yaml::Value; use std::collections::HashMap; use std::fmt::{Display, Formatter}; +use uuid::Uuid; pub const PROXY_DIRECT: &str = "DIRECT"; pub const PROXY_REJECT: &str = "REJECT"; @@ -61,6 +62,8 @@ pub enum OutboundProxyProtocol { Wireguard(OutboundWireguard), #[serde(rename = "tor")] Tor(OutboundTor), + #[serde(rename = "tuic")] + Tuic(OutboundTuic), } impl OutboundProxyProtocol { @@ -74,6 +77,7 @@ impl OutboundProxyProtocol { OutboundProxyProtocol::Vmess(vmess) => &vmess.name, OutboundProxyProtocol::Wireguard(wireguard) => &wireguard.name, OutboundProxyProtocol::Tor(tor) => &tor.name, + OutboundProxyProtocol::Tuic(tuic) => &tuic.name, } } } @@ -105,6 +109,7 @@ impl Display for OutboundProxyProtocol { OutboundProxyProtocol::Vmess(_) => write!(f, "Vmess"), OutboundProxyProtocol::Wireguard(_) => write!(f, "Wireguard"), OutboundProxyProtocol::Tor(_) => write!(f, "Tor"), + OutboundProxyProtocol::Tuic(_) => write!(f, "Tuic"), } } } @@ -217,6 +222,39 @@ pub struct OutboundTor { pub name: String, } +#[derive(serde::Serialize, serde::Deserialize, Debug, Default)] +#[serde(rename_all = "kebab-case")] +pub struct OutboundTuic { + pub name: String, + pub server: String, + pub port: u16, + pub uuid: Uuid, + pub password: String, + /// override field 'server' dns record, not used for now + pub ip: Option, + pub heartbeat_interval: Option, + /// h3 + pub alpn: Option>, + pub disable_sni: Option, + pub reduce_rtt: Option, + /// millis + pub request_timeout: Option, + pub udp_relay_mode: Option, + pub congestion_controller: Option, + /// bytes + pub max_udp_relay_packet_size: Option, + pub fast_open: Option, + pub skip_cert_verify: Option, + pub max_open_stream: Option, + pub sni: Option, + /// millis + pub gc_interval: Option, + /// millis + pub gc_lifetime: Option, + pub send_window: Option, + pub receive_window: Option, +} + #[derive(serde::Serialize, serde::Deserialize, Debug, Clone)] #[serde(tag = "type")] pub enum OutboundGroupProtocol { diff --git a/clash_lib/src/lib.rs b/clash_lib/src/lib.rs index 82f12cdd4..3910e8785 100644 --- a/clash_lib/src/lib.rs +++ b/clash_lib/src/lib.rs @@ -286,8 +286,8 @@ async fn start_async(opts: Options) -> Result<(), Error> { } runners.push(Box::pin(async move { - info!("receiving shutdown signal"); shutdown_rx.recv().await; + info!("receiving shutdown signal"); Ok(()) })); diff --git a/clash_lib/src/proxy/converters/mod.rs b/clash_lib/src/proxy/converters/mod.rs index 3b747b248..9d4cfc520 100644 --- a/clash_lib/src/proxy/converters/mod.rs +++ b/clash_lib/src/proxy/converters/mod.rs @@ -1,5 +1,6 @@ pub mod shadowsocks; pub mod tor; pub mod trojan; +pub mod tuic; pub mod vmess; pub mod wireguard; diff --git a/clash_lib/src/proxy/converters/tuic.rs b/clash_lib/src/proxy/converters/tuic.rs new file mode 100644 index 000000000..5dd43c546 --- /dev/null +++ b/clash_lib/src/proxy/converters/tuic.rs @@ -0,0 +1,59 @@ +use std::time::Duration; + +use quinn::VarInt; + +use crate::{ + config::internal::proxy::OutboundTuic, + proxy::{ + tuic::{types::CongestionControl, Handler, HandlerOptions}, + AnyOutboundHandler, + }, +}; + +impl TryFrom for AnyOutboundHandler { + type Error = crate::Error; + + fn try_from(value: OutboundTuic) -> Result { + (&value).try_into() + } +} + +impl TryFrom<&OutboundTuic> for AnyOutboundHandler { + type Error = crate::Error; + + fn try_from(s: &OutboundTuic) -> Result { + Handler::new(HandlerOptions { + name: s.name.to_owned(), + server: s.server.to_owned(), + port: s.port, + uuid: s.uuid.to_owned(), + password: s.password.to_owned(), + udp_relay_mode: s.udp_relay_mode.to_owned().unwrap_or("native".to_string()), + disable_sni: s.disable_sni.unwrap_or(false), + alpn: s + .alpn + .clone() + .map(|v| v.into_iter().map(|alpn| alpn.into_bytes()).collect()) + .unwrap_or_default(), + heartbeat_interval: Duration::from_millis(s.heartbeat_interval.unwrap_or(3000)), + reduce_rtt: s.reduce_rtt.unwrap_or(false) || s.fast_open.unwrap_or(false), + request_timeout: Duration::from_millis(s.request_timeout.unwrap_or(8000)), + congestion_controller: s + .congestion_controller + .clone() + .map(|v| CongestionControl::from(v.as_str())) + .unwrap_or_default(), + max_udp_relay_packet_size: s.max_udp_relay_packet_size.unwrap_or(1500), + max_open_stream: VarInt::from_u64(s.max_open_stream.unwrap_or(32)) + .unwrap_or(VarInt::MAX), + ip: s.ip.clone(), + skip_cert_verify: s.skip_cert_verify.unwrap_or(false), + sni: s.sni.clone(), + gc_interval: Duration::from_millis(s.gc_interval.unwrap_or(3000)), + gc_lifetime: Duration::from_millis(s.gc_lifetime.unwrap_or(15000)), + send_window: s.send_window.unwrap_or(8 * 1024 * 1024 * 2), + receive_window: VarInt::from_u64(s.receive_window.unwrap_or(8 * 1024 * 1024)) + .unwrap_or(VarInt::MAX), + }) + } +} diff --git a/clash_lib/src/proxy/mod.rs b/clash_lib/src/proxy/mod.rs index 9b0fe0b2b..96c76b826 100644 --- a/clash_lib/src/proxy/mod.rs +++ b/clash_lib/src/proxy/mod.rs @@ -31,6 +31,7 @@ pub mod shadowsocks; pub mod socks; pub mod tor; pub mod trojan; +pub mod tuic; pub mod tun; pub mod utils; pub mod vmess; @@ -110,6 +111,7 @@ pub enum OutboundType { Trojan, WireGuard, Tor, + Tuic, #[serde(rename = "URLTest")] UrlTest, diff --git a/clash_lib/src/proxy/tuic/compat.rs b/clash_lib/src/proxy/tuic/compat.rs new file mode 100644 index 000000000..997796060 --- /dev/null +++ b/clash_lib/src/proxy/tuic/compat.rs @@ -0,0 +1,41 @@ +use std::{ + pin::Pin, + task::{Context, Poll}, +}; + +use futures::{Sink, SinkExt, Stream}; + +use crate::{common::errors::new_io_error, proxy::datagram::UdpPacket}; + +use super::TuicDatagramOutbound; + +impl Sink for TuicDatagramOutbound { + type Error = std::io::Error; + fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.send_tx + .poll_ready_unpin(cx) + .map_err(|v| new_io_error(&format!("{v:?}"))) + } + fn start_send(mut self: Pin<&mut Self>, item: UdpPacket) -> Result<(), Self::Error> { + self.send_tx + .start_send_unpin(item) + .map_err(|v| new_io_error(&format!("{v:?}"))) + } + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.send_tx + .poll_flush_unpin(cx) + .map_err(|v| new_io_error(&format!("{v:?}"))) + } + fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.send_tx + .poll_close_unpin(cx) + .map_err(|v| new_io_error(&format!("{v:?}"))) + } +} + +impl Stream for TuicDatagramOutbound { + type Item = UdpPacket; + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.recv_rx.poll_recv(cx) + } +} diff --git a/clash_lib/src/proxy/tuic/handle_stream.rs b/clash_lib/src/proxy/tuic/handle_stream.rs new file mode 100644 index 000000000..6da1469dd --- /dev/null +++ b/clash_lib/src/proxy/tuic/handle_stream.rs @@ -0,0 +1,107 @@ +use std::sync::atomic::Ordering; +use std::sync::Arc; + +use bytes::Bytes; +use quinn::{RecvStream, SendStream, VarInt}; +use register_count::Register; +use tuic_quinn::Task; + +use crate::proxy::tuic::types::UdpRelayMode; + +use super::types::TuicConnection; + +impl TuicConnection { + pub async fn accept_uni_stream(&self) -> anyhow::Result<(RecvStream, Register)> { + let max = self.max_concurrent_uni_streams.load(Ordering::Relaxed); + + if self.remote_uni_stream_cnt.count() as u32 == max { + self.max_concurrent_uni_streams + .store(max * 2, Ordering::Relaxed); + + self.conn + .set_max_concurrent_uni_streams(VarInt::from(max * 2)); + } + + let recv = self.conn.accept_uni().await?; + let reg = self.remote_uni_stream_cnt.reg(); + Ok((recv, reg)) + } + + pub async fn accept_bi_stream(&self) -> anyhow::Result<(SendStream, RecvStream, Register)> { + let max = self.max_concurrent_bi_streams.load(Ordering::Relaxed); + + if self.remote_bi_stream_cnt.count() as u32 == max { + self.max_concurrent_bi_streams + .store(max * 2, Ordering::Relaxed); + + self.conn + .set_max_concurrent_bi_streams(VarInt::from(max * 2)); + } + + let (send, recv) = self.conn.accept_bi().await?; + let reg = self.remote_bi_stream_cnt.reg(); + Ok((send, recv, reg)) + } + + pub async fn accept_datagram(&self) -> anyhow::Result { + Ok(self.conn.read_datagram().await?) + } + + pub async fn handle_uni_stream(self: Arc, recv: RecvStream, _reg: Register) { + tracing::debug!("[relay] incoming unidirectional stream"); + + let res = match self.inner.accept_uni_stream(recv).await { + Err(err) => Err(anyhow!(err)), + Ok(Task::Packet(pkt)) => match self.udp_relay_mode { + UdpRelayMode::Quic => { + self.incoming_udp(pkt).await; + Ok(()) + } + UdpRelayMode::Native => Err(anyhow!("wrong packet source")), + }, + _ => unreachable!(), // already filtered in `tuic_quinn` + }; + + if let Err(err) = res { + tracing::warn!("[relay] incoming unidirectional stream error: {err}"); + } + } + + pub async fn handle_bi_stream( + self: Arc, + send: SendStream, + recv: RecvStream, + _reg: Register, + ) { + tracing::debug!("[relay] incoming bidirectional stream"); + + let err = match self.inner.accept_bi_stream(send, recv).await { + Err(err) => anyhow!(err), + _ => anyhow!("A client shouldn't receive bi stream"), + }; + + tracing::warn!("[relay] incoming bidirectional stream error: {err}"); + } + + pub async fn handle_datagram(self: Arc, dg: Bytes) { + tracing::debug!("[relay] incoming datagram"); + + let res = match self.inner.accept_datagram(dg) { + Err(err) => Err(anyhow!(err)), + Ok(Task::Packet(pkt)) => match self.udp_relay_mode { + UdpRelayMode::Native => { + self.incoming_udp(pkt).await; + Ok(()) + } + UdpRelayMode::Quic => Err(anyhow!("wrong packet source")), + }, + _ => Err(anyhow!( + "Datagram shouldn't receive any data expect UDP packet" + )), + }; + + if let Err(err) = res { + tracing::warn!("[relay] incoming datagram error: {err}"); + } + } +} diff --git a/clash_lib/src/proxy/tuic/handle_task.rs b/clash_lib/src/proxy/tuic/handle_task.rs new file mode 100644 index 000000000..340dc027f --- /dev/null +++ b/clash_lib/src/proxy/tuic/handle_task.rs @@ -0,0 +1,179 @@ +use std::{sync::Arc, time::Duration}; + +use bytes::Bytes; +use quinn::ZeroRttAccepted; + +use anyhow::Result; +use tuic::Address; +use tuic_quinn::{Connect, Packet}; + +use crate::proxy::datagram::UdpPacket; +use crate::session::SocksAddr as ClashSocksAddr; + +use super::types::{TuicConnection, UdpRelayMode}; + +impl TuicConnection { + pub async fn tuic_auth(self: Arc, zero_rtt_accepted: Option) { + if let Some(zero_rtt_accepted) = zero_rtt_accepted { + tracing::debug!("[auth] waiting for connection to be fully established"); + zero_rtt_accepted.await; + } + + tracing::debug!("[auth] sending authentication"); + + match self + .inner + .authenticate(self.uuid, self.password.clone()) + .await + { + Ok(()) => tracing::info!("[auth] {uuid}", uuid = self.uuid), + Err(err) => { + tracing::warn!("[auth] authentication sending error: {err}") + } + } + } + + pub async fn connect_tcp(&self, addr: Address) -> Result { + let addr_display = addr.to_string(); + tracing::info!("[tcp] {addr_display}"); + + match self.inner.connect(addr).await { + Ok(conn) => Ok(conn), + Err(err) => { + tracing::warn!("[tcp] failed initializing relay to {addr_display}: {err}"); + Err(anyhow!(err)) + } + } + } + + pub async fn outgoing_udp( + &self, + pkt: Bytes, + addr: Address, + assoc_id: u16, + ) -> anyhow::Result<()> { + let addr_display = addr.to_string(); + + match self.udp_relay_mode { + UdpRelayMode::Native => { + tracing::info!("[udp] [{assoc_id:#06x}] [to-native] to {addr_display}"); + match self.inner.packet_native(pkt, addr, assoc_id) { + Ok(()) => Ok(()), + Err(err) => { + tracing::warn!( + "[udp] [{assoc_id:#06x}] [to-native] to {addr_display}: {err}" + ); + Err(anyhow!(err)) + } + } + } + UdpRelayMode::Quic => { + tracing::info!("[udp] [{assoc_id:#06x}] [to-quic] {addr_display}"); + match self.inner.packet_quic(pkt, addr, assoc_id).await { + Ok(()) => Ok(()), + Err(err) => { + tracing::warn!( + "[udp] [{assoc_id:#06x}] [to-quic] to {addr_display}: {err}" + ); + Err(anyhow!(err)) + } + } + } + } + } + + pub async fn incoming_udp(&self, pkt: Packet) { + let assoc_id = pkt.assoc_id(); + let pkt_id = pkt.pkt_id(); + + let mode = if pkt.is_from_native() { + "native" + } else if pkt.is_from_quic() { + "quic" + } else { + unreachable!() + }; + + tracing::info!( + "[udp] [{assoc_id:#06x}] [from-{mode}] [{pkt_id:#06x}] fragment {frag_id}/{frag_total}", + frag_id = pkt.frag_id() + 1, + frag_total = pkt.frag_total(), + ); + match pkt.accept().await { + Ok(Some((data, remote_addr, _))) => { + tracing::info!("[udp] [{assoc_id:#06x}] [from-{mode}] [{pkt_id:#06x}] from {remote_addr}"); + let (session, local_addr) = match self.udp_sessions.read().await.get(&assoc_id) { + Some(v) => (v.incoming.clone(), v.local_addr.clone()), + None => { + tracing::error!("[udp] [{assoc_id:#06x}] [from-{mode}] [{pkt_id:#06x}] unable to find udp session"); + return; + }, + }; + let remote_addr = match remote_addr { + Address::None => unreachable!(), + Address::DomainAddress(domain, port) => ClashSocksAddr::Domain(domain, port), + Address::SocketAddress(socket) => ClashSocksAddr::Ip(socket), + }; + if let Err(err) = session.send(UdpPacket::new(data.into(), remote_addr, local_addr)).await { + tracing::error!("[udp] [{assoc_id:#06x}] [from-{mode}] [{pkt_id:#06x}] failed sending packet: {err}") + }; + }, + Ok(None) => {} + Err(err) => tracing::error!("[udp] [{assoc_id:#06x}] [from-{mode}] [{pkt_id:#06x}] packet receiving error: {err}"), + } + } + + pub async fn dissociate(&self, assoc_id: u16) -> Result<()> { + tracing::info!("[udp] [dissociate] [{assoc_id:#06x}]"); + match self.inner.dissociate(assoc_id).await { + Ok(()) => Ok(()), + Err(err) => { + tracing::warn!("[udp] [dissociate] [{assoc_id:#06x}] {err}"); + Err(err)? + } + } + } + + async fn heartbeat(&self) -> Result<()> { + self.check_open()?; + if self.inner.task_connect_count() + self.inner.task_associate_count() == 0 { + return Ok(()); + } + + match self.inner.heartbeat().await { + Ok(()) => tracing::trace!("[heartbeat]"), + Err(err) => tracing::error!("[heartbeat] {err}"), + } + Ok(()) + } + + fn collect_garbage(&self, gc_lifetime: Duration) -> Result<()> { + self.check_open()?; + tracing::trace!("[gc]"); + self.inner.collect_garbage(gc_lifetime); + Ok(()) + } + /// Tasks triggered by timer + /// Won't return unless occurs error + pub async fn cyclical_tasks( + self: Arc, + heartbeat_interval: Duration, + gc_interval: Duration, + gc_lifetime: Duration, + ) -> anyhow::Error { + let mut heartbeat_interval = tokio::time::interval(heartbeat_interval); + let mut gc_interval = tokio::time::interval(gc_interval); + loop { + tokio::select! { + _ = heartbeat_interval.tick() => match self.heartbeat().await { + Ok(_) => { }, + Err(err) => break err, + }, + _ = gc_interval.tick() => match self.collect_garbage(gc_lifetime) { + Ok(_) => { }, + Err(err) => break err, + }, + } + } + } +} diff --git a/clash_lib/src/proxy/tuic/mod.rs b/clash_lib/src/proxy/tuic/mod.rs new file mode 100644 index 000000000..23327c859 --- /dev/null +++ b/clash_lib/src/proxy/tuic/mod.rs @@ -0,0 +1,290 @@ +mod compat; +mod handle_stream; +mod handle_task; +pub(crate) mod types; + +use crate::proxy::tuic::types::SocketAdderTrans; +use anyhow::Result; +use axum::async_trait; +use quinn::{EndpointConfig, TokioRuntime}; +use std::net::SocketAddr; +use std::{ + net::{Ipv4Addr, Ipv6Addr, UdpSocket}, + sync::{ + atomic::{AtomicU16, Ordering}, + Arc, + }, + time::Duration, +}; +use tokio_util::compat::FuturesAsyncReadCompatExt; +use uuid::Uuid; + +use crate::{ + app::{ + dispatcher::{ + BoxedChainedDatagram, BoxedChainedStream, ChainedDatagram, ChainedDatagramWrapper, + ChainedStream, ChainedStreamWrapper, + }, + dns::ThreadSafeDNSResolver, + }, + common::tls::GLOBAL_ROOT_STORE, + proxy::tuic::types::{ServerAddr, TuicEndpoint}, + session::{Session, SocksAddr}, +}; + +use crate::session::SocksAddr as ClashSocksAddr; +use quinn::ClientConfig as QuinnConfig; +use quinn::Endpoint as QuinnEndpoint; +use quinn::TransportConfig as QuinnTransportConfig; +use quinn::{congestion::CubicConfig, VarInt}; +use tokio::sync::Mutex as AsyncMutex; + +use rustls::client::ClientConfig as TlsConfig; + +use self::types::{CongestionControl, TuicConnection, UdpSession}; + +use super::{ + datagram::UdpPacket, AnyOutboundDatagram, AnyOutboundHandler, AnyStream, OutboundHandler, + OutboundType, +}; + +#[derive(Debug, Clone)] +pub struct HandlerOptions { + pub name: String, + pub server: String, + pub port: u16, + pub uuid: Uuid, + pub password: String, + pub udp_relay_mode: String, + pub disable_sni: bool, + pub alpn: Vec>, + pub heartbeat_interval: Duration, + pub reduce_rtt: bool, + pub request_timeout: Duration, + pub congestion_controller: CongestionControl, + pub max_udp_relay_packet_size: u64, + pub max_open_stream: VarInt, + pub gc_interval: Duration, + pub gc_lifetime: Duration, + pub send_window: u64, + pub receive_window: VarInt, + + /// not used + pub ip: Option, + pub skip_cert_verify: bool, + pub sni: Option, +} + +pub struct Handler { + opts: HandlerOptions, + ep: TuicEndpoint, + conn: AsyncMutex>>, + next_assoc_id: AtomicU16, +} + +#[async_trait] +impl OutboundHandler for Handler { + fn name(&self) -> &str { + &self.opts.name + } + + fn proto(&self) -> OutboundType { + OutboundType::Tuic + } + + async fn remote_addr(&self) -> Option { + None + } + + async fn support_udp(&self) -> bool { + true + } + + async fn proxy_stream( + &self, + s: AnyStream, + _sess: &Session, + _resolver: ThreadSafeDNSResolver, + ) -> std::io::Result { + tracing::warn!("Proxy stream currently is direcrt connect"); + Ok(s) + } + + async fn connect_stream( + &self, + sess: &Session, + resolver: ThreadSafeDNSResolver, + ) -> std::io::Result { + self.do_connect_stream(sess, resolver).await.map_err(|e| { + tracing::error!("{:?}", e); + std::io::Error::new(std::io::ErrorKind::Other, e.to_string()) + }) + } + async fn connect_datagram( + &self, + sess: &Session, + resolver: ThreadSafeDNSResolver, + ) -> std::io::Result { + self.do_connect_datagram(sess, resolver).await.map_err(|e| { + tracing::error!("{:?}", e); + std::io::Error::new(std::io::ErrorKind::Other, e.to_string()) + }) + } +} + +impl Handler { + #[allow(clippy::new_ret_no_self)] + pub fn new(opts: HandlerOptions) -> Result { + let mut crypto = TlsConfig::builder() + .with_safe_default_cipher_suites() + .with_safe_default_kx_groups() + .with_protocol_versions(&[&rustls::version::TLS13]) + .unwrap() + .with_root_certificates(GLOBAL_ROOT_STORE.clone()) + .with_no_client_auth(); + // TODO(error-handling) if alpn not match the following error will be throw: aborted by peer: the cryptographic handshake failed: error 120: peer doesn't support any known protocol + crypto.alpn_protocols = opts.alpn.clone(); + crypto.enable_early_data = true; + crypto.enable_sni = !opts.disable_sni; + let mut quinn_config = QuinnConfig::new(Arc::new(crypto)); + let mut quinn_transport_config = QuinnTransportConfig::default(); + quinn_transport_config + .max_concurrent_bidi_streams(opts.max_open_stream) + .max_concurrent_uni_streams(opts.max_open_stream) + .send_window(opts.send_window) + .stream_receive_window(opts.receive_window) + .max_idle_timeout(None) + .congestion_controller_factory(Arc::new(CubicConfig::default())); + quinn_config.transport_config(Arc::new(quinn_transport_config)); + // Try to create an IPv4 socket as the placeholder first, if it fails, try IPv6. + let socket = + UdpSocket::bind(SocketAddr::from((Ipv4Addr::UNSPECIFIED, 0))).or_else(|err| { + UdpSocket::bind(SocketAddr::from((Ipv6Addr::UNSPECIFIED, 0))).map_err(|_| err) + })?; + + let mut endpoint = QuinnEndpoint::new( + EndpointConfig::default(), + None, + socket, + Arc::new(TokioRuntime), + )?; + endpoint.set_default_client_config(quinn_config); + let endpoint = TuicEndpoint { + ep: endpoint, + server: ServerAddr::new(opts.server.clone(), opts.port, None), + uuid: opts.uuid, + password: Arc::from(opts.password.clone().into_bytes().into_boxed_slice()), + udp_relay_mode: types::UdpRelayMode::Native, + zero_rtt_handshake: opts.reduce_rtt, + heartbeat: opts.heartbeat_interval, + gc_interval: opts.gc_interval, + gc_lifetime: opts.gc_lifetime, + }; + Ok(Arc::new(Self { + opts, + ep: endpoint, + conn: AsyncMutex::new(None), + next_assoc_id: AtomicU16::new(0), + })) + } + async fn get_conn(&self) -> Result> { + let fut = async { + let mut guard = self.conn.lock().await; + if guard.is_none() { + // init + *guard = Some(self.ep.connect().await?); + } + let conn = guard.take().unwrap(); + let conn = if conn.check_open().is_err() { + // reconnect + self.ep.connect().await? + } else { + conn + }; + *guard = Some(conn.clone()); + Ok(conn) + }; + tokio::time::timeout(self.opts.request_timeout, fut).await? + } + + async fn do_connect_stream( + &self, + sess: &Session, + _resolver: ThreadSafeDNSResolver, + ) -> Result { + let conn = self.get_conn().await?; + let dest = sess.destination.clone().into_tuic(); + let tuic_tcp = conn.connect_tcp(dest).await?.compat(); + + let s = ChainedStreamWrapper::new(tuic_tcp); + s.append_to_chain(self.name()).await; + Ok(Box::new(s)) + } + + async fn do_connect_datagram( + &self, + sess: &Session, + _resolver: ThreadSafeDNSResolver, + ) -> Result { + let conn = self.get_conn().await?; + + let assos_id = self.next_assoc_id.fetch_add(1, Ordering::Relaxed); + let quic_udp = TuicDatagramOutbound::new(assos_id, conn, sess.source.into()); + let s = ChainedDatagramWrapper::new(quic_udp); + s.append_to_chain(self.name()).await; + Ok(Box::new(s)) + } +} + +#[derive(Debug)] +struct TuicDatagramOutbound { + send_tx: tokio_util::sync::PollSender, + recv_rx: tokio::sync::mpsc::Receiver, +} + +impl TuicDatagramOutbound { + #[allow(clippy::new_ret_no_self)] + pub fn new( + assoc_id: u16, + conn: Arc, + local_addr: ClashSocksAddr, + ) -> AnyOutboundDatagram { + // TODO not sure about the size of buffer + let (send_tx, send_rx) = tokio::sync::mpsc::channel::(32); + let (recv_tx, recv_rx) = tokio::sync::mpsc::channel::(32); + let udp_sessions = conn.udp_sessions.clone(); + tokio::spawn(async move { + // capture vars + let (mut send_rx, recv_tx) = (send_rx, recv_tx); + udp_sessions.write().await.insert( + assoc_id, + UdpSession { + incoming: recv_tx, + local_addr, + }, + ); + while let Some(next_send) = send_rx.recv().await { + let res = conn + .outgoing_udp( + next_send.data.into(), + next_send.dst_addr.into_tuic(), + assoc_id, + ) + .await; + if res.is_err() { + break; + } + } + // TuicDatagramOutbound dropped or outgoing_udp occurs error + tracing::info!("[udp] [dissociate] closing UDP session [{assoc_id:#06x}]"); + _ = conn.dissociate(assoc_id).await; + udp_sessions.write().await.remove(&assoc_id); + anyhow::Ok(()) + }); + let s = Self { + send_tx: tokio_util::sync::PollSender::new(send_tx), + recv_rx, + }; + Box::new(s) + } +} diff --git a/clash_lib/src/proxy/tuic/types.rs b/clash_lib/src/proxy/tuic/types.rs new file mode 100644 index 000000000..84d8a5692 --- /dev/null +++ b/clash_lib/src/proxy/tuic/types.rs @@ -0,0 +1,268 @@ +use crate::session::SocksAddr as ClashSocksAddr; +use anyhow::Result; +use quinn::Connection as QuinnConnection; +use quinn::{Endpoint as QuinnEndpoint, ZeroRttAccepted}; +use register_count::Counter; +use std::collections::HashMap; +use std::{ + net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, UdpSocket}, + str::FromStr, + sync::{atomic::AtomicU32, Arc}, + time::Duration, +}; +use tokio::sync::RwLock as AsyncRwLock; +use tuic_quinn::Connection as InnerConnection; +use uuid::Uuid; + +use crate::proxy::datagram::UdpPacket; + +pub struct TuicEndpoint { + pub ep: QuinnEndpoint, + pub server: ServerAddr, + pub uuid: Uuid, + pub password: Arc<[u8]>, + pub udp_relay_mode: UdpRelayMode, + pub zero_rtt_handshake: bool, + pub heartbeat: Duration, + pub gc_interval: Duration, + pub gc_lifetime: Duration, +} +impl TuicEndpoint { + pub async fn connect(&self) -> Result> { + let mut last_err = None; + + for addr in self.server.resolve().await? { + let connect_to = async { + let match_ipv4 = + addr.is_ipv4() && self.ep.local_addr().map_or(false, |addr| addr.is_ipv4()); + let match_ipv6 = + addr.is_ipv6() && self.ep.local_addr().map_or(false, |addr| addr.is_ipv6()); + + if !match_ipv4 && !match_ipv6 { + let bind_addr = if addr.is_ipv4() { + SocketAddr::from((Ipv4Addr::UNSPECIFIED, 0)) + } else { + SocketAddr::from((Ipv6Addr::UNSPECIFIED, 0)) + }; + + self.ep + .rebind(UdpSocket::bind(bind_addr).map_err(|err| { + anyhow!("failed to create endpoint UDP socket {}", err) + })?) + .map_err(|err| anyhow!("failed to rebind endpoint UDP socket {}", err))?; + } + + tracing::trace!("Connect to {} {}", addr, self.server.server_name()); + let conn = self.ep.connect(addr, self.server.server_name())?; + let (conn, zero_rtt_accepted) = if self.zero_rtt_handshake { + match conn.into_0rtt() { + Ok((conn, zero_rtt_accepted)) => (conn, Some(zero_rtt_accepted)), + Err(conn) => (conn.await?, None), + } + } else { + (conn.await?, None) + }; + + Ok((conn, zero_rtt_accepted)) + }; + + match connect_to.await { + Ok((conn, zero_rtt_accepted)) => { + return Ok(TuicConnection::new( + conn, + zero_rtt_accepted, + self.udp_relay_mode, + self.uuid, + self.password.clone(), + self.heartbeat, + self.gc_interval, + self.gc_lifetime, + )); + } + Err(err) => last_err = Some(err), + } + } + Err(last_err.unwrap_or(anyhow!("dns resolve"))) + } +} + +#[derive(Clone)] +pub struct TuicConnection { + pub conn: QuinnConnection, + pub inner: InnerConnection, + pub uuid: Uuid, + pub password: Arc<[u8]>, + pub remote_uni_stream_cnt: Counter, + pub remote_bi_stream_cnt: Counter, + pub max_concurrent_uni_streams: Arc, + pub max_concurrent_bi_streams: Arc, + pub udp_relay_mode: UdpRelayMode, + pub udp_sessions: Arc>>, +} + +pub struct UdpSession { + pub incoming: tokio::sync::mpsc::Sender, + pub local_addr: ClashSocksAddr, +} + +impl TuicConnection { + pub fn check_open(&self) -> Result<()> { + match self.conn.close_reason() { + Some(err) => Err(err)?, + None => Ok(()), + } + } + #[allow(clippy::too_many_arguments)] + fn new( + conn: QuinnConnection, + zero_rtt_accepted: Option, + udp_relay_mode: UdpRelayMode, + uuid: Uuid, + password: Arc<[u8]>, + heartbeat: Duration, + gc_interval: Duration, + gc_lifetime: Duration, + ) -> Arc { + let conn = Self { + conn: conn.clone(), + inner: InnerConnection::::new(conn), + uuid, + password, + udp_relay_mode, + remote_uni_stream_cnt: Counter::new(), + remote_bi_stream_cnt: Counter::new(), + // TODO: seems tuic dynamicly adjust the size of max concurrent streams, is it necessary to configure the stream size? + max_concurrent_uni_streams: Arc::new(AtomicU32::new(32)), + max_concurrent_bi_streams: Arc::new(AtomicU32::new(32)), + udp_sessions: Arc::new(AsyncRwLock::new(HashMap::new())), + }; + let conn = Arc::new(conn); + tokio::spawn( + conn.clone() + .init(zero_rtt_accepted, heartbeat, gc_interval, gc_lifetime), + ); + + conn + } + async fn init( + self: Arc, + zero_rtt_accepted: Option, + heartbeat: Duration, + gc_interval: Duration, + gc_lifetime: Duration, + ) { + tracing::info!("connection established"); + + // TODO check the cancellation safety of tuic_auth + tokio::spawn(self.clone().tuic_auth(zero_rtt_accepted)); + tokio::spawn( + self.clone() + .cyclical_tasks(heartbeat, gc_interval, gc_lifetime), + ); + + let err = loop { + tokio::select! { + res = self.accept_uni_stream() => match res { + Ok((recv, reg)) => tokio::spawn(self.clone().handle_uni_stream(recv, reg)), + Err(err) => break err, + }, + res = self.accept_bi_stream() => match res { + Ok((send, recv, reg)) => tokio::spawn(self.clone().handle_bi_stream(send, recv, reg)), + Err(err) => break err, + }, + res = self.accept_datagram() => match res { + Ok(dg) => tokio::spawn(self.clone().handle_datagram(dg)), + Err(err) => break err, + }, + }; + }; + + tracing::warn!("connection error: {err}"); + } +} + +pub struct ServerAddr { + domain: String, + port: u16, + ip: Option, +} +impl ServerAddr { + pub fn new(domain: String, port: u16, ip: Option) -> Self { + Self { domain, port, ip } + } + + pub fn server_name(&self) -> &str { + &self.domain + } + // TODO change to clash dns? + pub async fn resolve(&self) -> Result> { + if let Some(ip) = self.ip { + Ok(vec![SocketAddr::from((ip, self.port))].into_iter()) + } else { + Ok(tokio::net::lookup_host((self.domain.as_str(), self.port)) + .await? + .collect::>() + .into_iter()) + } + } +} + +#[derive(Debug, Clone, Copy)] +pub enum UdpRelayMode { + Native, + Quic, +} + +impl FromStr for UdpRelayMode { + type Err = &'static str; + + fn from_str(s: &str) -> Result { + if s.eq_ignore_ascii_case("native") { + Ok(Self::Native) + } else if s.eq_ignore_ascii_case("quic") { + Ok(Self::Quic) + } else { + Err("invalid UDP relay mode") + } + } +} + +#[derive(Debug, Clone, Copy)] +pub enum CongestionControl { + Cubic, + NewReno, + Bbr, +} +impl From<&str> for CongestionControl { + fn from(s: &str) -> Self { + if s.eq_ignore_ascii_case("cubic") { + Self::Cubic + } else if s.eq_ignore_ascii_case("new_reno") || s.eq_ignore_ascii_case("newreno") { + Self::NewReno + } else if s.eq_ignore_ascii_case("bbr") { + Self::Bbr + } else { + tracing::warn!("Unknown congestion controller {s}. Use default controller"); + Self::default() + } + } +} + +impl Default for CongestionControl { + fn default() -> Self { + Self::Cubic + } +} + +pub trait SocketAdderTrans { + fn into_tuic(self) -> tuic::Address; +} +impl SocketAdderTrans for crate::session::SocksAddr { + fn into_tuic(self) -> tuic::Address { + use crate::session::SocksAddr; + match self { + SocksAddr::Ip(addr) => tuic::Address::SocketAddress(addr), + SocksAddr::Domain(domain, port) => tuic::Address::DomainAddress(domain, port), + } + } +}