Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
ibigbug committed Dec 2, 2023
1 parent e84fd5d commit ab332f0
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 52 deletions.
2 changes: 1 addition & 1 deletion clash/tests/data/config/rules.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ proxies:
tls: true
network: grpc
grpc-opts:
grpc-service-name: abc/Tun
grpc-service-name: abc

- name: vmess-altid
type: vmess
Expand Down
89 changes: 38 additions & 51 deletions clash_lib/src/proxy/transport/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ use prost::encoding::encode_varint;
use tracing::log;

use std::fmt::Debug;
use std::future::Future;
use std::io;
use std::io::{Error, ErrorKind};
use std::pin::Pin;
Expand All @@ -28,6 +27,7 @@ impl GrpcStreamBuilder {
pub fn new(host: String, path: http::uri::PathAndQuery) -> Self {
Self { host, path }
}

fn req(&self) -> io::Result<Request<()>> {
let uri: Uri = {
Uri::builder()
Expand Down Expand Up @@ -55,13 +55,13 @@ impl GrpcStreamBuilder {
log::error!("http2 got err:{:?}", e);
}
});
return Ok(Box::new(GrpcStream::new(resp, send_stream)));
let recv_stream = resp.await.map_err(map_io_error)?.into_body();
return Ok(Box::new(GrpcStream::new(recv_stream, send_stream)));
}
}

pub struct GrpcStream {
resp_fut: h2::client::ResponseFuture,
recv: Option<RecvStream>,
recv: RecvStream,
send: SendStream<Bytes>,
buffer: BytesMut,
payload_len: u64,
Expand All @@ -70,7 +70,6 @@ pub struct GrpcStream {
impl Debug for GrpcStream {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("GrpcStream")
.field("resp_fut", &self.resp_fut)
.field("recv", &self.recv)
.field("send", &self.send)
.field("buffer", &self.buffer)
Expand All @@ -80,10 +79,9 @@ impl Debug for GrpcStream {
}

impl GrpcStream {
pub fn new(resp_fut: h2::client::ResponseFuture, send: SendStream<Bytes>) -> Self {
pub fn new(recv: RecvStream, send: SendStream<Bytes>) -> Self {
Self {
resp_fut,
recv: None,
recv,
send,
buffer: BytesMut::with_capacity(1024 * 4),
payload_len: 0,
Expand Down Expand Up @@ -117,58 +115,47 @@ impl AsyncRead for GrpcStream {
cx: &mut Context<'_>,
dst: &mut tokio::io::ReadBuf<'_>,
) -> Poll<io::Result<()>> {
if self.recv.is_none() {
self.recv = Some(
ready!(Pin::new(&mut self.resp_fut).poll(cx))
.map_err(map_io_error)?
.into_body(),
);
log::debug!("receive grpc recv stream");
}
if !self.buffer.is_empty() {
let to_read = std::cmp::min(dst.remaining(), self.buffer.len());
let data = self.buffer.split_to(to_read);
self.payload_len -= to_read as u64;
dst.put_slice(&data[..to_read]);
return Poll::Ready(Ok(()));
};
Poll::Ready(
match ready!(Pin::new(&mut self.recv).as_pin_mut().unwrap().poll_data(cx)) {
Some(Ok(mut data)) => {
let before_parse_data_len = data.len();
while self.payload_len > 0 || data.len() > 6 {
if self.payload_len == 0 {
data.advance(6);
self.payload_len = decode_varint(&mut data).map_err(map_io_error)?;
}
let to_read = std::cmp::min(dst.remaining(), data.len());
let to_read = std::cmp::min(self.payload_len as usize, to_read);
if to_read == 0 {
self.buffer.extend_from_slice(&data[..]);
data.clear();
break;
}
dst.put_slice(&data[..to_read]);
self.payload_len -= to_read as u64;
data.advance(to_read);

Poll::Ready(match ready!(Pin::new(&mut self.recv).poll_data(cx)) {
Some(Ok(mut data)) => {
let before_parse_data_len = data.len();
while self.payload_len > 0 || data.len() > 6 {
if self.payload_len == 0 {
data.advance(6);
self.payload_len = decode_varint(&mut data).map_err(map_io_error)?;
}
let to_read = std::cmp::min(dst.remaining(), data.len());
let to_read = std::cmp::min(self.payload_len as usize, to_read);
if to_read == 0 {
self.buffer.extend_from_slice(&data[..]);
data.clear();
break;
}
// increase recv window
self.recv
.as_mut()
.unwrap()
.flow_control()
.release_capacity(before_parse_data_len - data.len())
.map_or_else(
|e| Err(Error::new(ErrorKind::ConnectionReset, e)),
|_| Ok(()),
)
dst.put_slice(&data[..to_read]);
self.payload_len -= to_read as u64;
data.advance(to_read);
}
// no more data frames
// maybe trailer
// or cancelled
_ => Ok(()),
},
)
// increase recv window
self.recv
.flow_control()
.release_capacity(before_parse_data_len - data.len())
.map_or_else(
|e| Err(Error::new(ErrorKind::ConnectionReset, e)),
|_| Ok(()),
)
}
// no more data frames
// maybe trailer
// or cancelled
_ => Ok(()),
})
}
}

Expand Down

0 comments on commit ab332f0

Please sign in to comment.