Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
ibigbug committed Dec 2, 2023
1 parent 6ea6f8b commit 19d2810
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 27 deletions.
1 change: 0 additions & 1 deletion clash/tests/data/config/rules.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ dns:
- 114.114.114.114 # default value
- 1.1.1.1 # default value
- tls://1.1.1.1:853 # DNS over TLS
- https://1.1.1.1/dns-query # DNS over HTTPS
# - dhcp://en0 # dns from dhcp

allow-lan: true
Expand Down
17 changes: 13 additions & 4 deletions clash_lib/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,17 @@ use common::http::new_http_client;
use common::mmdb;
use config::def::LogLevel;
use proxy::tun::get_tun_runner;

use state::InitCell;
use std::io;
use std::path::PathBuf;
use tokio::task::JoinHandle;
use tracing::error;
use tracing::info;

use std::sync::Arc;
use thiserror::Error;
use tokio::sync::{broadcast, mpsc, Mutex};
use tokio::task::JoinHandle;
use tracing::debug;
use tracing::error;
use tracing::info;

mod app;
mod common;
Expand Down Expand Up @@ -161,9 +162,12 @@ async fn start_async(opts: Options) -> Result<(), Error> {
let mut tasks = Vec::<Runner>::new();
let mut runners = Vec::new();

debug!("initializing dns resolver");
let system_resolver =
Arc::new(SystemResolver::new().map_err(|x| Error::DNSError(x.to_string()))?);
let client = new_http_client(system_resolver).map_err(|x| Error::DNSError(x.to_string()))?;

debug!("initializing mmdb");
let mmdb = Arc::new(
mmdb::MMDB::new(
cwd.join(&config.general.mmdb),
Expand All @@ -173,13 +177,15 @@ async fn start_async(opts: Options) -> Result<(), Error> {
.await?,
);

debug!("initializing cache store");
let cache_store = profile::ThreadSafeCacheFile::new(
cwd.join("cache.db").as_path().to_str().unwrap(),
config.profile.store_selected,
);

let dns_resolver = dns::Resolver::new(&config.dns, cache_store.clone(), mmdb.clone()).await;

debug!("initializing outbound manager");
let outbound_manager = Arc::new(
OutboundManager::new(
config
Expand Down Expand Up @@ -207,6 +213,7 @@ async fn start_async(opts: Options) -> Result<(), Error> {
.await?,
);

debug!("initializing router");
let router = Arc::new(
Router::new(
config.rules,
Expand All @@ -230,6 +237,7 @@ async fn start_async(opts: Options) -> Result<(), Error> {

let authenticator = Arc::new(auth::PlainAuthenticator::new(config.users));

debug!("initializing inbound manager");
let inbound_manager = Arc::new(Mutex::new(InboundManager::new(
config.general.inbound,
dispatcher.clone(),
Expand All @@ -244,6 +252,7 @@ async fn start_async(opts: Options) -> Result<(), Error> {
runners.push(tun_runner);
}

debug!("initializing dns listener");
let dns_listener_handle = dns::get_dns_listener(config.dns, dns_resolver.clone())
.await
.map(|l| tokio::spawn(l));
Expand Down
41 changes: 19 additions & 22 deletions clash_lib/src/proxy/transport/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,9 @@ impl GrpcStreamBuilder {
}

pub async fn proxy_stream(&self, stream: AnyStream) -> io::Result<AnyStream> {
let (mut client, h2) = h2::client::handshake(stream).await.map_err(map_io_error)?;
let (client, h2) = h2::client::handshake(stream).await.map_err(map_io_error)?;
let mut client = client.ready().await.map_err(map_io_error)?;

let req = self.req()?;
let (resp, send_stream) = client.send_request(req, false).map_err(map_io_error)?;
tokio::spawn(async move {
Expand Down Expand Up @@ -92,13 +94,6 @@ impl GrpcStream {
}
}

fn reserve_send_capacity(&mut self, data: &[u8]) {
let mut buf = [0u8; 10];
let mut buf = &mut buf[..];
encode_varint(data.len() as u64, &mut buf);
self.send.reserve_capacity(6 + 10 - buf.len() + data.len());
}

fn encode_buf(&self, data: &[u8]) -> Bytes {
let mut buf = BytesMut::with_capacity(16 + data.len());
let grpc_header = [0u8; 5];
Expand Down Expand Up @@ -128,8 +123,8 @@ impl AsyncRead for GrpcStream {
log::debug!("receive grpc recv stream");
}

if !self.buffer.is_empty() {
let to_read = std::cmp::min(buf.remaining(), self.buffer.len());
if self.payload_len > 0 {
let to_read = std::cmp::min(buf.remaining(), self.payload_len as usize);
let data = self.buffer.split_to(to_read);
self.payload_len -= to_read as u64;
buf.put_slice(&data[..to_read]);
Expand All @@ -138,14 +133,18 @@ impl AsyncRead for GrpcStream {

Poll::Ready(
match ready!(Pin::new(&mut self.recv).as_pin_mut().unwrap().poll_data(cx)) {
Some(Ok(mut data)) => {
let before_parse_data_len = data.len();
Some(Ok(b)) => {
let mut data = BytesMut::with_capacity(self.buffer.len() + b.len());
data.extend_from_slice(&self.buffer[..]);
data.extend_from_slice(&b[..]);
self.buffer.clear();

while self.payload_len > 0 || data.len() > 6 {
if self.payload_len == 0 {
data.advance(6);
self.payload_len = decode_varint(&mut data).map_err(map_io_error)?;
}
let to_read = std::cmp::min(buf.remaining(), data.len());
let to_read = std::cmp::min(buf.remaining(), b.len());
let to_read = std::cmp::min(self.payload_len as usize, to_read);
if to_read == 0 {
self.buffer.extend_from_slice(&data[..]);
Expand All @@ -156,12 +155,12 @@ impl AsyncRead for GrpcStream {
self.payload_len -= to_read as u64;
data.advance(to_read);
}
// increase recv window

self.recv
.as_mut()
.unwrap()
.flow_control()
.release_capacity(before_parse_data_len - data.len())
.release_capacity(b.len())
.map_or_else(
|e| {
debug!("grpc flow control error: {}", e);
Expand All @@ -170,9 +169,6 @@ impl AsyncRead for GrpcStream {
|_| Ok(()),
)
}
// no more data frames
// maybe trailer
// or cancelled
_ => Ok(()),
},
)
Expand All @@ -186,17 +182,18 @@ impl AsyncWrite for GrpcStream {
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
self.reserve_send_capacity(buf);
let encoded_buf = self.encode_buf(buf);
self.send.reserve_capacity(encoded_buf.len());

Poll::Ready(match ready!(self.send.poll_capacity(cx)) {
Some(Ok(to_write)) => {
let encoded_buf = self.encode_buf(buf);
Some(Ok(cap)) => {
let overhead_len = encoded_buf.len() - buf.len();
self.send.send_data(encoded_buf, false).map_or_else(
|e| {
debug!("grpc write error: {}", e);
Err(Error::new(ErrorKind::BrokenPipe, e))
},
|_| Ok(to_write),
|_| Ok(cap - overhead_len),
)
}
Some(Err(e)) => {
Expand Down

0 comments on commit 19d2810

Please sign in to comment.