Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Perf #102

Merged
merged 2 commits into from
Sep 29, 2023
Merged

Perf #102

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions clash/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
extern crate clash_lib as clash;

use clap::Parser;
use clash::TokioRuntime;
use std::path::PathBuf;

#[derive(Parser)]
Expand All @@ -27,6 +28,7 @@ fn main() {
clash::start(clash::Options {
config: clash::Config::File("".to_string(), cli.config.to_string_lossy().to_string()),
cwd: cli.directory.map(|x| x.to_string_lossy().to_string()),
rt: Some(TokioRuntime::MultiThread),
})
.unwrap();
}
3 changes: 2 additions & 1 deletion clash_lib/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ hyper-boring = { git = "https://github.com/Watfaq/boring.git", rev = "24c006f" }
tokio-boring = { git = "https://github.com/Watfaq/boring.git", rev = "24c006f" }
ip_network_table-deps-treebitmap = "0.5.0"
once_cell = "1.18.0"
arc-swap = "1.6.0"

# opentelemetry
opentelemetry = "0.20"
Expand All @@ -60,7 +61,7 @@ tower-http = { version = "0.4.0", features = ["fs", "trace", "cors"] }
chrono = { version = "0.4.26", features = ["serde"] }

tun = { git = "https://github.com/Watfaq/rust-tun.git", rev = "28936b6", features = ["async"] }
netstack-lwip = { git = "https://github.com/Watfaq/netstack-lwip.git", rev = "8c8c0b0" }
netstack-lwip = { git = "https://github.com/Watfaq/netstack-lwip.git", rev = "5ad376f" }
boringtun = { version = "0.6.0", features = ["device"] }

serde = { version = "1.0", features=["derive"] }
Expand Down
8 changes: 4 additions & 4 deletions clash_lib/src/app/api/handlers/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ pub fn routes(outbound_manager: ThreadSafeOutboundManager) -> Router<Arc<AppStat
}

async fn get_providers(State(state): State<ProviderState>) -> impl IntoResponse {
let outbound_manager = state.outbound_manager.read().await;
let outbound_manager = state.outbound_manager.clone();
let mut res = HashMap::new();

let mut providers = HashMap::new();
Expand All @@ -76,7 +76,7 @@ async fn find_proxy_provider_by_name<B>(
mut req: Request<B>,
next: Next<B>,
) -> Response {
let outbound_manager = state.outbound_manager.read().await;
let outbound_manager = state.outbound_manager.clone();
if let Some(provider) = outbound_manager.get_proxy_provider(&name) {
req.extensions_mut().insert(provider);
next.run(req).await
Expand Down Expand Up @@ -154,7 +154,7 @@ async fn get_proxy(
Extension(proxy): Extension<AnyOutboundHandler>,
State(state): State<ProviderState>,
) -> impl IntoResponse {
let outbound_manager = state.outbound_manager.read().await;
let outbound_manager = state.outbound_manager.clone();
axum::response::Json(outbound_manager.get_proxy(&proxy).await)
}

Expand All @@ -168,7 +168,7 @@ async fn get_proxy_delay(
Extension(proxy): Extension<AnyOutboundHandler>,
Query(q): Query<DelayRequest>,
) -> impl IntoResponse {
let outbound_manager = state.outbound_manager.read().await;
let outbound_manager = state.outbound_manager.clone();
let timeout = Duration::from_millis(q.timeout.into());
let n = proxy.name().to_owned();
match outbound_manager.url_test(proxy, &q.url, timeout).await {
Expand Down
10 changes: 5 additions & 5 deletions clash_lib/src/app/api/handlers/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ pub fn routes(
}

async fn get_proxies(State(state): State<ProxyState>) -> impl IntoResponse {
let outbound_manager = state.outbound_manager.read().await;
let outbound_manager = state.outbound_manager.clone();
let mut res = HashMap::new();
let proxies = outbound_manager.get_proxies().await;
res.insert("proxies".to_owned(), proxies);
Expand All @@ -63,7 +63,7 @@ async fn find_proxy_by_name<B>(
mut req: Request<B>,
next: Next<B>,
) -> Response {
let outbound_manager = state.outbound_manager.read().await;
let outbound_manager = state.outbound_manager.clone();
if let Some(proxy) = outbound_manager.get_outbound(&name) {
req.extensions_mut().insert(proxy);
next.run(req).await
Expand All @@ -76,7 +76,7 @@ async fn get_proxy(
Extension(proxy): Extension<AnyOutboundHandler>,
State(state): State<ProxyState>,
) -> impl IntoResponse {
let outbound_manager = state.outbound_manager.read().await;
let outbound_manager = state.outbound_manager.clone();
axum::response::Json(outbound_manager.get_proxy(&proxy).await)
}

Expand All @@ -91,7 +91,7 @@ async fn update_proxy(
Extension(proxy): Extension<AnyOutboundHandler>,
Json(payload): Json<UpdateProxyRequest>,
) -> impl IntoResponse {
let outbound_manager = state.outbound_manager.read().await;
let outbound_manager = state.outbound_manager.clone();
if let Some(ctrl) = outbound_manager.get_selector_control(proxy.name()) {
match ctrl.lock().await.select(&payload.name).await {
Ok(_) => {
Expand Down Expand Up @@ -130,7 +130,7 @@ async fn get_proxy_delay(
Extension(proxy): Extension<AnyOutboundHandler>,
Query(q): Query<DelayRequest>,
) -> impl IntoResponse {
let outbound_manager = state.outbound_manager.read().await;
let outbound_manager = state.outbound_manager.clone();
let timeout = Duration::from_millis(q.timeout.into());
let n = proxy.name().to_owned();
match outbound_manager.url_test(proxy, &q.url, timeout).await {
Expand Down
37 changes: 18 additions & 19 deletions clash_lib/src/app/dispatcher/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use crate::config::internal::proxy::PROXY_GLOBAL;
use crate::proxy::datagram::UdpPacket;
use crate::proxy::AnyInboundDatagram;
use crate::session::Session;
use arc_swap::ArcSwap;
use futures::SinkExt;
use futures::StreamExt;
use std::collections::HashMap;
Expand All @@ -17,7 +18,6 @@ use std::sync::Arc;
use std::time::Duration;
use std::time::Instant;
use tokio::io::{copy_bidirectional, AsyncRead, AsyncWrite, AsyncWriteExt};
use tokio::sync::Mutex;
use tokio::sync::RwLock;
use tokio::task::JoinHandle;
use tracing::info_span;
Expand All @@ -34,7 +34,7 @@ pub struct Dispatcher {
outbound_manager: ThreadSafeOutboundManager,
router: ThreadSafeRouter,
resolver: ThreadSafeDNSResolver,
mode: Arc<RwLock<RunMode>>,
mode: ArcSwap<RunMode>,

manager: Arc<Manager>,
}
Expand All @@ -58,20 +58,19 @@ impl Dispatcher {
outbound_manager,
router,
resolver,
mode: Arc::new(RwLock::new(mode)),
mode: Arc::new(mode).into(),
manager: statistics_manager,
}
}

pub async fn set_mode(&self, mode: RunMode) {
info!("run mode switched to {}", mode);
let mut m = self.mode.write().await;
*m = mode;

self.mode.store(Arc::new(mode));
}

pub async fn get_mode(&self) -> RunMode {
let mode = self.mode.read().await;
mode.clone()
**self.mode.load()
}

#[instrument(skip(lhs))]
Expand Down Expand Up @@ -107,15 +106,15 @@ impl Dispatcher {
sess
};

let mode = self.mode.read().await;
let mode = **self.mode.load();
debug!("dispatching {} with mode {}", sess, mode);
let (outbound_name, rule) = match *mode {
let (outbound_name, rule) = match mode {
RunMode::Global => (PROXY_GLOBAL, None),
RunMode::Rule => self.router.match_route(&sess).await,
RunMode::Direct => (PROXY_DIRECT, None),
};

let mgr = self.outbound_manager.read().await;
let mgr = self.outbound_manager.clone();
let handler = mgr.get_outbound(outbound_name).unwrap_or_else(|| {
debug!("unknown rule: {}, fallback to direct", outbound_name);
mgr.get_outbound(PROXY_DIRECT).unwrap()
Expand Down Expand Up @@ -186,7 +185,7 @@ impl Dispatcher {
let router = self.router.clone();
let outbound_manager = self.outbound_manager.clone();
let resolver = self.resolver.clone();
let mode = self.mode.clone();
let mode = **self.mode.load();
let manager = self.manager.clone();

let (mut local_w, mut local_r) = udp_inbound.split();
Expand Down Expand Up @@ -236,10 +235,10 @@ impl Dispatcher {
let mut packet = packet;
packet.dst_addr = sess.destination.clone();

let mode = mode.read().await;
let mode = mode.clone();
trace!("dispatching {} with mode {}", sess, mode);

let (outbound_name, rule) = match *mode {
let (outbound_name, rule) = match mode {
RunMode::Global => (PROXY_GLOBAL, None),
RunMode::Rule => router.match_route(&sess).await,
RunMode::Direct => (PROXY_DIRECT, None),
Expand All @@ -249,7 +248,7 @@ impl Dispatcher {

let remote_receiver_w = remote_receiver_w.clone();

let mgr = outbound_manager.read().await;
let mgr = outbound_manager.clone();
let handler = mgr.get_outbound(&outbound_name).unwrap_or_else(|| {
debug!("unknown rule: {}, fallback to direct", outbound_name);
mgr.get_outbound(PROXY_DIRECT).unwrap()
Expand Down Expand Up @@ -381,7 +380,7 @@ impl Dispatcher {
type OutboundPacketSender = tokio::sync::mpsc::Sender<UdpPacket>; // outbound packet sender

struct TimeoutUdpSessionManager {
map: Arc<Mutex<OutboundHandleMap>>,
map: Arc<RwLock<OutboundHandleMap>>,

cleaner: Option<JoinHandle<()>>,
}
Expand All @@ -395,7 +394,7 @@ impl Drop for TimeoutUdpSessionManager {

impl TimeoutUdpSessionManager {
fn new() -> Self {
let map = Arc::new(Mutex::new(OutboundHandleMap::new()));
let map = Arc::new(RwLock::new(OutboundHandleMap::new()));
let timeout = Duration::from_secs(10);

let map_cloned = map.clone();
Expand All @@ -405,7 +404,7 @@ impl TimeoutUdpSessionManager {
tokio::time::sleep(Duration::from_secs(10)).await;

trace!("timeout udp session cleaner scanning");
let mut g = map_cloned.lock().await;
let mut g = map_cloned.write().await;
let mut alived = 0;
let mut expired = 0;
g.0.retain(|k, x| {
Expand Down Expand Up @@ -445,7 +444,7 @@ impl TimeoutUdpSessionManager {
send_handle: JoinHandle<()>,
sender: OutboundPacketSender,
) {
let mut map = self.map.lock().await;
let mut map = self.map.write().await;
map.insert(outbound_name, src_addr, recv_handle, send_handle, sender);
}

Expand All @@ -454,7 +453,7 @@ impl TimeoutUdpSessionManager {
outbound_name: &str,
src_addr: SocketAddr,
) -> Option<OutboundPacketSender> {
let mut map = self.map.lock().await;
let mut map = self.map.write().await;
map.get_outbound_sender_mut(outbound_name, src_addr)
}
}
Expand Down
2 changes: 1 addition & 1 deletion clash_lib/src/app/outbound/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ pub struct OutboundManager {

static DEFAULT_LATENCY_TEST_URL: &str = "http://www.gstatic.com/generate_204";

pub type ThreadSafeOutboundManager = Arc<RwLock<OutboundManager>>;
pub type ThreadSafeOutboundManager = Arc<OutboundManager>;

impl OutboundManager {
pub async fn new(
Expand Down
11 changes: 10 additions & 1 deletion clash_lib/src/app/remote_content_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,13 +66,16 @@ struct ProxyState {
pub struct ProxyManager {
proxy_state: Arc<RwLock<HashMap<String, ProxyState>>>,
dns_resolver: ThreadSafeDNSResolver,

connector_map: Arc<RwLock<HashMap<String, HttpsConnector<LocalConnector>>>>,
}

impl ProxyManager {
pub fn new(dns_resolver: ThreadSafeDNSResolver) -> Self {
Self {
dns_resolver,
proxy_state: Arc::new(RwLock::new(HashMap::new())),
connector_map: Arc::new(RwLock::new(HashMap::new())),
}
}

Expand Down Expand Up @@ -165,7 +168,13 @@ impl ProxyManager {
ssl.set_alpn_protos(b"\x02h2\x08http/1.1")
.map_err(map_io_error)?;

let connector = HttpsConnector::with_connector(connector, ssl).map_err(map_io_error)?;
let mut g = self.connector_map.write().await;
let connector = g
.entry(name.clone())
.or_insert(HttpsConnector::with_connector(connector, ssl).map_err(map_io_error)?);

let connector = connector.clone();

let client = hyper::Client::builder().build::<_, hyper::Body>(connector);

let req = Request::get(url)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use std::{collections::HashMap, sync::Arc};

use async_trait::async_trait;
use erased_serde::Serialize;
use tokio::sync::Mutex;
use tracing::debug;

use crate::{
Expand All @@ -16,14 +15,10 @@ use crate::{

use super::proxy_provider::ProxyProvider;

struct Inner {
hc: Arc<HealthCheck>,
}

pub struct PlainProvider {
name: String,
proxies: Vec<AnyOutboundHandler>,
inner: Arc<Mutex<Inner>>,
hc: Arc<HealthCheck>,
}

impl PlainProvider {
Expand All @@ -46,11 +41,7 @@ impl PlainProvider {
});
}

Ok(Self {
name,
proxies,
inner: Arc::new(Mutex::new(Inner { hc })),
})
Ok(Self { name, proxies, hc })
}
}

Expand Down Expand Up @@ -93,10 +84,10 @@ impl ProxyProvider for PlainProvider {
}

async fn touch(&self) {
self.inner.lock().await.hc.touch().await;
self.hc.touch().await;
}

async fn healthcheck(&self) {
self.inner.lock().await.hc.check().await;
self.hc.check().await;
}
}
Loading
Loading