Skip to content

Commit

Permalink
some progress (#137)
Browse files Browse the repository at this point in the history
  • Loading branch information
ibigbug authored Oct 27, 2023
1 parent 8e8b6b9 commit 4a82dec
Show file tree
Hide file tree
Showing 6 changed files with 33 additions and 27 deletions.
7 changes: 5 additions & 2 deletions clash_lib/src/app/api/handlers/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use axum::{
Json, Router,
};

use http::StatusCode;
use http::{header, HeaderMap, StatusCode};
use serde::Deserialize;

use crate::{
Expand Down Expand Up @@ -133,15 +133,18 @@ async fn get_proxy_delay(
let outbound_manager = state.outbound_manager.clone();
let timeout = Duration::from_millis(q.timeout.into());
let n = proxy.name().to_owned();
let mut headers = HeaderMap::new();
headers.insert(header::CONNECTION, "close".parse().unwrap());
match outbound_manager.url_test(proxy, &q.url, timeout).await {
Ok((delay, mean_delay)) => {
let mut r = HashMap::new();
r.insert("delay".to_owned(), delay);
r.insert("meanDelay".to_owned(), mean_delay);
axum::response::Json(delay).into_response()
(headers, axum::response::Json(delay)).into_response()
}
Err(err) => (
StatusCode::BAD_REQUEST,
headers,
format!("get delay for {} failed with error: {}", n, err),
)
.into_response(),
Expand Down
4 changes: 2 additions & 2 deletions clash_lib/src/app/logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,10 +148,10 @@ pub fn setup_logging(
.pretty()
.with_file(true)
.with_line_number(true)
.with_writer(std::io::stdout)
.with_writer(move || -> Box<dyn std::io::Write> {
Box::new(W(appender.clone()))
}),
})
.with_writer(std::io::stdout),
)
.with(ios_os_log);

Expand Down
1 change: 0 additions & 1 deletion clash_lib/src/app/outbound/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,6 @@ impl OutboundManager {
timeout: Duration,
) -> std::io::Result<(u16, u16)> {
let proxy_manager = self.proxy_manager.clone();

proxy_manager.url_test(proxy, url, Some(timeout)).await
}

Expand Down
5 changes: 4 additions & 1 deletion clash_lib/src/app/profile/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,10 @@ impl ThreadSafeCacheFile {
let store = store_clone;
loop {
tokio::time::sleep(tokio::time::Duration::from_secs(10)).await;
let db = store.read().await.db.clone();
let r = store.read().await;
let db = r.db.clone();
drop(r);

let s = match serde_yaml::to_string(&db) {
Ok(s) => s,
Err(e) => {
Expand Down
7 changes: 4 additions & 3 deletions clash_lib/src/app/remote_content_manager/healthcheck.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,11 @@ impl HealthCheck {
_ = ticker.tick() => {
pm_debug!("healthcheck ticking: {}, lazy: {}", url, lazy);
let now = tokio::time::Instant::now();
let mut inner = inner.write().await;
if !lazy || now.duration_since(inner.last_check).as_secs() >= interval {
let r = inner.read().await;
if !lazy || now.duration_since(r.last_check).as_secs() >= interval {
proxy_manager.check(&proxies, &url, None).await;
inner.last_check = now;
let mut w = inner.write().await;
w.last_check = now;
}
},
}
Expand Down
36 changes: 18 additions & 18 deletions clash_lib/src/app/remote_content_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use boring::ssl::{SslConnector, SslMethod};

use chrono::{DateTime, Utc};

use futures::{stream::FuturesUnordered, StreamExt};
use http::{Request, Version};
use hyper_boring::HttpsConnector;
use serde::Serialize;
Expand Down Expand Up @@ -85,24 +86,22 @@ impl ProxyManager {
url: &str,
timeout: Option<Duration>,
) {
for proxies in proxies.chunks(10) {
let mut futs = vec![];

for proxy in proxies {
let proxy = proxy.clone();
let url = url.to_owned();
let timeout = timeout.clone();
let manager = self.clone();
futs.push(tokio::spawn(async move {
manager
.url_test(proxy, url.as_str(), timeout)
.await
.map_err(|e| debug!("healthcheck failed: {}", e))
}));
}

futures::future::join_all(futs).await;
let mut futs = vec![];
for proxy in proxies {
let proxy = proxy.clone();
let url = url.to_owned();
let timeout = timeout.clone();
let manager = self.clone();
futs.push(tokio::spawn(async move {
manager
.url_test(proxy, url.as_str(), timeout)
.await
.map_err(|e| debug!("healthcheck failed: {}", e))
}));
}

let futs: FuturesUnordered<_> = futs.into_iter().collect();
let _: Vec<_> = futs.collect().await;
}

pub async fn alive(&self, name: &str) -> bool {
Expand Down Expand Up @@ -157,7 +156,7 @@ impl ProxyManager {
);
let name = proxy.name().to_owned();
let name_clone = name.clone();
let default_timeout = Duration::from_secs(30);
let default_timeout = Duration::from_secs(5);

let dns_resolver = self.dns_resolver.clone();
let tester = async move {
Expand All @@ -174,6 +173,7 @@ impl ProxyManager {
.or_insert(HttpsConnector::with_connector(connector, ssl).map_err(map_io_error)?);

let connector = connector.clone();
drop(g);

let client = hyper::Client::builder().build::<_, hyper::Body>(connector);

Expand Down

0 comments on commit 4a82dec

Please sign in to comment.