From f99e92a892e912224a824555b4dcdd438e871d04 Mon Sep 17 00:00:00 2001 From: Yuwei Ba Date: Thu, 7 Dec 2023 03:32:50 +1100 Subject: [PATCH] implement grpc for trojan and vmess (#203) * grpc * up * up * up * up * up * WIP * up * WIP * WIP * WIP * WIP * test with trojan + grpc * up * up * up * up * up * up * up * trojan grpc working * it worked --- Cargo.lock | 7 +- clash/src/main.rs | 10 +- clash/tests/data/config/rules.yaml | 36 ++- clash_lib/Cargo.toml | 6 +- clash_lib/src/app/outbound/manager.rs | 4 + .../app/remote_content_manager/healthcheck.rs | 16 +- clash_lib/src/config/internal/proxy.rs | 1 + clash_lib/src/lib.rs | 19 +- clash_lib/src/proxy/converters/trojan.rs | 7 +- clash_lib/src/proxy/converters/vmess.rs | 20 +- clash_lib/src/proxy/transport/grpc.rs | 294 ++++++++++++------ clash_lib/src/proxy/transport/tls.rs | 29 +- clash_lib/src/proxy/trojan/mod.rs | 34 +- clash_lib/src/proxy/trojan/stream.rs | 1 - clash_lib/src/proxy/vmess/mod.rs | 18 +- .../src/proxy/vmess/vmess_impl/stream.rs | 21 +- docker/docker-compose.yml | 8 +- docker/nginx/nginx.conf | 27 ++ docker/trojan/config.json | 51 --- docker/v2ray/config.json | 63 ++++ scripts/gun.proto | 11 + 21 files changed, 475 insertions(+), 208 deletions(-) delete mode 100644 clash_lib/src/proxy/trojan/stream.rs create mode 100644 docker/nginx/nginx.conf delete mode 100644 docker/trojan/config.json create mode 100644 scripts/gun.proto diff --git a/Cargo.lock b/Cargo.lock index c9535e930..80e157198 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1582,9 +1582,9 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" [[package]] name = "h2" -version = "0.3.21" +version = "0.3.22" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "91fc23aa11be92976ef4729127f1a74adf36d8436f7816b185d18df956790833" +checksum = "4d6250322ef6e60f93f9a2162799302cd6f68f79f6e5d85c8c16f14d1d958178" dependencies = [ "bytes", "fnv", @@ -1592,7 +1592,7 @@ dependencies = [ "futures-sink", "futures-util", "http", - "indexmap 1.9.3", + "indexmap 2.0.0", "slab", "tokio", "tokio-util", @@ -3872,6 +3872,7 @@ checksum = "5419f34732d9eb6ee4c3578b7989078579b7f039cbbb9ca2c4da015749371e15" dependencies = [ "bytes", "futures-core", + "futures-io", "futures-sink", "pin-project-lite", "tokio", diff --git a/clash/src/main.rs b/clash/src/main.rs index b2d6fbd34..44c9f70ac 100644 --- a/clash/src/main.rs +++ b/clash/src/main.rs @@ -59,11 +59,15 @@ fn main() { } } } - clash::start(clash::Options { + match clash::start(clash::Options { config: clash::Config::File(file), cwd: cli.directory.map(|x| x.to_string_lossy().to_string()), rt: Some(TokioRuntime::MultiThread), log_file: None, - }) - .unwrap(); + }) { + Ok(_) => {} + Err(_) => { + exit(1); + } + } } diff --git a/clash/tests/data/config/rules.yaml b/clash/tests/data/config/rules.yaml index c7d9eb5c7..567378372 100644 --- a/clash/tests/data/config/rules.yaml +++ b/clash/tests/data/config/rules.yaml @@ -43,7 +43,6 @@ dns: - 114.114.114.114 # default value - 1.1.1.1 # default value - tls://1.1.1.1:853 # DNS over TLS - - https://1.1.1.1/dns-query # DNS over HTTPS # - dhcp://en0 # dns from dhcp allow-lan: true @@ -167,6 +166,20 @@ proxies: h2-opts: path: /ray + - name: grpc-vmess + type: vmess + server: 10.0.0.13 + port: 19443 + uuid: b831381d-6324-4d53-ad4f-8cda48b30811 + alterId: 0 + cipher: auto + udp: true + skip-cert-verify: true + tls: true + network: grpc + grpc-opts: + grpc-service-name: abc + - name: vmess-altid type: vmess server: tw-1.ac.laowanxiang.com @@ -187,6 +200,7 @@ proxies: cipher: aes-256-gcm password: "password" udp: true + - name: "trojan" type: trojan server: 10.0.0.13 @@ -199,6 +213,20 @@ proxies: - http/1.1 skip-cert-verify: true + - name: "trojan-grpc" + type: trojan + server: 10.0.0.13 + port: 19443 + password: password1 + udp: true + # sni: example.com # aka server name + alpn: + - h2 + skip-cert-verify: true + network: grpc + grpc-opts: + grpc-service-name: def + proxy-providers: file-provider: type: file @@ -217,11 +245,13 @@ rule-providers: behavior: domain rules: - - DOMAIN,ipinfo.io,relay + - DOMAIN,google.com,grpc-vmess + - DOMAIN-KEYWORD,httpbin,trojan-grpc + - DOMAIN,ipinfo.io,trojan-grpc # - RULE-SET,file-provider,trojan - GEOIP,CN,relay - DOMAIN-SUFFIX,facebook.com,REJECT - - DOMAIN-KEYWORD,google,select + - DOMAIN-KEYWORD,google,grpc-vmess - DOMAIN,google.com,select - SRC-IP-CIDR,192.168.1.1/24,DIRECT - GEOIP,CN,DIRECT diff --git a/clash_lib/Cargo.toml b/clash_lib/Cargo.toml index 14307a64b..767ae16f9 100644 --- a/clash_lib/Cargo.toml +++ b/clash_lib/Cargo.toml @@ -11,7 +11,7 @@ bench = ["criterion"] [dependencies] tokio = { version = "1", features = ["full"] } -tokio-util = { version = "0.7", features = ["net", "codec", "io"] } +tokio-util = { version = "0.7", features = ["net", "codec", "io", "compat"] } tokio-rustls = "0.24" thiserror = "1.0" async-trait = "0.1" @@ -26,9 +26,9 @@ byteorder = "1.5" state = "0.6" lru_time_cache = "0.11" hyper = { version = "0.14", features = ["http1","http2","client", "server", "tcp"] } -http = { version = "0.2" } +http = { version = "0.2.11" } httparse = "1.8.0" -h2 = "0.3" +h2 = "0.3.22" prost = "0.12" tower = { version = "0.4", features = ["util"] } libc = "0.2" diff --git a/clash_lib/src/app/outbound/manager.rs b/clash_lib/src/app/outbound/manager.rs index e2758e732..29a60a8e4 100644 --- a/clash_lib/src/app/outbound/manager.rs +++ b/clash_lib/src/app/outbound/manager.rs @@ -6,6 +6,7 @@ use std::path::PathBuf; use std::sync::Arc; use std::time::Duration; use tokio::sync::{Mutex, RwLock}; +use tracing::debug; use tracing::error; use tracing::info; @@ -65,6 +66,7 @@ impl OutboundManager { let mut selector_control = HashMap::new(); let proxy_manager = ProxyManager::new(dns_resolver.clone()); + debug!("initializing proxy providers"); Self::load_proxy_providers( cwd, proxy_providers, @@ -74,6 +76,7 @@ impl OutboundManager { ) .await?; + debug!("initializing handlers"); Self::load_handlers( outbounds, outbound_groups, @@ -628,6 +631,7 @@ impl OutboundManager { error!("failed to initialize proxy provider {}: {}", p.name(), err); } } + info!("initialized provider {}", p.name()); } Ok(()) diff --git a/clash_lib/src/app/remote_content_manager/healthcheck.rs b/clash_lib/src/app/remote_content_manager/healthcheck.rs index 11298faee..32756792d 100644 --- a/clash_lib/src/app/remote_content_manager/healthcheck.rs +++ b/clash_lib/src/app/remote_content_manager/healthcheck.rs @@ -49,13 +49,15 @@ impl HealthCheck { let lazy = self.lazy; let proxies = self.inner.read().await.proxies.clone(); - let url = self.url.clone(); - tokio::spawn(async move { - proxy_manager.check(&proxies, &url, None).await; - }); + { + let url = self.url.clone(); + let proxies = proxies.clone(); + tokio::spawn(async move { + proxy_manager.check(&proxies, &url, None).await; + }); + } let inner = self.inner.clone(); - let proxies = self.inner.read().await.proxies.clone(); let proxy_manager = self.proxy_manager.clone(); let url = self.url.clone(); let task_handle = tokio::spawn(async move { @@ -65,8 +67,8 @@ impl HealthCheck { _ = ticker.tick() => { pm_debug!("healthcheck ticking: {}, lazy: {}", url, lazy); let now = tokio::time::Instant::now(); - let r = inner.read().await; - if !lazy || now.duration_since(r.last_check).as_secs() >= interval { + let last_check = inner.read().await.last_check; + if !lazy || now.duration_since(last_check).as_secs() >= interval { proxy_manager.check(&proxies, &url, None).await; let mut w = inner.write().await; w.last_check = now; diff --git a/clash_lib/src/config/internal/proxy.rs b/clash_lib/src/config/internal/proxy.rs index 5d37dfacc..83fce4d16 100644 --- a/clash_lib/src/config/internal/proxy.rs +++ b/clash_lib/src/config/internal/proxy.rs @@ -170,6 +170,7 @@ pub struct OutboundVmess { pub network: Option, pub ws_opts: Option, pub h2_opts: Option, + pub grpc_opts: Option, } #[derive(serde::Serialize, serde::Deserialize, Debug, Clone)] diff --git a/clash_lib/src/lib.rs b/clash_lib/src/lib.rs index fa4a13246..a6f412ca9 100644 --- a/clash_lib/src/lib.rs +++ b/clash_lib/src/lib.rs @@ -17,16 +17,17 @@ use common::http::new_http_client; use common::mmdb; use config::def::LogLevel; use proxy::tun::get_tun_runner; + use state::InitCell; use std::io; use std::path::PathBuf; -use tokio::task::JoinHandle; -use tracing::error; -use tracing::info; - use std::sync::Arc; use thiserror::Error; use tokio::sync::{broadcast, mpsc, Mutex}; +use tokio::task::JoinHandle; +use tracing::debug; +use tracing::error; +use tracing::info; mod app; mod common; @@ -161,9 +162,12 @@ async fn start_async(opts: Options) -> Result<(), Error> { let mut tasks = Vec::::new(); let mut runners = Vec::new(); + debug!("initializing dns resolver"); let system_resolver = Arc::new(SystemResolver::new().map_err(|x| Error::DNSError(x.to_string()))?); let client = new_http_client(system_resolver).map_err(|x| Error::DNSError(x.to_string()))?; + + debug!("initializing mmdb"); let mmdb = Arc::new( mmdb::MMDB::new( cwd.join(&config.general.mmdb), @@ -173,6 +177,7 @@ async fn start_async(opts: Options) -> Result<(), Error> { .await?, ); + debug!("initializing cache store"); let cache_store = profile::ThreadSafeCacheFile::new( cwd.join("cache.db").as_path().to_str().unwrap(), config.profile.store_selected, @@ -180,6 +185,7 @@ async fn start_async(opts: Options) -> Result<(), Error> { let dns_resolver = dns::Resolver::new(&config.dns, cache_store.clone(), mmdb.clone()).await; + debug!("initializing outbound manager"); let outbound_manager = Arc::new( OutboundManager::new( config @@ -207,6 +213,7 @@ async fn start_async(opts: Options) -> Result<(), Error> { .await?, ); + debug!("initializing router"); let router = Arc::new( Router::new( config.rules, @@ -230,6 +237,7 @@ async fn start_async(opts: Options) -> Result<(), Error> { let authenticator = Arc::new(auth::PlainAuthenticator::new(config.users)); + debug!("initializing inbound manager"); let inbound_manager = Arc::new(Mutex::new(InboundManager::new( config.general.inbound, dispatcher.clone(), @@ -244,6 +252,7 @@ async fn start_async(opts: Options) -> Result<(), Error> { runners.push(tun_runner); } + debug!("initializing dns listener"); let dns_listener_handle = dns::get_dns_listener(config.dns, dns_resolver.clone()) .await .map(|l| tokio::spawn(l)); @@ -272,7 +281,7 @@ async fn start_async(opts: Options) -> Result<(), Error> { } runners.push(Box::pin(async move { - info!("receive shutdown signal"); + info!("receiving shutdown signal"); shutdown_rx.recv().await; Ok(()) })); diff --git a/clash_lib/src/proxy/converters/trojan.rs b/clash_lib/src/proxy/converters/trojan.rs index abcaac806..40f322dae 100644 --- a/clash_lib/src/proxy/converters/trojan.rs +++ b/clash_lib/src/proxy/converters/trojan.rs @@ -82,7 +82,12 @@ impl TryFrom<&OutboundTrojan> for AnyOutboundHandler { .ok_or(Error::InvalidConfig( "grpc_opts is required for grpc".to_owned(), )), - _ => return Err(Error::InvalidConfig(format!("unsupported network: {}", x))), + _ => { + return Err(Error::InvalidConfig(format!( + "unsupported trojan network: {}", + x + ))) + } }) .transpose()?, }); diff --git a/clash_lib/src/proxy/converters/vmess.rs b/clash_lib/src/proxy/converters/vmess.rs index 67bc4c9be..d4c31520a 100644 --- a/clash_lib/src/proxy/converters/vmess.rs +++ b/clash_lib/src/proxy/converters/vmess.rs @@ -3,7 +3,7 @@ use tracing::warn; use crate::{ config::internal::proxy::OutboundVmess, proxy::{ - options::{Http2Option, WsOption}, + options::{GrpcOption, Http2Option, WsOption}, transport::TLSOptions, vmess::{Handler, HandlerOptions, VmessTransport}, AnyOutboundHandler, CommonOption, @@ -75,6 +75,22 @@ impl TryFrom<&OutboundVmess> for AnyOutboundHandler { .ok_or(Error::InvalidConfig( "h2_opts is required for h2".to_owned(), )), + "grpc" => s + .grpc_opts + .as_ref() + .map(|x| { + VmessTransport::Grpc(GrpcOption { + service_name: x + .grpc_service_name + .as_ref() + .to_owned() + .unwrap_or(&"GunService".to_owned()) + .to_owned(), + }) + }) + .ok_or(Error::InvalidConfig( + "grpc_opts is required for grpc".to_owned(), + )), _ => { return Err(Error::InvalidConfig(format!("unsupported network: {}", x))); } @@ -106,7 +122,7 @@ impl TryFrom<&OutboundVmess> for AnyOutboundHandler { .map(|x| match x.as_str() { "ws" => Ok(vec!["http/1.1".to_owned()]), "http" => Ok(vec![]), - "h2" => Ok(vec!["h2".to_owned()]), + "h2" | "grpc" => Ok(vec!["h2".to_owned()]), _ => Err(Error::InvalidConfig(format!("unsupported network: {}", x))), }) .transpose()?, diff --git a/clash_lib/src/proxy/transport/grpc.rs b/clash_lib/src/proxy/transport/grpc.rs index d7c9eca69..e75fd1442 100644 --- a/clash_lib/src/proxy/transport/grpc.rs +++ b/clash_lib/src/proxy/transport/grpc.rs @@ -1,20 +1,23 @@ use crate::common::errors::map_io_error; use crate::proxy::AnyStream; -use bytes::{Buf, BufMut, Bytes, BytesMut}; +use bytes::Buf; +use bytes::{BufMut, Bytes, BytesMut}; use futures::ready; use h2::{RecvStream, SendStream}; use http::{Request, Uri, Version}; use prost::encoding::decode_varint; use prost::encoding::encode_varint; -use tracing::log; +use tokio::sync::{mpsc, Mutex}; +use tracing::warn; +use tracing::{debug, trace}; use std::fmt::Debug; -use std::future::Future; use std::io; use std::io::{Error, ErrorKind}; use std::pin::Pin; +use std::sync::Arc; use std::task::{Context, Poll}; use tokio::io::{AsyncRead, AsyncWrite}; @@ -28,12 +31,13 @@ impl GrpcStreamBuilder { pub fn new(host: String, path: http::uri::PathAndQuery) -> Self { Self { host, path } } + fn req(&self) -> io::Result> { let uri: Uri = { Uri::builder() .scheme("https") .authority(self.host.as_str()) - .path_and_query(self.path.as_str()) + .path_and_query(format!("/{}/Tun", self.path.as_str())) .build() .map_err(map_io_error)? }; @@ -42,36 +46,71 @@ impl GrpcStreamBuilder { .uri(uri) .version(Version::HTTP_2) .header("content-type", "application/grpc") - .header("user-agent", "grpc-go/1.46.0"); + .header("user-agent", "tonic/0.10"); Ok(request.body(()).unwrap()) } pub async fn proxy_stream(&self, stream: AnyStream) -> io::Result { - let (mut client, h2) = h2::client::handshake(stream).await.map_err(map_io_error)?; + let (client, h2) = h2::client::Builder::new() + .initial_connection_window_size(0x7FFFFFFF) + .initial_window_size(0x7FFFFFFF) + .initial_max_send_streams(1024) + .enable_push(false) + .handshake(stream) + .await + .map_err(map_io_error)?; + let mut client = client.ready().await.map_err(map_io_error)?; + let req = self.req()?; let (resp, send_stream) = client.send_request(req, false).map_err(map_io_error)?; tokio::spawn(async move { if let Err(e) = h2.await { - log::error!("http2 got err:{:?}", e); + //TODO: collect this somewhere? + warn!("http2 got err:{:?}", e); } }); - return Ok(Box::new(GrpcStream::new(resp, send_stream))); + + let (init_sender, init_ready) = mpsc::channel(1); + let recv_stream = Arc::new(Mutex::new(None)); + + { + let recv_stream = recv_stream.clone(); + tokio::spawn(async move { + trace!("initiating grpc recv stream"); + match resp.await { + Ok(resp) => { + debug!("grpc resp: {:?}", resp); + let stream = resp.into_body(); + + recv_stream.lock().await.replace(stream); + } + Err(e) => { + debug!("grpc resp err: {:?}", e); + } + } + let _ = init_sender.send(()); + }); + } + + return Ok(Box::new(GrpcStream::new( + init_ready, + recv_stream, + send_stream, + ))); } } pub struct GrpcStream { - resp_fut: h2::client::ResponseFuture, - recv: Option, + init_ready: mpsc::Receiver<()>, + recv: Arc>>, send: SendStream, buffer: BytesMut, - payload_len: u64, + payload_len: usize, } impl Debug for GrpcStream { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("GrpcStream") - .field("resp_fut", &self.resp_fut) - .field("recv", &self.recv) .field("send", &self.send) .field("buffer", &self.buffer) .field("payload_len", &self.payload_len) @@ -80,31 +119,33 @@ impl Debug for GrpcStream { } impl GrpcStream { - pub fn new(resp_fut: h2::client::ResponseFuture, send: SendStream) -> Self { + pub fn new( + init_ready: mpsc::Receiver<()>, + recv: Arc>>, + send: SendStream, + ) -> Self { Self { - resp_fut, - recv: None, + init_ready, + recv, send, buffer: BytesMut::with_capacity(1024 * 4), payload_len: 0, } } - fn reserve_send_capacity(&mut self, data: &[u8]) { - let mut buf = [0u8; 10]; - let mut buf = &mut buf[..]; - encode_varint(data.len() as u64, &mut buf); - self.send.reserve_capacity(6 + 10 - buf.len() + data.len()); - } - + // encode data to grpc + protobuf format fn encode_buf(&self, data: &[u8]) -> Bytes { - let mut buf = BytesMut::with_capacity(16 + data.len()); - let grpc_header = [0u8; 5]; + let mut protobuf_header = BytesMut::with_capacity(10 + 1); + protobuf_header.put_u8(0x0a); + encode_varint(data.len() as u64, &mut protobuf_header); + let mut grpc_header = [0u8; 5]; + let grpc_payload_len = (protobuf_header.len() + data.len()) as u32; + grpc_header[1..5].copy_from_slice(&grpc_payload_len.to_be_bytes()); + + let mut buf = + BytesMut::with_capacity(grpc_header.len() + protobuf_header.len() + data.len()); buf.put_slice(&grpc_header[..]); - buf.put_u8(0x0a); - encode_varint(data.len() as u64, &mut buf); - let payload_len = ((buf.len() - 5 + data.len()) as u32).to_be_bytes(); - buf[1..5].copy_from_slice(&payload_len[..4]); + buf.put_slice(&protobuf_header.freeze()[..]); buf.put_slice(data); buf.freeze() } @@ -115,60 +156,110 @@ impl AsyncRead for GrpcStream { fn poll_read( mut self: Pin<&mut Self>, cx: &mut Context<'_>, - dst: &mut tokio::io::ReadBuf<'_>, + buf: &mut tokio::io::ReadBuf<'_>, ) -> Poll> { - if self.recv.is_none() { - self.recv = Some( - ready!(Pin::new(&mut self.resp_fut).poll(cx)) - .map_err(map_io_error)? - .into_body(), - ); - log::debug!("receive grpc recv stream"); + ready!(self.init_ready.poll_recv(cx)); + + let recv = self.recv.clone(); + + let mut recv = recv.try_lock().unwrap(); + if recv.is_none() { + warn!("grpc initialization error"); + return Poll::Ready(Err(Error::new( + ErrorKind::ConnectionReset, + "initialization error", + ))); } - if !self.buffer.is_empty() { - let to_read = std::cmp::min(dst.remaining(), self.buffer.len()); + + if (self.payload_len > 0 && self.buffer.len() > 0) + || (self.payload_len == 0 && self.buffer.len() > 6) + { + if self.payload_len == 0 { + self.buffer.advance(6); + let payload_len = decode_varint(&mut self.buffer).map_err(map_io_error)?; + self.payload_len = payload_len as usize; + } + + trace!("grpc poll_read data left payload_len: {}", self.payload_len); + let to_read = std::cmp::min(buf.remaining(), self.payload_len as usize); + let to_read = std::cmp::min(to_read, self.buffer.len()); + + if to_read == 0 { + assert!(buf.remaining() > 0); + trace!("no data left in buffer"); + return Poll::Pending; + } + let data = self.buffer.split_to(to_read); - self.payload_len -= to_read as u64; - dst.put_slice(&data[..to_read]); + trace!( + "consuming {} data, payload left: {}, buffer left: {}", + to_read, + self.payload_len - to_read, + self.buffer.len() + ); + self.payload_len -= to_read; + buf.put_slice(&data[..]); return Poll::Ready(Ok(())); - }; - Poll::Ready( - match ready!(Pin::new(&mut self.recv).as_pin_mut().unwrap().poll_data(cx)) { - Some(Ok(mut data)) => { - let before_parse_data_len = data.len(); - while self.payload_len > 0 || data.len() > 6 { - if self.payload_len == 0 { - data.advance(6); - self.payload_len = decode_varint(&mut data).map_err(map_io_error)?; - } - let to_read = std::cmp::min(dst.remaining(), data.len()); - let to_read = std::cmp::min(self.payload_len as usize, to_read); - if to_read == 0 { - self.buffer.extend_from_slice(&data[..]); - data.clear(); - break; - } - dst.put_slice(&data[..to_read]); - self.payload_len -= to_read as u64; - data.advance(to_read); + } + + trace!( + "no more data left, polling recv stream, current capacity: {}", + recv.as_mut().unwrap().flow_control().available_capacity() + ); + + match ready!(Pin::new(&mut recv.as_mut().unwrap()).poll_data(cx)) { + Some(Ok(b)) => { + trace!("got data from recv stream: {}", b.len()); + + self.buffer.reserve(b.len()); + self.buffer.extend_from_slice(&b[..]); + + while self.payload_len > 0 || self.buffer.len() > 6 { + if self.payload_len == 0 { + self.buffer.advance(6); + let payload_len = decode_varint(&mut self.buffer).map_err(map_io_error)?; + self.payload_len = payload_len as usize; + } + let to_read = std::cmp::min(self.buffer.len(), self.payload_len as usize); + let to_read = std::cmp::min(buf.remaining(), to_read); + if to_read == 0 { + break; } - // increase recv window - self.recv - .as_mut() - .unwrap() - .flow_control() - .release_capacity(before_parse_data_len - data.len()) - .map_or_else( - |e| Err(Error::new(ErrorKind::ConnectionReset, e)), - |_| Ok(()), - ) + trace!( + "consuming {} data, payload left: {}, buffer left: {}", + to_read, + self.payload_len - to_read, + self.buffer.len() - to_read + ); + + buf.put_slice(self.buffer.split_to(to_read).freeze().as_ref()); + self.payload_len -= to_read; } - // no more data frames - // maybe trailer - // or cancelled - _ => Ok(()), - }, - ) + + trace!("releasing grpc flow control capacity: {}", b.len()); + recv.as_mut() + .unwrap() + .flow_control() + .release_capacity(b.len()) + .map_or_else( + |e| { + debug!("grpc flow control error: {}", e); + Poll::Ready(Err(Error::new(ErrorKind::ConnectionReset, e))) + }, + |_| Poll::Ready(Ok(())), + ) + } + _ => { + assert_eq!(self.payload_len, 0); + if recv.as_mut().unwrap().is_end_stream() { + trace!("no more data left, recv stream closed"); + Poll::Ready(Ok(())) + } else { + trace!("no more data left, recv stream pending"); + Poll::Pending + } + } + } } } @@ -179,19 +270,36 @@ impl AsyncWrite for GrpcStream { cx: &mut Context<'_>, buf: &[u8], ) -> Poll> { - self.reserve_send_capacity(buf); + let encoded_buf = self.encode_buf(buf); + trace!("requesting capacity: {} bytes", encoded_buf.len()); + self.send.reserve_capacity(encoded_buf.len()); + Poll::Ready(match ready!(self.send.poll_capacity(cx)) { - Some(Ok(to_write)) => { - let encoded_buf = self.encode_buf(buf); + Some(Ok(cap)) => { + trace!( + "grpc got capacity: {} bytes, payload size: {}", + cap, + encoded_buf.len() + ); self.send.send_data(encoded_buf, false).map_or_else( - |e| Err(Error::new(ErrorKind::BrokenPipe, e)), - |_| Ok(to_write), + |e| { + debug!("grpc write error: {}", e); + Err(Error::new(ErrorKind::BrokenPipe, e)) + }, + |_| { + debug!("grpc wrote {} bytes", buf.len()); + Ok(buf.len()) + }, ) } - // is_send_streaming returns false - // which indicates the state is - // neither open nor half_close_remote - _ => Err(Error::new(ErrorKind::BrokenPipe, "broken pipe")), + Some(Err(e)) => { + warn!("grpc poll_capacity error: {}", e); + Err(Error::new(ErrorKind::BrokenPipe, e)) + } + _ => { + debug!("grpc poll_capacity conn closed"); + Err(Error::new(ErrorKind::BrokenPipe, "broken pipe")) + } }) } @@ -202,14 +310,10 @@ impl AsyncWrite for GrpcStream { #[inline] fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.send.reserve_capacity(0); - Poll::Ready(ready!(self.send.poll_capacity(cx)).map_or( - Err(Error::new(ErrorKind::BrokenPipe, "broken pipe")), - |_| { - self.send - .send_data(Bytes::new(), true) - .map_or_else(|e| Err(Error::new(ErrorKind::BrokenPipe, e)), |_| Ok(())) - }, - )) + self.send.send_reset(h2::Reason::NO_ERROR); + self.send + .poll_reset(cx) + .map_err(map_io_error) + .map(|_| Ok(())) } } diff --git a/clash_lib/src/proxy/transport/tls.rs b/clash_lib/src/proxy/transport/tls.rs index f70826f90..7d6abfa19 100644 --- a/clash_lib/src/proxy/transport/tls.rs +++ b/clash_lib/src/proxy/transport/tls.rs @@ -16,7 +16,11 @@ pub struct TLSOptions { pub alpn: Option>, } -pub async fn wrap_stream(stream: AnyStream, opt: TLSOptions) -> io::Result { +pub async fn wrap_stream( + stream: AnyStream, + opt: TLSOptions, + expected_alpn: Option<&str>, +) -> io::Result { let mut tls_config = ClientConfig::builder() .with_safe_defaults() .with_root_certificates(GLOBAL_ROOT_STORE.clone()) @@ -34,12 +38,27 @@ pub async fn wrap_stream(stream: AnyStream, opt: TLSOptions) -> io::Result { + let ws_builder = transport::WebsocketStreamBuilder::new( + self.opts.server.clone(), + self.opts.port, + ws_opts.path.clone(), + ws_opts.headers.clone(), + None, + ws_opts.max_early_data, + ws_opts.early_data_header_name.clone(), + ); + + ws_builder.proxy_stream(s).await? + } + Transport::Grpc(grpc_opts) => { + let grpc_builder = transport::GrpcStreamBuilder::new( + self.opts.server.clone(), + grpc_opts + .service_name + .to_owned() + .try_into() + .expect("invalid gRPC service path"), + ); + grpc_builder.proxy_stream(s).await? + } + } + } else { + s + }; let mut buf = BytesMut::new(); let password = Sha224::digest(self.opts.password.as_bytes()); diff --git a/clash_lib/src/proxy/trojan/stream.rs b/clash_lib/src/proxy/trojan/stream.rs deleted file mode 100644 index 8b1378917..000000000 --- a/clash_lib/src/proxy/trojan/stream.rs +++ /dev/null @@ -1 +0,0 @@ - diff --git a/clash_lib/src/proxy/vmess/mod.rs b/clash_lib/src/proxy/vmess/mod.rs index 4120f6249..23402feaa 100644 --- a/clash_lib/src/proxy/vmess/mod.rs +++ b/clash_lib/src/proxy/vmess/mod.rs @@ -26,17 +26,9 @@ use super::{ AnyOutboundHandler, AnyStream, CommonOption, OutboundHandler, OutboundType, }; -#[macro_export] -macro_rules! vmess_debug { - ($($arg:tt)*) => { - debug!(target: "vmess", $($arg)*) - }; -} - pub enum VmessTransport { Ws(WsOption), H2(Http2Option), - #[allow(dead_code)] Grpc(GrpcOption), #[allow(dead_code)] Http(HttpOption), @@ -85,7 +77,7 @@ impl Handler { ); if let Some(tls_opt) = &self.opts.tls { - stream = transport::tls::wrap_stream(stream, tls_opt.to_owned()).await?; + stream = transport::tls::wrap_stream(stream, tls_opt.to_owned(), None).await?; } ws_builder.proxy_stream(stream).await? @@ -98,7 +90,7 @@ impl Handler { .expect("H2 conn must have tls opt") .clone(); tls_opt.alpn = Some(vec!["h2".to_string()]); - stream = transport::tls::wrap_stream(stream, tls_opt.to_owned()).await?; + stream = transport::tls::wrap_stream(stream, tls_opt.to_owned(), None).await?; let h2_builder = Http2Config { hosts: vec![self.opts.server.clone()], @@ -111,7 +103,9 @@ impl Handler { } Some(VmessTransport::Grpc(ref opt)) => { let tls_opt = self.opts.tls.as_ref().expect("gRPC conn must have tls opt"); - stream = transport::tls::wrap_stream(stream, tls_opt.to_owned()).await?; + stream = + transport::tls::wrap_stream(stream, tls_opt.to_owned(), Some("h2")).await?; + let grpc_builder = transport::GrpcStreamBuilder::new( self.opts.server.clone(), opt.service_name @@ -126,7 +120,7 @@ impl Handler { } None => { if let Some(tls_opt) = self.opts.tls.as_ref() { - stream = transport::tls::wrap_stream(stream, tls_opt.to_owned()).await?; + stream = transport::tls::wrap_stream(stream, tls_opt.to_owned(), None).await?; } stream } diff --git a/clash_lib/src/proxy/vmess/vmess_impl/stream.rs b/clash_lib/src/proxy/vmess/vmess_impl/stream.rs index 3ddd4dfde..3a1bd785e 100644 --- a/clash_lib/src/proxy/vmess/vmess_impl/stream.rs +++ b/clash_lib/src/proxy/vmess/vmess_impl/stream.rs @@ -15,7 +15,6 @@ use crate::{ }, proxy::vmess::vmess_impl::MAX_CHUNK_SIZE, session::SocksAddr, - vmess_debug, }; use super::{ @@ -94,7 +93,7 @@ impl ReadExt for VmessStream { ) -> Poll> { self.read_buf.reserve(size); unsafe { self.read_buf.set_len(size) } - vmess_debug!( + debug!( "poll read exact: {}, read_pos: {}, buf: {}", size, self.read_pos, @@ -309,12 +308,12 @@ where mbuf.put_slice(data.as_slice()); let out = mbuf.freeze(); - vmess_debug!("send non aead handshake request for user {}", id.uuid); + debug!("send non aead handshake request for user {}", id.uuid); stream.write_all(&out).await?; } else { let out = header::seal_vmess_aead_header(id.cmd_key, buf.freeze().to_vec(), now) .map_err(map_io_error)?; - vmess_debug!("send aead handshake request for user {}", id.uuid); + debug!("send aead handshake request for user {}", id.uuid); stream.write_all(&out).await?; } @@ -334,12 +333,12 @@ where cx: &mut std::task::Context<'_>, buf: &mut tokio::io::ReadBuf<'_>, ) -> std::task::Poll> { - vmess_debug!("poll read with aead"); + debug!("poll read with aead"); loop { match self.read_state { ReadState::AeadWaitingHeaderSize => { - vmess_debug!("recv handshake response header"); + debug!("recv handshake response header"); let this = &mut *self; let resp_body_key = this.resp_body_key.clone(); let resp_body_iv = this.resp_body_iv.clone(); @@ -366,7 +365,7 @@ where this.read_state = ReadState::StreamWaitingLength; } else { - vmess_debug!("recv handshake response header length"); + debug!("recv handshake response header length"); ready!(this.poll_read_exact(cx, 18))?; let aead_response_header_length_encryption_key = &kdf::vmess_kdf_1_one_shot( @@ -402,7 +401,7 @@ where } ReadState::AeadWaitingHeader(header_size) => { - vmess_debug!("recv handshake header body: {}", header_size); + debug!("recv handshake header body: {}", header_size); let this = &mut *self; ready!(this.poll_read_exact(cx, header_size + 16))?; @@ -452,7 +451,7 @@ where } ReadState::StreamWaitingLength => { - vmess_debug!("recv stream length"); + debug!("checking recv stream data length"); let this = &mut *self; ready!(this.poll_read_exact(cx, 2))?; let len = u16::from_be_bytes(this.read_buf.split().as_ref().try_into().unwrap()) @@ -469,7 +468,7 @@ where } ReadState::StreamWaitingData(size) => { - vmess_debug!("recv stream data: {}", size); + debug!("got recv stream data length: {}", size); let this = &mut *self; ready!(this.poll_read_exact(cx, size))?; @@ -484,7 +483,7 @@ where } ReadState::StreamFlushingData(size) => { - vmess_debug!("flush stream data: {}", size); + debug!("chunking stream data: {}", size); let to_read = std::cmp::min(buf.remaining(), size); let payload = self.read_buf.split_to(to_read); buf.put_slice(&payload); diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index 1af11437b..a9e77e3f2 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -24,13 +24,13 @@ services: source: ./v2ray/key.pem target: /etc/v2ray/v2ray.key - trojan: - image: trojangfw/trojan + nginx: + image: nginx network_mode: "host" volumes: - type: bind - source: ./trojan/config.json - target: /config/config.json + source: ./nginx/nginx.conf + target: /etc/nginx/nginx.conf - type: bind source: ./v2ray/cert.pem target: /etc/v2ray/v2ray.crt diff --git a/docker/nginx/nginx.conf b/docker/nginx/nginx.conf new file mode 100644 index 000000000..d794b730f --- /dev/null +++ b/docker/nginx/nginx.conf @@ -0,0 +1,27 @@ +events { + worker_connections 4096; ## Default: 1024 +} + +http { + error_log /tmp/error.log debug; + + server { + listen 19443 ssl; + server_name localhost; + http2 on; + + ssl_certificate /etc/v2ray/v2ray.crt; + ssl_certificate_key /etc/v2ray/v2ray.key; + ssl_protocols TLSv1.2 TLSv1.3; + ssl_ciphers ECDHE-ECDSA-AES128-GCM-SHA256:ECDHE-RSA-AES128-GCM-SHA256:ECDHE-ECDSA-AES256-GCM-SHA384:ECDHE-RSA-AES256-GCM-SHA384:ECDHE-ECDSA-CHACHA20-POLY1305:ECDHE-RSA-CHACHA20-POLY1305:DHE-RSA-AES128-GCM-SHA256:DHE-RSA-AES256-GCM-SHA384; + + + location /abc/Tun { + grpc_pass grpc://127.0.0.1:16825; + } + + location /def/Tun { + grpc_pass grpc://127.0.0.1:9444; + } + } +} \ No newline at end of file diff --git a/docker/trojan/config.json b/docker/trojan/config.json deleted file mode 100644 index 6ce146e8d..000000000 --- a/docker/trojan/config.json +++ /dev/null @@ -1,51 +0,0 @@ -{ - "run_type": "server", - "local_addr": "0.0.0.0", - "local_port": 9443, - "remote_addr": "127.0.0.1", - "remote_port": 80, - "password": [ - "password1", - "password2" - ], - "log_level": 1, - "ssl": { - "cert": "/etc/v2ray/v2ray.crt", - "key": "/etc/v2ray/v2ray.key", - "key_password": "", - "cipher": "ECDHE-ECDSA-AES128-GCM-SHA256:ECDHE-RSA-AES128-GCM-SHA256:ECDHE-ECDSA-AES256-GCM-SHA384:ECDHE-RSA-AES256-GCM-SHA384:ECDHE-ECDSA-CHACHA20-POLY1305:ECDHE-RSA-CHACHA20-POLY1305:DHE-RSA-AES128-GCM-SHA256:DHE-RSA-AES256-GCM-SHA384", - "cipher_tls13": "TLS_AES_128_GCM_SHA256:TLS_CHACHA20_POLY1305_SHA256:TLS_AES_256_GCM_SHA384", - "prefer_server_cipher": true, - "alpn": [ - "http/1.1" - ], - "alpn_port_override": { - "h2": 81 - }, - "reuse_session": true, - "session_ticket": false, - "session_timeout": 600, - "plain_http_response": "", - "curves": "", - "dhparam": "" - }, - "tcp": { - "prefer_ipv4": false, - "no_delay": true, - "keep_alive": true, - "reuse_port": false, - "fast_open": false, - "fast_open_qlen": 20 - }, - "mysql": { - "enabled": false, - "server_addr": "127.0.0.1", - "server_port": 3306, - "database": "trojan", - "username": "trojan", - "password": "", - "key": "", - "cert": "", - "ca": "" - } -} \ No newline at end of file diff --git a/docker/v2ray/config.json b/docker/v2ray/config.json index 1ab68ac95..b0927064f 100644 --- a/docker/v2ray/config.json +++ b/docker/v2ray/config.json @@ -39,6 +39,50 @@ } } }, + { + "port": 9443, + "protocol": "trojan", + "settings": { + "clients": [ + { + "password": "password1" + } + ] + }, + "streamSettings": { + "network": "tcp", + "security": "tls", + "tlsSettings": { + "alpn": [ + "http/1.1" + ], + "certificates": [ + { + "certificateFile": "/etc/v2ray/v2ray.crt", // 证书文件 + "keyFile": "/etc/v2ray/v2ray.key" // 密钥文件 + } + ] + } + } + }, + { + "port": 9444, + "protocol": "trojan", + "settings": { + "clients": [ + { + "password": "password1" // 填写你的 password + } + ] + }, + "streamSettings": { + "network": "grpc", + "grpcSettings": { + "serviceName": "def" // 填写你的 ServiceName + }, + "security": "none" + } + }, { "port": 16824, "protocol": "vmess", @@ -60,6 +104,25 @@ } } }, + { + "port": 16825, + "protocol": "vmess", + "settings": { + "clients": [ + { + "id": "b831381d-6324-4d53-ad4f-8cda48b30811", + "alterId": 64 + } + ] + }, + "streamSettings": { + "network": "grpc", + "grpcSettings": { + "serviceName": "abc" + }, + "security": "none" + } + }, { "port": 8444, "protocol": "vmess", diff --git a/scripts/gun.proto b/scripts/gun.proto new file mode 100644 index 000000000..1d127b4c1 --- /dev/null +++ b/scripts/gun.proto @@ -0,0 +1,11 @@ +syntax = "proto3"; +option go_package = "github.com/Qv2ray/gun/pkg/proto"; + +message Hunk { + bytes data = 1; +} + +service GunService { + rpc Tun (stream Hunk) returns (stream Hunk); + rpc TunDatagram (stream Hunk) returns (stream Hunk); +} \ No newline at end of file