From f56f7e5c59395b061598ebe10404667a37c5f917 Mon Sep 17 00:00:00 2001 From: dev0 Date: Sat, 2 Dec 2023 14:50:59 +1100 Subject: [PATCH 01/22] grpc --- clash/tests/data/config/rules.yaml | 16 ++++++++++++++- clash_lib/src/config/internal/proxy.rs | 1 + clash_lib/src/lib.rs | 2 +- clash_lib/src/proxy/converters/vmess.rs | 20 +++++++++++++++++-- clash_lib/src/proxy/vmess/mod.rs | 1 - docker/docker-compose.yml | 6 +++++- docker/nginx/Dockerfile | 2 ++ docker/nginx/nginx.conf | 26 +++++++++++++++++++++++++ docker/v2ray/config.json | 19 ++++++++++++++++++ 9 files changed, 87 insertions(+), 6 deletions(-) create mode 100644 docker/nginx/Dockerfile create mode 100644 docker/nginx/nginx.conf diff --git a/clash/tests/data/config/rules.yaml b/clash/tests/data/config/rules.yaml index c7d9eb5c7..e3d090014 100644 --- a/clash/tests/data/config/rules.yaml +++ b/clash/tests/data/config/rules.yaml @@ -166,6 +166,19 @@ proxies: network: h2 h2-opts: path: /ray + - name: grpc-vmess + type: vmess + server: 10.0.0.13 + port: 9443 + uuid: b831381d-6324-4d53-ad4f-8cda48b30811 + alterId: 0 + cipher: auto + udp: true + skip-cert-verify: true + tls: true + network: grpc + grpc-opts: + grpc-service-name: abc - name: vmess-altid type: vmess @@ -187,6 +200,7 @@ proxies: cipher: aes-256-gcm password: "password" udp: true + - name: "trojan" type: trojan server: 10.0.0.13 @@ -217,7 +231,7 @@ rule-providers: behavior: domain rules: - - DOMAIN,ipinfo.io,relay + - DOMAIN,ipinfo.io,grpc-vmess # - RULE-SET,file-provider,trojan - GEOIP,CN,relay - DOMAIN-SUFFIX,facebook.com,REJECT diff --git a/clash_lib/src/config/internal/proxy.rs b/clash_lib/src/config/internal/proxy.rs index 5d37dfacc..83fce4d16 100644 --- a/clash_lib/src/config/internal/proxy.rs +++ b/clash_lib/src/config/internal/proxy.rs @@ -170,6 +170,7 @@ pub struct OutboundVmess { pub network: Option, pub ws_opts: Option, pub h2_opts: Option, + pub grpc_opts: Option, } #[derive(serde::Serialize, serde::Deserialize, Debug, Clone)] diff --git a/clash_lib/src/lib.rs b/clash_lib/src/lib.rs index fa4a13246..c29e5550a 100644 --- a/clash_lib/src/lib.rs +++ b/clash_lib/src/lib.rs @@ -272,7 +272,7 @@ async fn start_async(opts: Options) -> Result<(), Error> { } runners.push(Box::pin(async move { - info!("receive shutdown signal"); + info!("receiving shutdown signal"); shutdown_rx.recv().await; Ok(()) })); diff --git a/clash_lib/src/proxy/converters/vmess.rs b/clash_lib/src/proxy/converters/vmess.rs index 67bc4c9be..d4c31520a 100644 --- a/clash_lib/src/proxy/converters/vmess.rs +++ b/clash_lib/src/proxy/converters/vmess.rs @@ -3,7 +3,7 @@ use tracing::warn; use crate::{ config::internal::proxy::OutboundVmess, proxy::{ - options::{Http2Option, WsOption}, + options::{GrpcOption, Http2Option, WsOption}, transport::TLSOptions, vmess::{Handler, HandlerOptions, VmessTransport}, AnyOutboundHandler, CommonOption, @@ -75,6 +75,22 @@ impl TryFrom<&OutboundVmess> for AnyOutboundHandler { .ok_or(Error::InvalidConfig( "h2_opts is required for h2".to_owned(), )), + "grpc" => s + .grpc_opts + .as_ref() + .map(|x| { + VmessTransport::Grpc(GrpcOption { + service_name: x + .grpc_service_name + .as_ref() + .to_owned() + .unwrap_or(&"GunService".to_owned()) + .to_owned(), + }) + }) + .ok_or(Error::InvalidConfig( + "grpc_opts is required for grpc".to_owned(), + )), _ => { return Err(Error::InvalidConfig(format!("unsupported network: {}", x))); } @@ -106,7 +122,7 @@ impl TryFrom<&OutboundVmess> for AnyOutboundHandler { .map(|x| match x.as_str() { "ws" => Ok(vec!["http/1.1".to_owned()]), "http" => Ok(vec![]), - "h2" => Ok(vec!["h2".to_owned()]), + "h2" | "grpc" => Ok(vec!["h2".to_owned()]), _ => Err(Error::InvalidConfig(format!("unsupported network: {}", x))), }) .transpose()?, diff --git a/clash_lib/src/proxy/vmess/mod.rs b/clash_lib/src/proxy/vmess/mod.rs index 4120f6249..2b8067645 100644 --- a/clash_lib/src/proxy/vmess/mod.rs +++ b/clash_lib/src/proxy/vmess/mod.rs @@ -36,7 +36,6 @@ macro_rules! vmess_debug { pub enum VmessTransport { Ws(WsOption), H2(Http2Option), - #[allow(dead_code)] Grpc(GrpcOption), #[allow(dead_code)] Http(HttpOption), diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index 1af11437b..6cfb3b724 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -36,4 +36,8 @@ services: target: /etc/v2ray/v2ray.crt - type: bind source: ./v2ray/key.pem - target: /etc/v2ray/v2ray.key \ No newline at end of file + target: /etc/v2ray/v2ray.key + + nginx: + build: ./nginx + network_mode: "host" \ No newline at end of file diff --git a/docker/nginx/Dockerfile b/docker/nginx/Dockerfile new file mode 100644 index 000000000..0d947da02 --- /dev/null +++ b/docker/nginx/Dockerfile @@ -0,0 +1,2 @@ +FROM nginx +COPY nginx.conf /etc/nginx/nginx.conf \ No newline at end of file diff --git a/docker/nginx/nginx.conf b/docker/nginx/nginx.conf new file mode 100644 index 000000000..13e0953e0 --- /dev/null +++ b/docker/nginx/nginx.conf @@ -0,0 +1,26 @@ +server { + listen 9443 http2; + server_name localhost; + + location /abc/Tun { + # 网上参考级配置 + client_max_body_size 0; + client_body_timeout 60m; + send_timeout 60m; + lingering_close always; + + keepalive_time 2h; + keepalive_timeout 30s; + keepalive_requests 256; + grpc_socket_keepalive on; + + # 实践级配置 + grpc_read_timeout 3m; + grpc_send_timeout 2m; + grpc_set_header Host $host; + grpc_set_header X-Real-IP $remote_addr; + grpc_set_header X-Forwarded-For $proxy_add_x_forwarded_for; + + grpc_pass grpc://127.0.0.1:16825; + } +} \ No newline at end of file diff --git a/docker/v2ray/config.json b/docker/v2ray/config.json index 1ab68ac95..c1ca78078 100644 --- a/docker/v2ray/config.json +++ b/docker/v2ray/config.json @@ -60,6 +60,25 @@ } } }, + { + "port": 16825, + "protocol": "vmess", + "settings": { + "clients": [ + { + "id": "b831381d-6324-4d53-ad4f-8cda48b30811", + "alterId": 64 + } + ] + }, + "streamSettings": { + "network": "grpc", + "grpcSettings": { + "serviceName": "abc" + }, + "security": "none" + } + }, { "port": 8444, "protocol": "vmess", From 19a290d3b370e1af21988a985022584fdf4878d8 Mon Sep 17 00:00:00 2001 From: dev0 Date: Sat, 2 Dec 2023 15:12:31 +1100 Subject: [PATCH 02/22] up --- clash/tests/data/config/rules.yaml | 2 +- docker/docker-compose.yml | 9 ++++++++- docker/nginx/nginx.conf | 8 +++++++- 3 files changed, 16 insertions(+), 3 deletions(-) diff --git a/clash/tests/data/config/rules.yaml b/clash/tests/data/config/rules.yaml index e3d090014..b4b6868b7 100644 --- a/clash/tests/data/config/rules.yaml +++ b/clash/tests/data/config/rules.yaml @@ -178,7 +178,7 @@ proxies: tls: true network: grpc grpc-opts: - grpc-service-name: abc + grpc-service-name: abc/Tun - name: vmess-altid type: vmess diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index 6cfb3b724..851b78959 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -40,4 +40,11 @@ services: nginx: build: ./nginx - network_mode: "host" \ No newline at end of file + network_mode: "host" + volumes: + - type: bind + source: ./v2ray/cert.pem + target: /etc/v2ray/v2ray.crt + - type: bind + source: ./v2ray/key.pem + target: /etc/v2ray/v2ray.key \ No newline at end of file diff --git a/docker/nginx/nginx.conf b/docker/nginx/nginx.conf index 13e0953e0..432737cde 100644 --- a/docker/nginx/nginx.conf +++ b/docker/nginx/nginx.conf @@ -1,7 +1,13 @@ server { - listen 9443 http2; + listen 9443 http2 ssl; server_name localhost; + ssl_certificate /etc/v2ray/v2ray.crt; + ssl_certificate_key /etc/v2ray/v2ray.key; + ssl_protocols TLSv1.2 TLSv1.3; + ssl_ciphers ECDHE-ECDSA-AES128-GCM-SHA256:ECDHE-RSA-AES128-GCM-SHA256:ECDHE-ECDSA-AES256-GCM-SHA384:ECDHE-RSA-AES256-GCM-SHA384:ECDHE-ECDSA-CHACHA20-POLY1305:ECDHE-RSA-CHACHA20-POLY1305:DHE-RSA-AES128-GCM-SHA256:DHE-RSA-AES256-GCM-SHA384; + + location /abc/Tun { # 网上参考级配置 client_max_body_size 0; From 9d446897fe8d776d0c0893142bf90e5ebb3db7e8 Mon Sep 17 00:00:00 2001 From: dev0 Date: Sat, 2 Dec 2023 15:16:25 +1100 Subject: [PATCH 03/22] up --- docker/docker-compose.yml | 5 +++- docker/nginx/Dockerfile | 2 -- docker/nginx/nginx.conf | 52 ++++++++++++++++++++------------------- 3 files changed, 31 insertions(+), 28 deletions(-) delete mode 100644 docker/nginx/Dockerfile diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index 851b78959..2df22d138 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -39,9 +39,12 @@ services: target: /etc/v2ray/v2ray.key nginx: - build: ./nginx + image: nginx network_mode: "host" volumes: + - type: bind + source: ./nginx/nginx.conf + target: /etc/nginx/nginx.conf - type: bind source: ./v2ray/cert.pem target: /etc/v2ray/v2ray.crt diff --git a/docker/nginx/Dockerfile b/docker/nginx/Dockerfile deleted file mode 100644 index 0d947da02..000000000 --- a/docker/nginx/Dockerfile +++ /dev/null @@ -1,2 +0,0 @@ -FROM nginx -COPY nginx.conf /etc/nginx/nginx.conf \ No newline at end of file diff --git a/docker/nginx/nginx.conf b/docker/nginx/nginx.conf index 432737cde..95bd92cad 100644 --- a/docker/nginx/nginx.conf +++ b/docker/nginx/nginx.conf @@ -1,32 +1,34 @@ -server { - listen 9443 http2 ssl; - server_name localhost; +http { + server { + listen 9443 http2 ssl; + server_name localhost; - ssl_certificate /etc/v2ray/v2ray.crt; - ssl_certificate_key /etc/v2ray/v2ray.key; - ssl_protocols TLSv1.2 TLSv1.3; - ssl_ciphers ECDHE-ECDSA-AES128-GCM-SHA256:ECDHE-RSA-AES128-GCM-SHA256:ECDHE-ECDSA-AES256-GCM-SHA384:ECDHE-RSA-AES256-GCM-SHA384:ECDHE-ECDSA-CHACHA20-POLY1305:ECDHE-RSA-CHACHA20-POLY1305:DHE-RSA-AES128-GCM-SHA256:DHE-RSA-AES256-GCM-SHA384; - + ssl_certificate /etc/v2ray/v2ray.crt; + ssl_certificate_key /etc/v2ray/v2ray.key; + ssl_protocols TLSv1.2 TLSv1.3; + ssl_ciphers ECDHE-ECDSA-AES128-GCM-SHA256:ECDHE-RSA-AES128-GCM-SHA256:ECDHE-ECDSA-AES256-GCM-SHA384:ECDHE-RSA-AES256-GCM-SHA384:ECDHE-ECDSA-CHACHA20-POLY1305:ECDHE-RSA-CHACHA20-POLY1305:DHE-RSA-AES128-GCM-SHA256:DHE-RSA-AES256-GCM-SHA384; + - location /abc/Tun { - # 网上参考级配置 - client_max_body_size 0; - client_body_timeout 60m; - send_timeout 60m; - lingering_close always; + location /abc/Tun { + # 网上参考级配置 + client_max_body_size 0; + client_body_timeout 60m; + send_timeout 60m; + lingering_close always; - keepalive_time 2h; - keepalive_timeout 30s; - keepalive_requests 256; - grpc_socket_keepalive on; + keepalive_time 2h; + keepalive_timeout 30s; + keepalive_requests 256; + grpc_socket_keepalive on; - # 实践级配置 - grpc_read_timeout 3m; - grpc_send_timeout 2m; - grpc_set_header Host $host; - grpc_set_header X-Real-IP $remote_addr; - grpc_set_header X-Forwarded-For $proxy_add_x_forwarded_for; + # 实践级配置 + grpc_read_timeout 3m; + grpc_send_timeout 2m; + grpc_set_header Host $host; + grpc_set_header X-Real-IP $remote_addr; + grpc_set_header X-Forwarded-For $proxy_add_x_forwarded_for; - grpc_pass grpc://127.0.0.1:16825; + grpc_pass grpc://127.0.0.1:16825; + } } } \ No newline at end of file From 0c46176a928d6dc006a108eb4ff6559b3064eee7 Mon Sep 17 00:00:00 2001 From: dev0 Date: Sat, 2 Dec 2023 15:19:24 +1100 Subject: [PATCH 04/22] up --- docker/nginx/nginx.conf | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/docker/nginx/nginx.conf b/docker/nginx/nginx.conf index 95bd92cad..373bdff7f 100644 --- a/docker/nginx/nginx.conf +++ b/docker/nginx/nginx.conf @@ -1,4 +1,8 @@ http { + events { + worker_connections 4096; ## Default: 1024 + } + server { listen 9443 http2 ssl; server_name localhost; @@ -9,7 +13,7 @@ http { ssl_ciphers ECDHE-ECDSA-AES128-GCM-SHA256:ECDHE-RSA-AES128-GCM-SHA256:ECDHE-ECDSA-AES256-GCM-SHA384:ECDHE-RSA-AES256-GCM-SHA384:ECDHE-ECDSA-CHACHA20-POLY1305:ECDHE-RSA-CHACHA20-POLY1305:DHE-RSA-AES128-GCM-SHA256:DHE-RSA-AES256-GCM-SHA384; - location /abc/Tun { + location /abc { # 网上参考级配置 client_max_body_size 0; client_body_timeout 60m; From 9512c4612eebbd47edafe582379340054af0e9ac Mon Sep 17 00:00:00 2001 From: dev0 Date: Sat, 2 Dec 2023 15:20:37 +1100 Subject: [PATCH 05/22] up --- docker/nginx/nginx.conf | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/docker/nginx/nginx.conf b/docker/nginx/nginx.conf index 373bdff7f..d40b87805 100644 --- a/docker/nginx/nginx.conf +++ b/docker/nginx/nginx.conf @@ -1,8 +1,8 @@ -http { - events { - worker_connections 4096; ## Default: 1024 - } +events { + worker_connections 4096; ## Default: 1024 +} +http { server { listen 9443 http2 ssl; server_name localhost; From e84fd5d802a1e8b2c31309e4dc493cfbe206233b Mon Sep 17 00:00:00 2001 From: dev0 Date: Sat, 2 Dec 2023 15:22:03 +1100 Subject: [PATCH 06/22] up --- clash/tests/data/config/rules.yaml | 2 +- docker/nginx/nginx.conf | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/clash/tests/data/config/rules.yaml b/clash/tests/data/config/rules.yaml index b4b6868b7..fc17b5136 100644 --- a/clash/tests/data/config/rules.yaml +++ b/clash/tests/data/config/rules.yaml @@ -169,7 +169,7 @@ proxies: - name: grpc-vmess type: vmess server: 10.0.0.13 - port: 9443 + port: 19443 uuid: b831381d-6324-4d53-ad4f-8cda48b30811 alterId: 0 cipher: auto diff --git a/docker/nginx/nginx.conf b/docker/nginx/nginx.conf index d40b87805..27f72d145 100644 --- a/docker/nginx/nginx.conf +++ b/docker/nginx/nginx.conf @@ -4,7 +4,7 @@ events { http { server { - listen 9443 http2 ssl; + listen 19443 http2 ssl; server_name localhost; ssl_certificate /etc/v2ray/v2ray.crt; From ab332f03eb8a500a3c3d896dc6a828a531055906 Mon Sep 17 00:00:00 2001 From: dev0 Date: Sat, 2 Dec 2023 15:55:44 +1100 Subject: [PATCH 07/22] WIP --- clash/tests/data/config/rules.yaml | 2 +- clash_lib/src/proxy/transport/grpc.rs | 89 ++++++++++++--------------- 2 files changed, 39 insertions(+), 52 deletions(-) diff --git a/clash/tests/data/config/rules.yaml b/clash/tests/data/config/rules.yaml index fc17b5136..3f89a9ec5 100644 --- a/clash/tests/data/config/rules.yaml +++ b/clash/tests/data/config/rules.yaml @@ -178,7 +178,7 @@ proxies: tls: true network: grpc grpc-opts: - grpc-service-name: abc/Tun + grpc-service-name: abc - name: vmess-altid type: vmess diff --git a/clash_lib/src/proxy/transport/grpc.rs b/clash_lib/src/proxy/transport/grpc.rs index d7c9eca69..1de23881b 100644 --- a/clash_lib/src/proxy/transport/grpc.rs +++ b/clash_lib/src/proxy/transport/grpc.rs @@ -11,7 +11,6 @@ use prost::encoding::encode_varint; use tracing::log; use std::fmt::Debug; -use std::future::Future; use std::io; use std::io::{Error, ErrorKind}; use std::pin::Pin; @@ -28,6 +27,7 @@ impl GrpcStreamBuilder { pub fn new(host: String, path: http::uri::PathAndQuery) -> Self { Self { host, path } } + fn req(&self) -> io::Result> { let uri: Uri = { Uri::builder() @@ -55,13 +55,13 @@ impl GrpcStreamBuilder { log::error!("http2 got err:{:?}", e); } }); - return Ok(Box::new(GrpcStream::new(resp, send_stream))); + let recv_stream = resp.await.map_err(map_io_error)?.into_body(); + return Ok(Box::new(GrpcStream::new(recv_stream, send_stream))); } } pub struct GrpcStream { - resp_fut: h2::client::ResponseFuture, - recv: Option, + recv: RecvStream, send: SendStream, buffer: BytesMut, payload_len: u64, @@ -70,7 +70,6 @@ pub struct GrpcStream { impl Debug for GrpcStream { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("GrpcStream") - .field("resp_fut", &self.resp_fut) .field("recv", &self.recv) .field("send", &self.send) .field("buffer", &self.buffer) @@ -80,10 +79,9 @@ impl Debug for GrpcStream { } impl GrpcStream { - pub fn new(resp_fut: h2::client::ResponseFuture, send: SendStream) -> Self { + pub fn new(recv: RecvStream, send: SendStream) -> Self { Self { - resp_fut, - recv: None, + recv, send, buffer: BytesMut::with_capacity(1024 * 4), payload_len: 0, @@ -117,14 +115,6 @@ impl AsyncRead for GrpcStream { cx: &mut Context<'_>, dst: &mut tokio::io::ReadBuf<'_>, ) -> Poll> { - if self.recv.is_none() { - self.recv = Some( - ready!(Pin::new(&mut self.resp_fut).poll(cx)) - .map_err(map_io_error)? - .into_body(), - ); - log::debug!("receive grpc recv stream"); - } if !self.buffer.is_empty() { let to_read = std::cmp::min(dst.remaining(), self.buffer.len()); let data = self.buffer.split_to(to_read); @@ -132,43 +122,40 @@ impl AsyncRead for GrpcStream { dst.put_slice(&data[..to_read]); return Poll::Ready(Ok(())); }; - Poll::Ready( - match ready!(Pin::new(&mut self.recv).as_pin_mut().unwrap().poll_data(cx)) { - Some(Ok(mut data)) => { - let before_parse_data_len = data.len(); - while self.payload_len > 0 || data.len() > 6 { - if self.payload_len == 0 { - data.advance(6); - self.payload_len = decode_varint(&mut data).map_err(map_io_error)?; - } - let to_read = std::cmp::min(dst.remaining(), data.len()); - let to_read = std::cmp::min(self.payload_len as usize, to_read); - if to_read == 0 { - self.buffer.extend_from_slice(&data[..]); - data.clear(); - break; - } - dst.put_slice(&data[..to_read]); - self.payload_len -= to_read as u64; - data.advance(to_read); + + Poll::Ready(match ready!(Pin::new(&mut self.recv).poll_data(cx)) { + Some(Ok(mut data)) => { + let before_parse_data_len = data.len(); + while self.payload_len > 0 || data.len() > 6 { + if self.payload_len == 0 { + data.advance(6); + self.payload_len = decode_varint(&mut data).map_err(map_io_error)?; + } + let to_read = std::cmp::min(dst.remaining(), data.len()); + let to_read = std::cmp::min(self.payload_len as usize, to_read); + if to_read == 0 { + self.buffer.extend_from_slice(&data[..]); + data.clear(); + break; } - // increase recv window - self.recv - .as_mut() - .unwrap() - .flow_control() - .release_capacity(before_parse_data_len - data.len()) - .map_or_else( - |e| Err(Error::new(ErrorKind::ConnectionReset, e)), - |_| Ok(()), - ) + dst.put_slice(&data[..to_read]); + self.payload_len -= to_read as u64; + data.advance(to_read); } - // no more data frames - // maybe trailer - // or cancelled - _ => Ok(()), - }, - ) + // increase recv window + self.recv + .flow_control() + .release_capacity(before_parse_data_len - data.len()) + .map_or_else( + |e| Err(Error::new(ErrorKind::ConnectionReset, e)), + |_| Ok(()), + ) + } + // no more data frames + // maybe trailer + // or cancelled + _ => Ok(()), + }) } } From 95fda2d3b9ac2a8902334d1f1665bc08c84e8bc8 Mon Sep 17 00:00:00 2001 From: dev0 Date: Sat, 2 Dec 2023 16:30:50 +1100 Subject: [PATCH 08/22] up --- clash_lib/src/proxy/transport/grpc.rs | 31 ++++++++++++++++++++------- docker/nginx/nginx.conf | 2 ++ 2 files changed, 25 insertions(+), 8 deletions(-) diff --git a/clash_lib/src/proxy/transport/grpc.rs b/clash_lib/src/proxy/transport/grpc.rs index 1de23881b..1a1644359 100644 --- a/clash_lib/src/proxy/transport/grpc.rs +++ b/clash_lib/src/proxy/transport/grpc.rs @@ -8,6 +8,7 @@ use h2::{RecvStream, SendStream}; use http::{Request, Uri, Version}; use prost::encoding::decode_varint; use prost::encoding::encode_varint; +use tracing::debug; use tracing::log; use std::fmt::Debug; @@ -113,13 +114,13 @@ impl AsyncRead for GrpcStream { fn poll_read( mut self: Pin<&mut Self>, cx: &mut Context<'_>, - dst: &mut tokio::io::ReadBuf<'_>, + buf: &mut tokio::io::ReadBuf<'_>, ) -> Poll> { if !self.buffer.is_empty() { - let to_read = std::cmp::min(dst.remaining(), self.buffer.len()); + let to_read = std::cmp::min(buf.remaining(), self.buffer.len()); let data = self.buffer.split_to(to_read); self.payload_len -= to_read as u64; - dst.put_slice(&data[..to_read]); + buf.put_slice(&data[..to_read]); return Poll::Ready(Ok(())); }; @@ -131,14 +132,14 @@ impl AsyncRead for GrpcStream { data.advance(6); self.payload_len = decode_varint(&mut data).map_err(map_io_error)?; } - let to_read = std::cmp::min(dst.remaining(), data.len()); + let to_read = std::cmp::min(buf.remaining(), data.len()); let to_read = std::cmp::min(self.payload_len as usize, to_read); if to_read == 0 { self.buffer.extend_from_slice(&data[..]); data.clear(); break; } - dst.put_slice(&data[..to_read]); + buf.put_slice(&data[..to_read]); self.payload_len -= to_read as u64; data.advance(to_read); } @@ -147,7 +148,10 @@ impl AsyncRead for GrpcStream { .flow_control() .release_capacity(before_parse_data_len - data.len()) .map_or_else( - |e| Err(Error::new(ErrorKind::ConnectionReset, e)), + |e| { + debug!("grpc flow control error: {}", e); + Err(Error::new(ErrorKind::ConnectionReset, e)) + }, |_| Ok(()), ) } @@ -167,18 +171,29 @@ impl AsyncWrite for GrpcStream { buf: &[u8], ) -> Poll> { self.reserve_send_capacity(buf); + Poll::Ready(match ready!(self.send.poll_capacity(cx)) { Some(Ok(to_write)) => { let encoded_buf = self.encode_buf(buf); self.send.send_data(encoded_buf, false).map_or_else( - |e| Err(Error::new(ErrorKind::BrokenPipe, e)), + |e| { + debug!("grpc write error: {}", e); + Err(Error::new(ErrorKind::BrokenPipe, e)) + }, |_| Ok(to_write), ) } + Some(Err(e)) => { + debug!("grpc poll_capacity error: {}", e); + Err(Error::new(ErrorKind::BrokenPipe, e)) + } // is_send_streaming returns false // which indicates the state is // neither open nor half_close_remote - _ => Err(Error::new(ErrorKind::BrokenPipe, "broken pipe")), + _ => { + debug!("grpc poll_capacity conn closed"); + Err(Error::new(ErrorKind::BrokenPipe, "broken pipe")) + } }) } diff --git a/docker/nginx/nginx.conf b/docker/nginx/nginx.conf index 27f72d145..aadf3fbfd 100644 --- a/docker/nginx/nginx.conf +++ b/docker/nginx/nginx.conf @@ -3,6 +3,8 @@ events { } http { + error_log /tmp/error.log debug; + server { listen 19443 http2 ssl; server_name localhost; From 6ea6f8b02348a83d6dfd0c2b9ed1978bef1f6408 Mon Sep 17 00:00:00 2001 From: dev0 Date: Sat, 2 Dec 2023 16:54:05 +1100 Subject: [PATCH 09/22] WIP --- clash_lib/src/proxy/transport/grpc.rs | 103 +++++++++++++++----------- 1 file changed, 58 insertions(+), 45 deletions(-) diff --git a/clash_lib/src/proxy/transport/grpc.rs b/clash_lib/src/proxy/transport/grpc.rs index 1a1644359..09e0fdfdc 100644 --- a/clash_lib/src/proxy/transport/grpc.rs +++ b/clash_lib/src/proxy/transport/grpc.rs @@ -1,9 +1,11 @@ use crate::common::errors::map_io_error; use crate::proxy::AnyStream; -use bytes::{Buf, BufMut, Bytes, BytesMut}; +use bytes::Buf; +use bytes::{BufMut, Bytes, BytesMut}; use futures::ready; +use futures::Future; use h2::{RecvStream, SendStream}; use http::{Request, Uri, Version}; use prost::encoding::decode_varint; @@ -34,7 +36,7 @@ impl GrpcStreamBuilder { Uri::builder() .scheme("https") .authority(self.host.as_str()) - .path_and_query(self.path.as_str()) + .path_and_query(format!("/{}/Tun", self.path.as_str())) .build() .map_err(map_io_error)? }; @@ -43,7 +45,7 @@ impl GrpcStreamBuilder { .uri(uri) .version(Version::HTTP_2) .header("content-type", "application/grpc") - .header("user-agent", "grpc-go/1.46.0"); + .header("user-agent", "tonic/0.10"); Ok(request.body(()).unwrap()) } @@ -56,13 +58,13 @@ impl GrpcStreamBuilder { log::error!("http2 got err:{:?}", e); } }); - let recv_stream = resp.await.map_err(map_io_error)?.into_body(); - return Ok(Box::new(GrpcStream::new(recv_stream, send_stream))); + return Ok(Box::new(GrpcStream::new(resp, send_stream))); } } pub struct GrpcStream { - recv: RecvStream, + resp_fut: h2::client::ResponseFuture, + recv: Option, send: SendStream, buffer: BytesMut, payload_len: u64, @@ -80,9 +82,10 @@ impl Debug for GrpcStream { } impl GrpcStream { - pub fn new(recv: RecvStream, send: SendStream) -> Self { + pub fn new(resp_fut: h2::client::ResponseFuture, send: SendStream) -> Self { Self { - recv, + resp_fut, + recv: None, send, buffer: BytesMut::with_capacity(1024 * 4), payload_len: 0, @@ -116,6 +119,15 @@ impl AsyncRead for GrpcStream { cx: &mut Context<'_>, buf: &mut tokio::io::ReadBuf<'_>, ) -> Poll> { + if self.recv.is_none() { + self.recv = Some( + ready!(Pin::new(&mut self.resp_fut).poll(cx)) + .map_err(map_io_error)? + .into_body(), + ); + log::debug!("receive grpc recv stream"); + } + if !self.buffer.is_empty() { let to_read = std::cmp::min(buf.remaining(), self.buffer.len()); let data = self.buffer.split_to(to_read); @@ -124,42 +136,46 @@ impl AsyncRead for GrpcStream { return Poll::Ready(Ok(())); }; - Poll::Ready(match ready!(Pin::new(&mut self.recv).poll_data(cx)) { - Some(Ok(mut data)) => { - let before_parse_data_len = data.len(); - while self.payload_len > 0 || data.len() > 6 { - if self.payload_len == 0 { - data.advance(6); - self.payload_len = decode_varint(&mut data).map_err(map_io_error)?; - } - let to_read = std::cmp::min(buf.remaining(), data.len()); - let to_read = std::cmp::min(self.payload_len as usize, to_read); - if to_read == 0 { - self.buffer.extend_from_slice(&data[..]); - data.clear(); - break; + Poll::Ready( + match ready!(Pin::new(&mut self.recv).as_pin_mut().unwrap().poll_data(cx)) { + Some(Ok(mut data)) => { + let before_parse_data_len = data.len(); + while self.payload_len > 0 || data.len() > 6 { + if self.payload_len == 0 { + data.advance(6); + self.payload_len = decode_varint(&mut data).map_err(map_io_error)?; + } + let to_read = std::cmp::min(buf.remaining(), data.len()); + let to_read = std::cmp::min(self.payload_len as usize, to_read); + if to_read == 0 { + self.buffer.extend_from_slice(&data[..]); + data.clear(); + break; + } + buf.put_slice(&data[..to_read]); + self.payload_len -= to_read as u64; + data.advance(to_read); } - buf.put_slice(&data[..to_read]); - self.payload_len -= to_read as u64; - data.advance(to_read); + // increase recv window + self.recv + .as_mut() + .unwrap() + .flow_control() + .release_capacity(before_parse_data_len - data.len()) + .map_or_else( + |e| { + debug!("grpc flow control error: {}", e); + Err(Error::new(ErrorKind::ConnectionReset, e)) + }, + |_| Ok(()), + ) } - // increase recv window - self.recv - .flow_control() - .release_capacity(before_parse_data_len - data.len()) - .map_or_else( - |e| { - debug!("grpc flow control error: {}", e); - Err(Error::new(ErrorKind::ConnectionReset, e)) - }, - |_| Ok(()), - ) - } - // no more data frames - // maybe trailer - // or cancelled - _ => Ok(()), - }) + // no more data frames + // maybe trailer + // or cancelled + _ => Ok(()), + }, + ) } } @@ -187,9 +203,6 @@ impl AsyncWrite for GrpcStream { debug!("grpc poll_capacity error: {}", e); Err(Error::new(ErrorKind::BrokenPipe, e)) } - // is_send_streaming returns false - // which indicates the state is - // neither open nor half_close_remote _ => { debug!("grpc poll_capacity conn closed"); Err(Error::new(ErrorKind::BrokenPipe, "broken pipe")) From 19d281046ab4b0aa709801d8a0e50cdcb6afb6a8 Mon Sep 17 00:00:00 2001 From: dev0 Date: Sun, 3 Dec 2023 03:59:14 +1100 Subject: [PATCH 10/22] WIP --- clash/tests/data/config/rules.yaml | 1 - clash_lib/src/lib.rs | 17 ++++++++--- clash_lib/src/proxy/transport/grpc.rs | 41 +++++++++++++-------------- 3 files changed, 32 insertions(+), 27 deletions(-) diff --git a/clash/tests/data/config/rules.yaml b/clash/tests/data/config/rules.yaml index 3f89a9ec5..f31ef8f72 100644 --- a/clash/tests/data/config/rules.yaml +++ b/clash/tests/data/config/rules.yaml @@ -43,7 +43,6 @@ dns: - 114.114.114.114 # default value - 1.1.1.1 # default value - tls://1.1.1.1:853 # DNS over TLS - - https://1.1.1.1/dns-query # DNS over HTTPS # - dhcp://en0 # dns from dhcp allow-lan: true diff --git a/clash_lib/src/lib.rs b/clash_lib/src/lib.rs index c29e5550a..a6f412ca9 100644 --- a/clash_lib/src/lib.rs +++ b/clash_lib/src/lib.rs @@ -17,16 +17,17 @@ use common::http::new_http_client; use common::mmdb; use config::def::LogLevel; use proxy::tun::get_tun_runner; + use state::InitCell; use std::io; use std::path::PathBuf; -use tokio::task::JoinHandle; -use tracing::error; -use tracing::info; - use std::sync::Arc; use thiserror::Error; use tokio::sync::{broadcast, mpsc, Mutex}; +use tokio::task::JoinHandle; +use tracing::debug; +use tracing::error; +use tracing::info; mod app; mod common; @@ -161,9 +162,12 @@ async fn start_async(opts: Options) -> Result<(), Error> { let mut tasks = Vec::::new(); let mut runners = Vec::new(); + debug!("initializing dns resolver"); let system_resolver = Arc::new(SystemResolver::new().map_err(|x| Error::DNSError(x.to_string()))?); let client = new_http_client(system_resolver).map_err(|x| Error::DNSError(x.to_string()))?; + + debug!("initializing mmdb"); let mmdb = Arc::new( mmdb::MMDB::new( cwd.join(&config.general.mmdb), @@ -173,6 +177,7 @@ async fn start_async(opts: Options) -> Result<(), Error> { .await?, ); + debug!("initializing cache store"); let cache_store = profile::ThreadSafeCacheFile::new( cwd.join("cache.db").as_path().to_str().unwrap(), config.profile.store_selected, @@ -180,6 +185,7 @@ async fn start_async(opts: Options) -> Result<(), Error> { let dns_resolver = dns::Resolver::new(&config.dns, cache_store.clone(), mmdb.clone()).await; + debug!("initializing outbound manager"); let outbound_manager = Arc::new( OutboundManager::new( config @@ -207,6 +213,7 @@ async fn start_async(opts: Options) -> Result<(), Error> { .await?, ); + debug!("initializing router"); let router = Arc::new( Router::new( config.rules, @@ -230,6 +237,7 @@ async fn start_async(opts: Options) -> Result<(), Error> { let authenticator = Arc::new(auth::PlainAuthenticator::new(config.users)); + debug!("initializing inbound manager"); let inbound_manager = Arc::new(Mutex::new(InboundManager::new( config.general.inbound, dispatcher.clone(), @@ -244,6 +252,7 @@ async fn start_async(opts: Options) -> Result<(), Error> { runners.push(tun_runner); } + debug!("initializing dns listener"); let dns_listener_handle = dns::get_dns_listener(config.dns, dns_resolver.clone()) .await .map(|l| tokio::spawn(l)); diff --git a/clash_lib/src/proxy/transport/grpc.rs b/clash_lib/src/proxy/transport/grpc.rs index 09e0fdfdc..6442320a7 100644 --- a/clash_lib/src/proxy/transport/grpc.rs +++ b/clash_lib/src/proxy/transport/grpc.rs @@ -50,7 +50,9 @@ impl GrpcStreamBuilder { } pub async fn proxy_stream(&self, stream: AnyStream) -> io::Result { - let (mut client, h2) = h2::client::handshake(stream).await.map_err(map_io_error)?; + let (client, h2) = h2::client::handshake(stream).await.map_err(map_io_error)?; + let mut client = client.ready().await.map_err(map_io_error)?; + let req = self.req()?; let (resp, send_stream) = client.send_request(req, false).map_err(map_io_error)?; tokio::spawn(async move { @@ -92,13 +94,6 @@ impl GrpcStream { } } - fn reserve_send_capacity(&mut self, data: &[u8]) { - let mut buf = [0u8; 10]; - let mut buf = &mut buf[..]; - encode_varint(data.len() as u64, &mut buf); - self.send.reserve_capacity(6 + 10 - buf.len() + data.len()); - } - fn encode_buf(&self, data: &[u8]) -> Bytes { let mut buf = BytesMut::with_capacity(16 + data.len()); let grpc_header = [0u8; 5]; @@ -128,8 +123,8 @@ impl AsyncRead for GrpcStream { log::debug!("receive grpc recv stream"); } - if !self.buffer.is_empty() { - let to_read = std::cmp::min(buf.remaining(), self.buffer.len()); + if self.payload_len > 0 { + let to_read = std::cmp::min(buf.remaining(), self.payload_len as usize); let data = self.buffer.split_to(to_read); self.payload_len -= to_read as u64; buf.put_slice(&data[..to_read]); @@ -138,14 +133,18 @@ impl AsyncRead for GrpcStream { Poll::Ready( match ready!(Pin::new(&mut self.recv).as_pin_mut().unwrap().poll_data(cx)) { - Some(Ok(mut data)) => { - let before_parse_data_len = data.len(); + Some(Ok(b)) => { + let mut data = BytesMut::with_capacity(self.buffer.len() + b.len()); + data.extend_from_slice(&self.buffer[..]); + data.extend_from_slice(&b[..]); + self.buffer.clear(); + while self.payload_len > 0 || data.len() > 6 { if self.payload_len == 0 { data.advance(6); self.payload_len = decode_varint(&mut data).map_err(map_io_error)?; } - let to_read = std::cmp::min(buf.remaining(), data.len()); + let to_read = std::cmp::min(buf.remaining(), b.len()); let to_read = std::cmp::min(self.payload_len as usize, to_read); if to_read == 0 { self.buffer.extend_from_slice(&data[..]); @@ -156,12 +155,12 @@ impl AsyncRead for GrpcStream { self.payload_len -= to_read as u64; data.advance(to_read); } - // increase recv window + self.recv .as_mut() .unwrap() .flow_control() - .release_capacity(before_parse_data_len - data.len()) + .release_capacity(b.len()) .map_or_else( |e| { debug!("grpc flow control error: {}", e); @@ -170,9 +169,6 @@ impl AsyncRead for GrpcStream { |_| Ok(()), ) } - // no more data frames - // maybe trailer - // or cancelled _ => Ok(()), }, ) @@ -186,17 +182,18 @@ impl AsyncWrite for GrpcStream { cx: &mut Context<'_>, buf: &[u8], ) -> Poll> { - self.reserve_send_capacity(buf); + let encoded_buf = self.encode_buf(buf); + self.send.reserve_capacity(encoded_buf.len()); Poll::Ready(match ready!(self.send.poll_capacity(cx)) { - Some(Ok(to_write)) => { - let encoded_buf = self.encode_buf(buf); + Some(Ok(cap)) => { + let overhead_len = encoded_buf.len() - buf.len(); self.send.send_data(encoded_buf, false).map_or_else( |e| { debug!("grpc write error: {}", e); Err(Error::new(ErrorKind::BrokenPipe, e)) }, - |_| Ok(to_write), + |_| Ok(cap - overhead_len), ) } Some(Err(e)) => { From e3975aedd19310b1dc2b46bf00b3f04e64515976 Mon Sep 17 00:00:00 2001 From: dev0 Date: Mon, 4 Dec 2023 06:24:29 +1100 Subject: [PATCH 11/22] WIP --- clash/tests/data/config/rules.yaml | 4 +- clash_lib/src/proxy/transport/grpc.rs | 111 ++++++++++++++++++-------- clash_lib/src/proxy/transport/tls.rs | 2 + 3 files changed, 82 insertions(+), 35 deletions(-) diff --git a/clash/tests/data/config/rules.yaml b/clash/tests/data/config/rules.yaml index f31ef8f72..af79e283a 100644 --- a/clash/tests/data/config/rules.yaml +++ b/clash/tests/data/config/rules.yaml @@ -230,11 +230,11 @@ rule-providers: behavior: domain rules: - - DOMAIN,ipinfo.io,grpc-vmess + - DOMAIN,google.com,grpc-vmess # - RULE-SET,file-provider,trojan - GEOIP,CN,relay - DOMAIN-SUFFIX,facebook.com,REJECT - - DOMAIN-KEYWORD,google,select + - DOMAIN-KEYWORD,google,grpc-vmess - DOMAIN,google.com,select - SRC-IP-CIDR,192.168.1.1/24,DIRECT - GEOIP,CN,DIRECT diff --git a/clash_lib/src/proxy/transport/grpc.rs b/clash_lib/src/proxy/transport/grpc.rs index 6442320a7..fc2690575 100644 --- a/clash_lib/src/proxy/transport/grpc.rs +++ b/clash_lib/src/proxy/transport/grpc.rs @@ -5,18 +5,19 @@ use bytes::Buf; use bytes::{BufMut, Bytes, BytesMut}; use futures::ready; -use futures::Future; use h2::{RecvStream, SendStream}; use http::{Request, Uri, Version}; use prost::encoding::decode_varint; use prost::encoding::encode_varint; -use tracing::debug; -use tracing::log; +use tokio::sync::{mpsc, Mutex}; +use tracing::warn; +use tracing::{debug, trace}; use std::fmt::Debug; use std::io; use std::io::{Error, ErrorKind}; use std::pin::Pin; +use std::sync::Arc; use std::task::{Context, Poll}; use tokio::io::{AsyncRead, AsyncWrite}; @@ -50,23 +51,52 @@ impl GrpcStreamBuilder { } pub async fn proxy_stream(&self, stream: AnyStream) -> io::Result { - let (client, h2) = h2::client::handshake(stream).await.map_err(map_io_error)?; + let (client, h2) = h2::client::Builder::new() + .enable_push(false) + .handshake(stream) + .await + .map_err(map_io_error)?; let mut client = client.ready().await.map_err(map_io_error)?; let req = self.req()?; let (resp, send_stream) = client.send_request(req, false).map_err(map_io_error)?; tokio::spawn(async move { if let Err(e) = h2.await { - log::error!("http2 got err:{:?}", e); + //TODO: collect this somewhere? + warn!("http2 got err:{:?}", e); } }); - return Ok(Box::new(GrpcStream::new(resp, send_stream))); + + let (init_sender, init_ready) = mpsc::channel(1); + let recv_stream = Arc::new(Mutex::new(None)); + + { + let recv_stream = recv_stream.clone(); + tokio::spawn(async move { + match resp.await { + Ok(resp) => { + debug!("grpc resp: {:?}", resp); + recv_stream.lock().await.replace(resp.into_body()); + } + Err(e) => { + debug!("grpc resp err: {:?}", e); + } + } + let _ = init_sender.send(()); + }); + } + + return Ok(Box::new(GrpcStream::new( + init_ready, + recv_stream, + send_stream, + ))); } } pub struct GrpcStream { - resp_fut: h2::client::ResponseFuture, - recv: Option, + init_ready: mpsc::Receiver<()>, + recv: Arc>>, send: SendStream, buffer: BytesMut, payload_len: u64, @@ -75,7 +105,6 @@ pub struct GrpcStream { impl Debug for GrpcStream { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("GrpcStream") - .field("recv", &self.recv) .field("send", &self.send) .field("buffer", &self.buffer) .field("payload_len", &self.payload_len) @@ -84,10 +113,14 @@ impl Debug for GrpcStream { } impl GrpcStream { - pub fn new(resp_fut: h2::client::ResponseFuture, send: SendStream) -> Self { + pub fn new( + init_ready: mpsc::Receiver<()>, + recv: Arc>>, + send: SendStream, + ) -> Self { Self { - resp_fut, - recv: None, + init_ready, + recv, send, buffer: BytesMut::with_capacity(1024 * 4), payload_len: 0, @@ -114,16 +147,23 @@ impl AsyncRead for GrpcStream { cx: &mut Context<'_>, buf: &mut tokio::io::ReadBuf<'_>, ) -> Poll> { - if self.recv.is_none() { - self.recv = Some( - ready!(Pin::new(&mut self.resp_fut).poll(cx)) - .map_err(map_io_error)? - .into_body(), - ); - log::debug!("receive grpc recv stream"); + trace!("grpc poll_read: {:?}", buf); + + ready!(self.init_ready.poll_recv(cx)); + + let recv = self.recv.clone(); + + let mut recv = recv.try_lock().unwrap(); + if recv.is_none() { + trace!("initialization error"); + return Poll::Ready(Err(Error::new( + ErrorKind::ConnectionReset, + "initialization error", + ))); } if self.payload_len > 0 { + trace!("grpc poll_read data left payload_len: {}", self.payload_len); let to_read = std::cmp::min(buf.remaining(), self.payload_len as usize); let data = self.buffer.split_to(to_read); self.payload_len -= to_read as u64; @@ -131,8 +171,13 @@ impl AsyncRead for GrpcStream { return Poll::Ready(Ok(())); }; + trace!( + "no decoded data left, grpc poll_read data left buffer: {}", + self.buffer.len() + ); + Poll::Ready( - match ready!(Pin::new(&mut self.recv).as_pin_mut().unwrap().poll_data(cx)) { + match ready!(Pin::new(&mut recv.as_mut().unwrap()).poll_data(cx)) { Some(Ok(b)) => { let mut data = BytesMut::with_capacity(self.buffer.len() + b.len()); data.extend_from_slice(&self.buffer[..]); @@ -144,7 +189,7 @@ impl AsyncRead for GrpcStream { data.advance(6); self.payload_len = decode_varint(&mut data).map_err(map_io_error)?; } - let to_read = std::cmp::min(buf.remaining(), b.len()); + let to_read = std::cmp::min(buf.remaining(), data.len()); let to_read = std::cmp::min(self.payload_len as usize, to_read); if to_read == 0 { self.buffer.extend_from_slice(&data[..]); @@ -156,8 +201,8 @@ impl AsyncRead for GrpcStream { data.advance(to_read); } - self.recv - .as_mut() + trace!("released grpc flow control capacity: {}", b.len()); + recv.as_mut() .unwrap() .flow_control() .release_capacity(b.len()) @@ -182,11 +227,15 @@ impl AsyncWrite for GrpcStream { cx: &mut Context<'_>, buf: &[u8], ) -> Poll> { + trace!("grpc poll_write: {:?}", buf.len()); + let encoded_buf = self.encode_buf(buf); + trace!("requesting capacity: {} bytes", encoded_buf.len()); self.send.reserve_capacity(encoded_buf.len()); Poll::Ready(match ready!(self.send.poll_capacity(cx)) { Some(Ok(cap)) => { + trace!("grpc got capacity: {} bytes", cap); let overhead_len = encoded_buf.len() - buf.len(); self.send.send_data(encoded_buf, false).map_or_else( |e| { @@ -197,7 +246,7 @@ impl AsyncWrite for GrpcStream { ) } Some(Err(e)) => { - debug!("grpc poll_capacity error: {}", e); + warn!("grpc poll_capacity error: {}", e); Err(Error::new(ErrorKind::BrokenPipe, e)) } _ => { @@ -214,14 +263,10 @@ impl AsyncWrite for GrpcStream { #[inline] fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.send.reserve_capacity(0); - Poll::Ready(ready!(self.send.poll_capacity(cx)).map_or( - Err(Error::new(ErrorKind::BrokenPipe, "broken pipe")), - |_| { - self.send - .send_data(Bytes::new(), true) - .map_or_else(|e| Err(Error::new(ErrorKind::BrokenPipe, e)), |_| Ok(())) - }, - )) + self.send.send_reset(h2::Reason::NO_ERROR); + self.send + .poll_reset(cx) + .map_err(map_io_error) + .map(|_| Ok(())) } } diff --git a/clash_lib/src/proxy/transport/tls.rs b/clash_lib/src/proxy/transport/tls.rs index f70826f90..e266425c3 100644 --- a/clash_lib/src/proxy/transport/tls.rs +++ b/clash_lib/src/proxy/transport/tls.rs @@ -34,6 +34,8 @@ pub async fn wrap_stream(stream: AnyStream, opt: TLSOptions) -> io::Result Date: Tue, 5 Dec 2023 00:37:04 +1100 Subject: [PATCH 12/22] WIP --- Cargo.lock | 6 ++--- clash_lib/Cargo.toml | 4 +-- clash_lib/src/proxy/transport/grpc.rs | 37 +++++++++++++-------------- 3 files changed, 23 insertions(+), 24 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c9535e930..deba81489 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1582,9 +1582,9 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" [[package]] name = "h2" -version = "0.3.21" +version = "0.3.22" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "91fc23aa11be92976ef4729127f1a74adf36d8436f7816b185d18df956790833" +checksum = "4d6250322ef6e60f93f9a2162799302cd6f68f79f6e5d85c8c16f14d1d958178" dependencies = [ "bytes", "fnv", @@ -1592,7 +1592,7 @@ dependencies = [ "futures-sink", "futures-util", "http", - "indexmap 1.9.3", + "indexmap 2.0.0", "slab", "tokio", "tokio-util", diff --git a/clash_lib/Cargo.toml b/clash_lib/Cargo.toml index 14307a64b..c960cd2a5 100644 --- a/clash_lib/Cargo.toml +++ b/clash_lib/Cargo.toml @@ -26,9 +26,9 @@ byteorder = "1.5" state = "0.6" lru_time_cache = "0.11" hyper = { version = "0.14", features = ["http1","http2","client", "server", "tcp"] } -http = { version = "0.2" } +http = { version = "0.2.11" } httparse = "1.8.0" -h2 = "0.3" +h2 = "0.3.22" prost = "0.12" tower = { version = "0.4", features = ["util"] } libc = "0.2" diff --git a/clash_lib/src/proxy/transport/grpc.rs b/clash_lib/src/proxy/transport/grpc.rs index fc2690575..2935825d9 100644 --- a/clash_lib/src/proxy/transport/grpc.rs +++ b/clash_lib/src/proxy/transport/grpc.rs @@ -73,6 +73,7 @@ impl GrpcStreamBuilder { { let recv_stream = recv_stream.clone(); tokio::spawn(async move { + trace!("initiating grpc recv stream"); match resp.await { Ok(resp) => { debug!("grpc resp: {:?}", resp); @@ -128,13 +129,17 @@ impl GrpcStream { } fn encode_buf(&self, data: &[u8]) -> Bytes { - let mut buf = BytesMut::with_capacity(16 + data.len()); - let grpc_header = [0u8; 5]; + let mut protobuf_header = BytesMut::with_capacity(10 + 1); + protobuf_header.put_u8(0x0a); + encode_varint(data.len() as u64, &mut protobuf_header); + let mut grpc_header = [0u8; 5]; + let grpc_payload_len = (protobuf_header.len() + data.len()) as u32; + grpc_header[1..5].copy_from_slice(&grpc_payload_len.to_be_bytes()); + + let mut buf = + BytesMut::with_capacity(grpc_header.len() + protobuf_header.len() + data.len()); buf.put_slice(&grpc_header[..]); - buf.put_u8(0x0a); - encode_varint(data.len() as u64, &mut buf); - let payload_len = ((buf.len() - 5 + data.len()) as u32).to_be_bytes(); - buf[1..5].copy_from_slice(&payload_len[..4]); + buf.put_slice(&protobuf_header.freeze()[..]); buf.put_slice(data); buf.freeze() } @@ -147,15 +152,13 @@ impl AsyncRead for GrpcStream { cx: &mut Context<'_>, buf: &mut tokio::io::ReadBuf<'_>, ) -> Poll> { - trace!("grpc poll_read: {:?}", buf); - ready!(self.init_ready.poll_recv(cx)); let recv = self.recv.clone(); let mut recv = recv.try_lock().unwrap(); if recv.is_none() { - trace!("initialization error"); + warn!("grpc initialization error"); return Poll::Ready(Err(Error::new( ErrorKind::ConnectionReset, "initialization error", @@ -171,11 +174,6 @@ impl AsyncRead for GrpcStream { return Poll::Ready(Ok(())); }; - trace!( - "no decoded data left, grpc poll_read data left buffer: {}", - self.buffer.len() - ); - Poll::Ready( match ready!(Pin::new(&mut recv.as_mut().unwrap()).poll_data(cx)) { Some(Ok(b)) => { @@ -227,22 +225,23 @@ impl AsyncWrite for GrpcStream { cx: &mut Context<'_>, buf: &[u8], ) -> Poll> { - trace!("grpc poll_write: {:?}", buf.len()); - let encoded_buf = self.encode_buf(buf); trace!("requesting capacity: {} bytes", encoded_buf.len()); self.send.reserve_capacity(encoded_buf.len()); Poll::Ready(match ready!(self.send.poll_capacity(cx)) { Some(Ok(cap)) => { - trace!("grpc got capacity: {} bytes", cap); - let overhead_len = encoded_buf.len() - buf.len(); + trace!( + "grpc got capacity: {} bytes, payload size: {}", + cap, + encoded_buf.len() + ); self.send.send_data(encoded_buf, false).map_or_else( |e| { debug!("grpc write error: {}", e); Err(Error::new(ErrorKind::BrokenPipe, e)) }, - |_| Ok(cap - overhead_len), + |_| Ok(buf.len()), ) } Some(Err(e)) => { From 9b3543f9d95457a97a4a1f09018ec1e2f07a57fe Mon Sep 17 00:00:00 2001 From: dev0 Date: Tue, 5 Dec 2023 03:48:58 +1100 Subject: [PATCH 13/22] test with trojan + grpc --- clash_lib/src/proxy/transport/grpc.rs | 84 ++++++++++++++------------- clash_lib/src/proxy/trojan/mod.rs | 34 ++++++++++- clash_lib/src/proxy/trojan/stream.rs | 1 - scripts/gun.proto | 11 ++++ 4 files changed, 87 insertions(+), 43 deletions(-) delete mode 100644 clash_lib/src/proxy/trojan/stream.rs create mode 100644 scripts/gun.proto diff --git a/clash_lib/src/proxy/transport/grpc.rs b/clash_lib/src/proxy/transport/grpc.rs index 2935825d9..3f54dbc36 100644 --- a/clash_lib/src/proxy/transport/grpc.rs +++ b/clash_lib/src/proxy/transport/grpc.rs @@ -52,6 +52,9 @@ impl GrpcStreamBuilder { pub async fn proxy_stream(&self, stream: AnyStream) -> io::Result { let (client, h2) = h2::client::Builder::new() + .initial_connection_window_size(0x7FFFFFFF) + .initial_window_size(0x7FFFFFFF) + .initial_max_send_streams(1024) .enable_push(false) .handshake(stream) .await @@ -77,7 +80,8 @@ impl GrpcStreamBuilder { match resp.await { Ok(resp) => { debug!("grpc resp: {:?}", resp); - recv_stream.lock().await.replace(resp.into_body()); + let handle = resp.into_body(); + recv_stream.lock().await.replace(handle); } Err(e) => { debug!("grpc resp err: {:?}", e); @@ -128,6 +132,7 @@ impl GrpcStream { } } + // encode data to grpc + protobuf format fn encode_buf(&self, data: &[u8]) -> Bytes { let mut protobuf_header = BytesMut::with_capacity(10 + 1); protobuf_header.put_u8(0x0a); @@ -171,50 +176,46 @@ impl AsyncRead for GrpcStream { let data = self.buffer.split_to(to_read); self.payload_len -= to_read as u64; buf.put_slice(&data[..to_read]); + assert_ne!(to_read, 0); return Poll::Ready(Ok(())); }; - Poll::Ready( - match ready!(Pin::new(&mut recv.as_mut().unwrap()).poll_data(cx)) { - Some(Ok(b)) => { - let mut data = BytesMut::with_capacity(self.buffer.len() + b.len()); - data.extend_from_slice(&self.buffer[..]); - data.extend_from_slice(&b[..]); - self.buffer.clear(); + match ready!(Pin::new(&mut recv.as_mut().unwrap()).poll_data(cx)) { + Some(Ok(b)) => { + self.buffer.reserve(b.len()); + self.buffer.extend_from_slice(&b[..]); - while self.payload_len > 0 || data.len() > 6 { - if self.payload_len == 0 { - data.advance(6); - self.payload_len = decode_varint(&mut data).map_err(map_io_error)?; - } - let to_read = std::cmp::min(buf.remaining(), data.len()); - let to_read = std::cmp::min(self.payload_len as usize, to_read); - if to_read == 0 { - self.buffer.extend_from_slice(&data[..]); - data.clear(); - break; - } - buf.put_slice(&data[..to_read]); - self.payload_len -= to_read as u64; - data.advance(to_read); + while self.payload_len > 0 || self.buffer.len() > 6 { + if self.payload_len == 0 { + self.buffer.advance(6); + self.payload_len = decode_varint(&mut self.buffer).map_err(map_io_error)?; } - - trace!("released grpc flow control capacity: {}", b.len()); - recv.as_mut() - .unwrap() - .flow_control() - .release_capacity(b.len()) - .map_or_else( - |e| { - debug!("grpc flow control error: {}", e); - Err(Error::new(ErrorKind::ConnectionReset, e)) - }, - |_| Ok(()), - ) + let to_read = std::cmp::min(buf.remaining(), self.payload_len as usize); + if to_read == 0 { + break; + } + buf.put_slice(self.buffer.split_to(to_read).freeze().as_ref()); + self.payload_len -= to_read as u64; } - _ => Ok(()), - }, - ) + + trace!("releasing grpc flow control capacity: {}", b.len()); + recv.as_mut() + .unwrap() + .flow_control() + .release_capacity(b.len()) + .map_or_else( + |e| { + debug!("grpc flow control error: {}", e); + Poll::Ready(Err(Error::new(ErrorKind::ConnectionReset, e))) + }, + |_| Poll::Ready(Ok(())), + ) + } + _ => { + assert_eq!(self.payload_len, 0); + Poll::Ready(Ok(())) + } + } } } @@ -241,7 +242,10 @@ impl AsyncWrite for GrpcStream { debug!("grpc write error: {}", e); Err(Error::new(ErrorKind::BrokenPipe, e)) }, - |_| Ok(buf.len()), + |_| { + debug!("grpc wrote {} bytes", buf.len()); + Ok(buf.len()) + }, ) } Some(Err(e)) => { diff --git a/clash_lib/src/proxy/trojan/mod.rs b/clash_lib/src/proxy/trojan/mod.rs index 42256317f..2f6758076 100644 --- a/clash_lib/src/proxy/trojan/mod.rs +++ b/clash_lib/src/proxy/trojan/mod.rs @@ -30,7 +30,6 @@ use super::{ }; mod datagram; -mod stream; static DEFAULT_ALPN: [&str; 2] = ["h2", "http/1.1"]; @@ -81,7 +80,38 @@ impl Handler { )), }; - let mut s = transport::tls::wrap_stream(s, tls_opt.to_owned()).await?; + let s = transport::tls::wrap_stream(s, tls_opt.to_owned()).await?; + + let mut s = if let Some(transport) = self.opts.transport.as_ref() { + match transport { + Transport::Ws(ws_opts) => { + let ws_builder = transport::WebsocketStreamBuilder::new( + self.opts.server.clone(), + self.opts.port, + ws_opts.path.clone(), + ws_opts.headers.clone(), + None, + ws_opts.max_early_data, + ws_opts.early_data_header_name.clone(), + ); + + ws_builder.proxy_stream(s).await? + } + Transport::Grpc(grpc_opts) => { + let grpc_builder = transport::GrpcStreamBuilder::new( + self.opts.server.clone(), + grpc_opts + .service_name + .to_owned() + .try_into() + .expect("invalid gRPC service path"), + ); + grpc_builder.proxy_stream(s).await? + } + } + } else { + s + }; let mut buf = BytesMut::new(); let password = Sha224::digest(self.opts.password.as_bytes()); diff --git a/clash_lib/src/proxy/trojan/stream.rs b/clash_lib/src/proxy/trojan/stream.rs deleted file mode 100644 index 8b1378917..000000000 --- a/clash_lib/src/proxy/trojan/stream.rs +++ /dev/null @@ -1 +0,0 @@ - diff --git a/scripts/gun.proto b/scripts/gun.proto new file mode 100644 index 000000000..1d127b4c1 --- /dev/null +++ b/scripts/gun.proto @@ -0,0 +1,11 @@ +syntax = "proto3"; +option go_package = "github.com/Qv2ray/gun/pkg/proto"; + +message Hunk { + bytes data = 1; +} + +service GunService { + rpc Tun (stream Hunk) returns (stream Hunk); + rpc TunDatagram (stream Hunk) returns (stream Hunk); +} \ No newline at end of file From cbd72f8b6ff459512fb78cef565869360ba9e4e7 Mon Sep 17 00:00:00 2001 From: dev0 Date: Wed, 6 Dec 2023 21:21:06 +1100 Subject: [PATCH 14/22] up --- docker/nginx/nginx.conf | 23 ++++++++++++++++++ docker/trojan/config.json | 51 --------------------------------------- docker/v2ray/config.json | 43 +++++++++++++++++++++++++++++++++ 3 files changed, 66 insertions(+), 51 deletions(-) delete mode 100644 docker/trojan/config.json diff --git a/docker/nginx/nginx.conf b/docker/nginx/nginx.conf index aadf3fbfd..f36206fba 100644 --- a/docker/nginx/nginx.conf +++ b/docker/nginx/nginx.conf @@ -36,5 +36,28 @@ http { grpc_pass grpc://127.0.0.1:16825; } + + + location /def { + # 网上参考级配置 + client_max_body_size 0; + client_body_timeout 60m; + send_timeout 60m; + lingering_close always; + + keepalive_time 2h; + keepalive_timeout 30s; + keepalive_requests 256; + grpc_socket_keepalive on; + + # 实践级配置 + grpc_read_timeout 3m; + grpc_send_timeout 2m; + grpc_set_header Host $host; + grpc_set_header X-Real-IP $remote_addr; + grpc_set_header X-Forwarded-For $proxy_add_x_forwarded_for; + + grpc_pass grpc://127.0.0.1:9444; + } } } \ No newline at end of file diff --git a/docker/trojan/config.json b/docker/trojan/config.json deleted file mode 100644 index 6ce146e8d..000000000 --- a/docker/trojan/config.json +++ /dev/null @@ -1,51 +0,0 @@ -{ - "run_type": "server", - "local_addr": "0.0.0.0", - "local_port": 9443, - "remote_addr": "127.0.0.1", - "remote_port": 80, - "password": [ - "password1", - "password2" - ], - "log_level": 1, - "ssl": { - "cert": "/etc/v2ray/v2ray.crt", - "key": "/etc/v2ray/v2ray.key", - "key_password": "", - "cipher": "ECDHE-ECDSA-AES128-GCM-SHA256:ECDHE-RSA-AES128-GCM-SHA256:ECDHE-ECDSA-AES256-GCM-SHA384:ECDHE-RSA-AES256-GCM-SHA384:ECDHE-ECDSA-CHACHA20-POLY1305:ECDHE-RSA-CHACHA20-POLY1305:DHE-RSA-AES128-GCM-SHA256:DHE-RSA-AES256-GCM-SHA384", - "cipher_tls13": "TLS_AES_128_GCM_SHA256:TLS_CHACHA20_POLY1305_SHA256:TLS_AES_256_GCM_SHA384", - "prefer_server_cipher": true, - "alpn": [ - "http/1.1" - ], - "alpn_port_override": { - "h2": 81 - }, - "reuse_session": true, - "session_ticket": false, - "session_timeout": 600, - "plain_http_response": "", - "curves": "", - "dhparam": "" - }, - "tcp": { - "prefer_ipv4": false, - "no_delay": true, - "keep_alive": true, - "reuse_port": false, - "fast_open": false, - "fast_open_qlen": 20 - }, - "mysql": { - "enabled": false, - "server_addr": "127.0.0.1", - "server_port": 3306, - "database": "trojan", - "username": "trojan", - "password": "", - "key": "", - "cert": "", - "ca": "" - } -} \ No newline at end of file diff --git a/docker/v2ray/config.json b/docker/v2ray/config.json index c1ca78078..1198828bb 100644 --- a/docker/v2ray/config.json +++ b/docker/v2ray/config.json @@ -39,6 +39,49 @@ } } }, + { + "port": 9443, + "protocol": "trojan", + "settings": { + "clients": [ + { + "password": "password1", + } + ] + }, + "streamSettings": { + "network": "tcp", + "security": "tls", + "tlsSettings": { + "alpn": [ + "http/1.1" + ], + "certificates": [ + { + "certificateFile": "/etc/v2ray/v2ray.crt", // 证书文件 + "keyFile": "/etc/v2ray/v2ray.key" // 密钥文件 + } + ] + } + } + }, + { + "port": 9444, + "protocol": "trojan", + "settings": { + "clients": [ + { + "password": "password1" // 填写你的 password + } + ] + }, + "streamSettings": { + "network": "grpc", + "grpcSettings": { + "serviceName": "def" // 填写你的 ServiceName + } + } + } { "port": 16824, "protocol": "vmess", From a9bf43cc99887ce7747ef849d0586697f65e0d8a Mon Sep 17 00:00:00 2001 From: dev0 Date: Wed, 6 Dec 2023 21:22:35 +1100 Subject: [PATCH 15/22] up --- docker/docker-compose.yml | 14 -------------- docker/v2ray/config.json | 2 +- 2 files changed, 1 insertion(+), 15 deletions(-) diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index 2df22d138..a9e77e3f2 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -24,20 +24,6 @@ services: source: ./v2ray/key.pem target: /etc/v2ray/v2ray.key - trojan: - image: trojangfw/trojan - network_mode: "host" - volumes: - - type: bind - source: ./trojan/config.json - target: /config/config.json - - type: bind - source: ./v2ray/cert.pem - target: /etc/v2ray/v2ray.crt - - type: bind - source: ./v2ray/key.pem - target: /etc/v2ray/v2ray.key - nginx: image: nginx network_mode: "host" diff --git a/docker/v2ray/config.json b/docker/v2ray/config.json index 1198828bb..cb2ca173c 100644 --- a/docker/v2ray/config.json +++ b/docker/v2ray/config.json @@ -81,7 +81,7 @@ "serviceName": "def" // 填写你的 ServiceName } } - } + }, { "port": 16824, "protocol": "vmess", From f7064fb410850559aea1bd5519ba903128b07d57 Mon Sep 17 00:00:00 2001 From: dev0 Date: Wed, 6 Dec 2023 21:27:06 +1100 Subject: [PATCH 16/22] up --- docker/v2ray/config.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docker/v2ray/config.json b/docker/v2ray/config.json index cb2ca173c..cf74028f4 100644 --- a/docker/v2ray/config.json +++ b/docker/v2ray/config.json @@ -45,7 +45,7 @@ "settings": { "clients": [ { - "password": "password1", + "password": "password1" } ] }, From 8d4122125e1e03a283d9463edb6f4acbccd11378 Mon Sep 17 00:00:00 2001 From: dev0 Date: Wed, 6 Dec 2023 21:40:26 +1100 Subject: [PATCH 17/22] up --- clash/tests/data/config/rules.yaml | 16 +++++++++++++++- docker/nginx/nginx.conf | 4 ++-- docker/v2ray/config.json | 3 ++- 3 files changed, 19 insertions(+), 4 deletions(-) diff --git a/clash/tests/data/config/rules.yaml b/clash/tests/data/config/rules.yaml index af79e283a..e277f7e34 100644 --- a/clash/tests/data/config/rules.yaml +++ b/clash/tests/data/config/rules.yaml @@ -211,6 +211,20 @@ proxies: - h2 - http/1.1 skip-cert-verify: true + network: grpc + grpc-opts: + grpc-service-name: def + + - name: "trojan-grpc" + type: trojan + server: 10.0.0.13 + port: 19443 + password: password1 + udp: true + # sni: example.com # aka server name + alpn: + - h2 + skip-cert-verify: true proxy-providers: file-provider: @@ -230,7 +244,7 @@ rule-providers: behavior: domain rules: - - DOMAIN,google.com,grpc-vmess + - DOMAIN,google.com,trojan-grpc # - RULE-SET,file-provider,trojan - GEOIP,CN,relay - DOMAIN-SUFFIX,facebook.com,REJECT diff --git a/docker/nginx/nginx.conf b/docker/nginx/nginx.conf index f36206fba..46b84ec17 100644 --- a/docker/nginx/nginx.conf +++ b/docker/nginx/nginx.conf @@ -15,7 +15,7 @@ http { ssl_ciphers ECDHE-ECDSA-AES128-GCM-SHA256:ECDHE-RSA-AES128-GCM-SHA256:ECDHE-ECDSA-AES256-GCM-SHA384:ECDHE-RSA-AES256-GCM-SHA384:ECDHE-ECDSA-CHACHA20-POLY1305:ECDHE-RSA-CHACHA20-POLY1305:DHE-RSA-AES128-GCM-SHA256:DHE-RSA-AES256-GCM-SHA384; - location /abc { + location /abc/Tun { # 网上参考级配置 client_max_body_size 0; client_body_timeout 60m; @@ -38,7 +38,7 @@ http { } - location /def { + location /def/Tun { # 网上参考级配置 client_max_body_size 0; client_body_timeout 60m; diff --git a/docker/v2ray/config.json b/docker/v2ray/config.json index cf74028f4..b0927064f 100644 --- a/docker/v2ray/config.json +++ b/docker/v2ray/config.json @@ -79,7 +79,8 @@ "network": "grpc", "grpcSettings": { "serviceName": "def" // 填写你的 ServiceName - } + }, + "security": "none" } }, { From 2b373067d2ce608cec1107732090254b6758b315 Mon Sep 17 00:00:00 2001 From: dev0 Date: Wed, 6 Dec 2023 22:38:43 +1100 Subject: [PATCH 18/22] up --- clash/tests/data/config/rules.yaml | 1 + clash_lib/src/proxy/converters/trojan.rs | 7 ++++++- clash_lib/src/proxy/trojan/mod.rs | 2 +- docker/nginx/nginx.conf | 10 ++++++++++ 4 files changed, 18 insertions(+), 2 deletions(-) diff --git a/clash/tests/data/config/rules.yaml b/clash/tests/data/config/rules.yaml index e277f7e34..e453c5f0f 100644 --- a/clash/tests/data/config/rules.yaml +++ b/clash/tests/data/config/rules.yaml @@ -222,6 +222,7 @@ proxies: password: password1 udp: true # sni: example.com # aka server name + tls: true alpn: - h2 skip-cert-verify: true diff --git a/clash_lib/src/proxy/converters/trojan.rs b/clash_lib/src/proxy/converters/trojan.rs index abcaac806..40f322dae 100644 --- a/clash_lib/src/proxy/converters/trojan.rs +++ b/clash_lib/src/proxy/converters/trojan.rs @@ -82,7 +82,12 @@ impl TryFrom<&OutboundTrojan> for AnyOutboundHandler { .ok_or(Error::InvalidConfig( "grpc_opts is required for grpc".to_owned(), )), - _ => return Err(Error::InvalidConfig(format!("unsupported network: {}", x))), + _ => { + return Err(Error::InvalidConfig(format!( + "unsupported trojan network: {}", + x + ))) + } }) .transpose()?, }); diff --git a/clash_lib/src/proxy/trojan/mod.rs b/clash_lib/src/proxy/trojan/mod.rs index 2f6758076..70808f8f6 100644 --- a/clash_lib/src/proxy/trojan/mod.rs +++ b/clash_lib/src/proxy/trojan/mod.rs @@ -80,7 +80,7 @@ impl Handler { )), }; - let s = transport::tls::wrap_stream(s, tls_opt.to_owned()).await?; + let s = transport::tls::wrap_stream(s, tls_opt).await?; let mut s = if let Some(transport) = self.opts.transport.as_ref() { match transport { diff --git a/docker/nginx/nginx.conf b/docker/nginx/nginx.conf index 46b84ec17..b05c9ec63 100644 --- a/docker/nginx/nginx.conf +++ b/docker/nginx/nginx.conf @@ -37,7 +37,17 @@ http { grpc_pass grpc://127.0.0.1:16825; } + } + + + server { + listen 19444 http2 ssl; + server_name localhost; + ssl_certificate /etc/v2ray/v2ray.crt; + ssl_certificate_key /etc/v2ray/v2ray.key; + ssl_protocols TLSv1.2 TLSv1.3; + ssl_ciphers ECDHE-ECDSA-AES128-GCM-SHA256:ECDHE-RSA-AES128-GCM-SHA256:ECDHE-ECDSA-AES256-GCM-SHA384:ECDHE-RSA-AES256-GCM-SHA384:ECDHE-ECDSA-CHACHA20-POLY1305:ECDHE-RSA-CHACHA20-POLY1305:DHE-RSA-AES128-GCM-SHA256:DHE-RSA-AES256-GCM-SHA384; location /def/Tun { # 网上参考级配置 client_max_body_size 0; From a2bde0700f5a1c8b36273e1a9027939763e9ad85 Mon Sep 17 00:00:00 2001 From: dev0 Date: Wed, 6 Dec 2023 23:23:59 +1100 Subject: [PATCH 19/22] up --- clash/tests/data/config/rules.yaml | 9 +++--- docker/nginx/nginx.conf | 50 ++---------------------------- 2 files changed, 6 insertions(+), 53 deletions(-) diff --git a/clash/tests/data/config/rules.yaml b/clash/tests/data/config/rules.yaml index e453c5f0f..e8464f941 100644 --- a/clash/tests/data/config/rules.yaml +++ b/clash/tests/data/config/rules.yaml @@ -211,21 +211,20 @@ proxies: - h2 - http/1.1 skip-cert-verify: true - network: grpc - grpc-opts: - grpc-service-name: def - name: "trojan-grpc" type: trojan server: 10.0.0.13 - port: 19443 + port: 19444 password: password1 udp: true # sni: example.com # aka server name - tls: true alpn: - h2 skip-cert-verify: true + network: grpc + grpc-opts: + grpc-service-name: def proxy-providers: file-provider: diff --git a/docker/nginx/nginx.conf b/docker/nginx/nginx.conf index b05c9ec63..d794b730f 100644 --- a/docker/nginx/nginx.conf +++ b/docker/nginx/nginx.conf @@ -6,8 +6,9 @@ http { error_log /tmp/error.log debug; server { - listen 19443 http2 ssl; + listen 19443 ssl; server_name localhost; + http2 on; ssl_certificate /etc/v2ray/v2ray.crt; ssl_certificate_key /etc/v2ray/v2ray.key; @@ -16,57 +17,10 @@ http { location /abc/Tun { - # 网上参考级配置 - client_max_body_size 0; - client_body_timeout 60m; - send_timeout 60m; - lingering_close always; - - keepalive_time 2h; - keepalive_timeout 30s; - keepalive_requests 256; - grpc_socket_keepalive on; - - # 实践级配置 - grpc_read_timeout 3m; - grpc_send_timeout 2m; - grpc_set_header Host $host; - grpc_set_header X-Real-IP $remote_addr; - grpc_set_header X-Forwarded-For $proxy_add_x_forwarded_for; - grpc_pass grpc://127.0.0.1:16825; } - } - - - server { - listen 19444 http2 ssl; - server_name localhost; - - ssl_certificate /etc/v2ray/v2ray.crt; - ssl_certificate_key /etc/v2ray/v2ray.key; - ssl_protocols TLSv1.2 TLSv1.3; - ssl_ciphers ECDHE-ECDSA-AES128-GCM-SHA256:ECDHE-RSA-AES128-GCM-SHA256:ECDHE-ECDSA-AES256-GCM-SHA384:ECDHE-RSA-AES256-GCM-SHA384:ECDHE-ECDSA-CHACHA20-POLY1305:ECDHE-RSA-CHACHA20-POLY1305:DHE-RSA-AES128-GCM-SHA256:DHE-RSA-AES256-GCM-SHA384; location /def/Tun { - # 网上参考级配置 - client_max_body_size 0; - client_body_timeout 60m; - send_timeout 60m; - lingering_close always; - - keepalive_time 2h; - keepalive_timeout 30s; - keepalive_requests 256; - grpc_socket_keepalive on; - - # 实践级配置 - grpc_read_timeout 3m; - grpc_send_timeout 2m; - grpc_set_header Host $host; - grpc_set_header X-Real-IP $remote_addr; - grpc_set_header X-Forwarded-For $proxy_add_x_forwarded_for; - grpc_pass grpc://127.0.0.1:9444; } } From 82b622ca738d2266aac65b83efe6f83591d03921 Mon Sep 17 00:00:00 2001 From: dev0 Date: Wed, 6 Dec 2023 23:45:09 +1100 Subject: [PATCH 20/22] up --- clash/tests/data/config/rules.yaml | 2 +- clash_lib/src/proxy/transport/grpc.rs | 15 ++++++++++----- 2 files changed, 11 insertions(+), 6 deletions(-) diff --git a/clash/tests/data/config/rules.yaml b/clash/tests/data/config/rules.yaml index e8464f941..3dbd7b087 100644 --- a/clash/tests/data/config/rules.yaml +++ b/clash/tests/data/config/rules.yaml @@ -215,7 +215,7 @@ proxies: - name: "trojan-grpc" type: trojan server: 10.0.0.13 - port: 19444 + port: 19443 password: password1 udp: true # sni: example.com # aka server name diff --git a/clash_lib/src/proxy/transport/grpc.rs b/clash_lib/src/proxy/transport/grpc.rs index 3f54dbc36..40e7c4ae7 100644 --- a/clash_lib/src/proxy/transport/grpc.rs +++ b/clash_lib/src/proxy/transport/grpc.rs @@ -4,12 +4,13 @@ use crate::proxy::AnyStream; use bytes::Buf; use bytes::{BufMut, Bytes, BytesMut}; -use futures::ready; +use futures::{ready, StreamExt}; use h2::{RecvStream, SendStream}; use http::{Request, Uri, Version}; use prost::encoding::decode_varint; use prost::encoding::encode_varint; use tokio::sync::{mpsc, Mutex}; +use tokio_util::io::StreamReader; use tracing::warn; use tracing::{debug, trace}; @@ -80,8 +81,12 @@ impl GrpcStreamBuilder { match resp.await { Ok(resp) => { debug!("grpc resp: {:?}", resp); - let handle = resp.into_body(); - recv_stream.lock().await.replace(handle); + let stream = resp + .into_body() + .map(|result| result.map_err(map_io_error)) + .into(); + let reader = StreamReader::new(stream); + recv_stream.lock().await.replace(reader); } Err(e) => { debug!("grpc resp err: {:?}", e); @@ -101,7 +106,7 @@ impl GrpcStreamBuilder { pub struct GrpcStream { init_ready: mpsc::Receiver<()>, - recv: Arc>>, + recv: Arc>>>, send: SendStream, buffer: BytesMut, payload_len: u64, @@ -120,7 +125,7 @@ impl Debug for GrpcStream { impl GrpcStream { pub fn new( init_ready: mpsc::Receiver<()>, - recv: Arc>>, + recv: Arc>>>, send: SendStream, ) -> Self { Self { From 3c3980225c5af952d842ffb7be38fefd3e0104d3 Mon Sep 17 00:00:00 2001 From: dev0 Date: Thu, 7 Dec 2023 01:43:22 +1100 Subject: [PATCH 21/22] trojan grpc working --- Cargo.lock | 1 + clash/tests/data/config/rules.yaml | 2 +- clash_lib/Cargo.toml | 2 +- clash_lib/src/app/outbound/manager.rs | 4 +++ .../app/remote_content_manager/healthcheck.rs | 16 +++++----- clash_lib/src/proxy/transport/grpc.rs | 31 +++++++++---------- 6 files changed, 31 insertions(+), 25 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index deba81489..80e157198 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3872,6 +3872,7 @@ checksum = "5419f34732d9eb6ee4c3578b7989078579b7f039cbbb9ca2c4da015749371e15" dependencies = [ "bytes", "futures-core", + "futures-io", "futures-sink", "pin-project-lite", "tokio", diff --git a/clash/tests/data/config/rules.yaml b/clash/tests/data/config/rules.yaml index 3dbd7b087..ff939d77d 100644 --- a/clash/tests/data/config/rules.yaml +++ b/clash/tests/data/config/rules.yaml @@ -244,7 +244,7 @@ rule-providers: behavior: domain rules: - - DOMAIN,google.com,trojan-grpc + - DOMAIN,google.com,grpc-vmess # - RULE-SET,file-provider,trojan - GEOIP,CN,relay - DOMAIN-SUFFIX,facebook.com,REJECT diff --git a/clash_lib/Cargo.toml b/clash_lib/Cargo.toml index c960cd2a5..767ae16f9 100644 --- a/clash_lib/Cargo.toml +++ b/clash_lib/Cargo.toml @@ -11,7 +11,7 @@ bench = ["criterion"] [dependencies] tokio = { version = "1", features = ["full"] } -tokio-util = { version = "0.7", features = ["net", "codec", "io"] } +tokio-util = { version = "0.7", features = ["net", "codec", "io", "compat"] } tokio-rustls = "0.24" thiserror = "1.0" async-trait = "0.1" diff --git a/clash_lib/src/app/outbound/manager.rs b/clash_lib/src/app/outbound/manager.rs index e2758e732..29a60a8e4 100644 --- a/clash_lib/src/app/outbound/manager.rs +++ b/clash_lib/src/app/outbound/manager.rs @@ -6,6 +6,7 @@ use std::path::PathBuf; use std::sync::Arc; use std::time::Duration; use tokio::sync::{Mutex, RwLock}; +use tracing::debug; use tracing::error; use tracing::info; @@ -65,6 +66,7 @@ impl OutboundManager { let mut selector_control = HashMap::new(); let proxy_manager = ProxyManager::new(dns_resolver.clone()); + debug!("initializing proxy providers"); Self::load_proxy_providers( cwd, proxy_providers, @@ -74,6 +76,7 @@ impl OutboundManager { ) .await?; + debug!("initializing handlers"); Self::load_handlers( outbounds, outbound_groups, @@ -628,6 +631,7 @@ impl OutboundManager { error!("failed to initialize proxy provider {}: {}", p.name(), err); } } + info!("initialized provider {}", p.name()); } Ok(()) diff --git a/clash_lib/src/app/remote_content_manager/healthcheck.rs b/clash_lib/src/app/remote_content_manager/healthcheck.rs index 11298faee..32756792d 100644 --- a/clash_lib/src/app/remote_content_manager/healthcheck.rs +++ b/clash_lib/src/app/remote_content_manager/healthcheck.rs @@ -49,13 +49,15 @@ impl HealthCheck { let lazy = self.lazy; let proxies = self.inner.read().await.proxies.clone(); - let url = self.url.clone(); - tokio::spawn(async move { - proxy_manager.check(&proxies, &url, None).await; - }); + { + let url = self.url.clone(); + let proxies = proxies.clone(); + tokio::spawn(async move { + proxy_manager.check(&proxies, &url, None).await; + }); + } let inner = self.inner.clone(); - let proxies = self.inner.read().await.proxies.clone(); let proxy_manager = self.proxy_manager.clone(); let url = self.url.clone(); let task_handle = tokio::spawn(async move { @@ -65,8 +67,8 @@ impl HealthCheck { _ = ticker.tick() => { pm_debug!("healthcheck ticking: {}, lazy: {}", url, lazy); let now = tokio::time::Instant::now(); - let r = inner.read().await; - if !lazy || now.duration_since(r.last_check).as_secs() >= interval { + let last_check = inner.read().await.last_check; + if !lazy || now.duration_since(last_check).as_secs() >= interval { proxy_manager.check(&proxies, &url, None).await; let mut w = inner.write().await; w.last_check = now; diff --git a/clash_lib/src/proxy/transport/grpc.rs b/clash_lib/src/proxy/transport/grpc.rs index 40e7c4ae7..70d8ed192 100644 --- a/clash_lib/src/proxy/transport/grpc.rs +++ b/clash_lib/src/proxy/transport/grpc.rs @@ -4,13 +4,12 @@ use crate::proxy::AnyStream; use bytes::Buf; use bytes::{BufMut, Bytes, BytesMut}; -use futures::{ready, StreamExt}; +use futures::ready; use h2::{RecvStream, SendStream}; use http::{Request, Uri, Version}; use prost::encoding::decode_varint; use prost::encoding::encode_varint; use tokio::sync::{mpsc, Mutex}; -use tokio_util::io::StreamReader; use tracing::warn; use tracing::{debug, trace}; @@ -81,12 +80,9 @@ impl GrpcStreamBuilder { match resp.await { Ok(resp) => { debug!("grpc resp: {:?}", resp); - let stream = resp - .into_body() - .map(|result| result.map_err(map_io_error)) - .into(); - let reader = StreamReader::new(stream); - recv_stream.lock().await.replace(reader); + let stream = resp.into_body(); + + recv_stream.lock().await.replace(stream); } Err(e) => { debug!("grpc resp err: {:?}", e); @@ -106,10 +102,10 @@ impl GrpcStreamBuilder { pub struct GrpcStream { init_ready: mpsc::Receiver<()>, - recv: Arc>>>, + recv: Arc>>, send: SendStream, buffer: BytesMut, - payload_len: u64, + payload_len: usize, } impl Debug for GrpcStream { @@ -125,7 +121,7 @@ impl Debug for GrpcStream { impl GrpcStream { pub fn new( init_ready: mpsc::Receiver<()>, - recv: Arc>>>, + recv: Arc>>, send: SendStream, ) -> Self { Self { @@ -175,11 +171,12 @@ impl AsyncRead for GrpcStream { ))); } - if self.payload_len > 0 { + if self.payload_len > 0 && self.buffer.len() > 0 { trace!("grpc poll_read data left payload_len: {}", self.payload_len); let to_read = std::cmp::min(buf.remaining(), self.payload_len as usize); + let to_read = std::cmp::min(to_read, self.buffer.len()); let data = self.buffer.split_to(to_read); - self.payload_len -= to_read as u64; + self.payload_len -= to_read; buf.put_slice(&data[..to_read]); assert_ne!(to_read, 0); return Poll::Ready(Ok(())); @@ -193,14 +190,16 @@ impl AsyncRead for GrpcStream { while self.payload_len > 0 || self.buffer.len() > 6 { if self.payload_len == 0 { self.buffer.advance(6); - self.payload_len = decode_varint(&mut self.buffer).map_err(map_io_error)?; + let payload_len = decode_varint(&mut self.buffer).map_err(map_io_error)?; + self.payload_len = payload_len as usize; } - let to_read = std::cmp::min(buf.remaining(), self.payload_len as usize); + let to_read = std::cmp::min(self.buffer.len(), self.payload_len as usize); + let to_read = std::cmp::min(buf.remaining(), to_read); if to_read == 0 { break; } buf.put_slice(self.buffer.split_to(to_read).freeze().as_ref()); - self.payload_len -= to_read as u64; + self.payload_len -= to_read; } trace!("releasing grpc flow control capacity: {}", b.len()); From a00f269c70580491adcba1560f4c4b6c1a3ffb47 Mon Sep 17 00:00:00 2001 From: dev0 Date: Thu, 7 Dec 2023 03:24:20 +1100 Subject: [PATCH 22/22] it worked --- clash/src/main.rs | 10 ++-- clash/tests/data/config/rules.yaml | 3 ++ clash_lib/src/proxy/transport/grpc.rs | 50 +++++++++++++++++-- clash_lib/src/proxy/transport/tls.rs | 27 ++++++++-- clash_lib/src/proxy/trojan/mod.rs | 2 +- clash_lib/src/proxy/vmess/mod.rs | 17 +++---- .../src/proxy/vmess/vmess_impl/stream.rs | 21 ++++---- 7 files changed, 94 insertions(+), 36 deletions(-) diff --git a/clash/src/main.rs b/clash/src/main.rs index b2d6fbd34..44c9f70ac 100644 --- a/clash/src/main.rs +++ b/clash/src/main.rs @@ -59,11 +59,15 @@ fn main() { } } } - clash::start(clash::Options { + match clash::start(clash::Options { config: clash::Config::File(file), cwd: cli.directory.map(|x| x.to_string_lossy().to_string()), rt: Some(TokioRuntime::MultiThread), log_file: None, - }) - .unwrap(); + }) { + Ok(_) => {} + Err(_) => { + exit(1); + } + } } diff --git a/clash/tests/data/config/rules.yaml b/clash/tests/data/config/rules.yaml index ff939d77d..567378372 100644 --- a/clash/tests/data/config/rules.yaml +++ b/clash/tests/data/config/rules.yaml @@ -165,6 +165,7 @@ proxies: network: h2 h2-opts: path: /ray + - name: grpc-vmess type: vmess server: 10.0.0.13 @@ -245,6 +246,8 @@ rule-providers: rules: - DOMAIN,google.com,grpc-vmess + - DOMAIN-KEYWORD,httpbin,trojan-grpc + - DOMAIN,ipinfo.io,trojan-grpc # - RULE-SET,file-provider,trojan - GEOIP,CN,relay - DOMAIN-SUFFIX,facebook.com,REJECT diff --git a/clash_lib/src/proxy/transport/grpc.rs b/clash_lib/src/proxy/transport/grpc.rs index 70d8ed192..e75fd1442 100644 --- a/clash_lib/src/proxy/transport/grpc.rs +++ b/clash_lib/src/proxy/transport/grpc.rs @@ -171,19 +171,46 @@ impl AsyncRead for GrpcStream { ))); } - if self.payload_len > 0 && self.buffer.len() > 0 { + if (self.payload_len > 0 && self.buffer.len() > 0) + || (self.payload_len == 0 && self.buffer.len() > 6) + { + if self.payload_len == 0 { + self.buffer.advance(6); + let payload_len = decode_varint(&mut self.buffer).map_err(map_io_error)?; + self.payload_len = payload_len as usize; + } + trace!("grpc poll_read data left payload_len: {}", self.payload_len); let to_read = std::cmp::min(buf.remaining(), self.payload_len as usize); let to_read = std::cmp::min(to_read, self.buffer.len()); + + if to_read == 0 { + assert!(buf.remaining() > 0); + trace!("no data left in buffer"); + return Poll::Pending; + } + let data = self.buffer.split_to(to_read); + trace!( + "consuming {} data, payload left: {}, buffer left: {}", + to_read, + self.payload_len - to_read, + self.buffer.len() + ); self.payload_len -= to_read; - buf.put_slice(&data[..to_read]); - assert_ne!(to_read, 0); + buf.put_slice(&data[..]); return Poll::Ready(Ok(())); - }; + } + + trace!( + "no more data left, polling recv stream, current capacity: {}", + recv.as_mut().unwrap().flow_control().available_capacity() + ); match ready!(Pin::new(&mut recv.as_mut().unwrap()).poll_data(cx)) { Some(Ok(b)) => { + trace!("got data from recv stream: {}", b.len()); + self.buffer.reserve(b.len()); self.buffer.extend_from_slice(&b[..]); @@ -198,6 +225,13 @@ impl AsyncRead for GrpcStream { if to_read == 0 { break; } + trace!( + "consuming {} data, payload left: {}, buffer left: {}", + to_read, + self.payload_len - to_read, + self.buffer.len() - to_read + ); + buf.put_slice(self.buffer.split_to(to_read).freeze().as_ref()); self.payload_len -= to_read; } @@ -217,7 +251,13 @@ impl AsyncRead for GrpcStream { } _ => { assert_eq!(self.payload_len, 0); - Poll::Ready(Ok(())) + if recv.as_mut().unwrap().is_end_stream() { + trace!("no more data left, recv stream closed"); + Poll::Ready(Ok(())) + } else { + trace!("no more data left, recv stream pending"); + Poll::Pending + } } } } diff --git a/clash_lib/src/proxy/transport/tls.rs b/clash_lib/src/proxy/transport/tls.rs index e266425c3..7d6abfa19 100644 --- a/clash_lib/src/proxy/transport/tls.rs +++ b/clash_lib/src/proxy/transport/tls.rs @@ -16,7 +16,11 @@ pub struct TLSOptions { pub alpn: Option>, } -pub async fn wrap_stream(stream: AnyStream, opt: TLSOptions) -> io::Result { +pub async fn wrap_stream( + stream: AnyStream, + opt: TLSOptions, + expected_alpn: Option<&str>, +) -> io::Result { let mut tls_config = ClientConfig::builder() .with_safe_defaults() .with_root_certificates(GLOBAL_ROOT_STORE.clone()) @@ -40,8 +44,21 @@ pub async fn wrap_stream(stream: AnyStream, opt: TLSOptions) -> io::Result { - debug!(target: "vmess", $($arg)*) - }; -} - pub enum VmessTransport { Ws(WsOption), H2(Http2Option), @@ -84,7 +77,7 @@ impl Handler { ); if let Some(tls_opt) = &self.opts.tls { - stream = transport::tls::wrap_stream(stream, tls_opt.to_owned()).await?; + stream = transport::tls::wrap_stream(stream, tls_opt.to_owned(), None).await?; } ws_builder.proxy_stream(stream).await? @@ -97,7 +90,7 @@ impl Handler { .expect("H2 conn must have tls opt") .clone(); tls_opt.alpn = Some(vec!["h2".to_string()]); - stream = transport::tls::wrap_stream(stream, tls_opt.to_owned()).await?; + stream = transport::tls::wrap_stream(stream, tls_opt.to_owned(), None).await?; let h2_builder = Http2Config { hosts: vec![self.opts.server.clone()], @@ -110,7 +103,9 @@ impl Handler { } Some(VmessTransport::Grpc(ref opt)) => { let tls_opt = self.opts.tls.as_ref().expect("gRPC conn must have tls opt"); - stream = transport::tls::wrap_stream(stream, tls_opt.to_owned()).await?; + stream = + transport::tls::wrap_stream(stream, tls_opt.to_owned(), Some("h2")).await?; + let grpc_builder = transport::GrpcStreamBuilder::new( self.opts.server.clone(), opt.service_name @@ -125,7 +120,7 @@ impl Handler { } None => { if let Some(tls_opt) = self.opts.tls.as_ref() { - stream = transport::tls::wrap_stream(stream, tls_opt.to_owned()).await?; + stream = transport::tls::wrap_stream(stream, tls_opt.to_owned(), None).await?; } stream } diff --git a/clash_lib/src/proxy/vmess/vmess_impl/stream.rs b/clash_lib/src/proxy/vmess/vmess_impl/stream.rs index 3ddd4dfde..3a1bd785e 100644 --- a/clash_lib/src/proxy/vmess/vmess_impl/stream.rs +++ b/clash_lib/src/proxy/vmess/vmess_impl/stream.rs @@ -15,7 +15,6 @@ use crate::{ }, proxy::vmess::vmess_impl::MAX_CHUNK_SIZE, session::SocksAddr, - vmess_debug, }; use super::{ @@ -94,7 +93,7 @@ impl ReadExt for VmessStream { ) -> Poll> { self.read_buf.reserve(size); unsafe { self.read_buf.set_len(size) } - vmess_debug!( + debug!( "poll read exact: {}, read_pos: {}, buf: {}", size, self.read_pos, @@ -309,12 +308,12 @@ where mbuf.put_slice(data.as_slice()); let out = mbuf.freeze(); - vmess_debug!("send non aead handshake request for user {}", id.uuid); + debug!("send non aead handshake request for user {}", id.uuid); stream.write_all(&out).await?; } else { let out = header::seal_vmess_aead_header(id.cmd_key, buf.freeze().to_vec(), now) .map_err(map_io_error)?; - vmess_debug!("send aead handshake request for user {}", id.uuid); + debug!("send aead handshake request for user {}", id.uuid); stream.write_all(&out).await?; } @@ -334,12 +333,12 @@ where cx: &mut std::task::Context<'_>, buf: &mut tokio::io::ReadBuf<'_>, ) -> std::task::Poll> { - vmess_debug!("poll read with aead"); + debug!("poll read with aead"); loop { match self.read_state { ReadState::AeadWaitingHeaderSize => { - vmess_debug!("recv handshake response header"); + debug!("recv handshake response header"); let this = &mut *self; let resp_body_key = this.resp_body_key.clone(); let resp_body_iv = this.resp_body_iv.clone(); @@ -366,7 +365,7 @@ where this.read_state = ReadState::StreamWaitingLength; } else { - vmess_debug!("recv handshake response header length"); + debug!("recv handshake response header length"); ready!(this.poll_read_exact(cx, 18))?; let aead_response_header_length_encryption_key = &kdf::vmess_kdf_1_one_shot( @@ -402,7 +401,7 @@ where } ReadState::AeadWaitingHeader(header_size) => { - vmess_debug!("recv handshake header body: {}", header_size); + debug!("recv handshake header body: {}", header_size); let this = &mut *self; ready!(this.poll_read_exact(cx, header_size + 16))?; @@ -452,7 +451,7 @@ where } ReadState::StreamWaitingLength => { - vmess_debug!("recv stream length"); + debug!("checking recv stream data length"); let this = &mut *self; ready!(this.poll_read_exact(cx, 2))?; let len = u16::from_be_bytes(this.read_buf.split().as_ref().try_into().unwrap()) @@ -469,7 +468,7 @@ where } ReadState::StreamWaitingData(size) => { - vmess_debug!("recv stream data: {}", size); + debug!("got recv stream data length: {}", size); let this = &mut *self; ready!(this.poll_read_exact(cx, size))?; @@ -484,7 +483,7 @@ where } ReadState::StreamFlushingData(size) => { - vmess_debug!("flush stream data: {}", size); + debug!("chunking stream data: {}", size); let to_read = std::cmp::min(buf.remaining(), size); let payload = self.read_buf.split_to(to_read); buf.put_slice(&payload);