Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

let runner report errors #150

Merged
merged 1 commit into from
Nov 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions clash_lib/src/app/api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use http::Method;
use tokio::sync::{broadcast::Sender, Mutex};
use tower_http::cors::{Any, CorsLayer};
use tower_http::services::ServeDir;
use tracing::info;
use tracing::{error, info};

use crate::{config::internal::config::Controller, GlobalState, Runner};

Expand Down Expand Up @@ -100,7 +100,10 @@ pub fn get_api_runner(
axum::Server::bind(&addr)
.serve(app.into_make_service_with_connect_info::<SocketAddr>())
.await
.unwrap();
.map_err(|x| {
error!("API server error: {}", x);
crate::Error::Operation(format!("API server error: {}", x))
})
};
Some(Box::pin(runner))
} else {
Expand Down
10 changes: 4 additions & 6 deletions clash_lib/src/app/dns/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,11 +204,9 @@ pub async fn get_dns_listener(cfg: Config, resolver: ThreadSafeDNSResolver) -> O
let mut l = DnsListener { server: s };

Some(Box::pin(async move {
match l.server.block_until_done().await {
Ok(_) => {}
Err(e) => {
warn!("dns server error: {}", e);
}
}
l.server.block_until_done().await.map_err(|x| {
warn!("dns server error: {}", x);
crate::Error::DNSError(format!("dns server error: {}", x))
})
}))
}
2 changes: 1 addition & 1 deletion clash_lib/src/app/inbound/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ impl InboundManager {
}

Ok(Box::pin(async move {
futures::future::join_all(runners).await;
futures::future::select_all(runners).await.0
}))
}

Expand Down
10 changes: 6 additions & 4 deletions clash_lib/src/app/inbound/network_listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,9 +120,10 @@ impl NetworkInboundListener {
let tcp_listener = listener.clone();
runners.push(
async move {
if let Err(e) = tcp_listener.listen_tcp().await {
tcp_listener.listen_tcp().await.map_err(|e| {
warn!("handler tcp listen failed: {}", e);
}
e.into()
})
}
.boxed(),
);
Expand All @@ -133,9 +134,10 @@ impl NetworkInboundListener {
let udp_listener = listener.clone();
runners.push(
async move {
if let Err(e) = udp_listener.listen_udp().await {
udp_listener.listen_udp().await.map_err(|e| {
warn!("handler udp listen failed: {}", e);
}
e.into()
})
}
.boxed(),
);
Expand Down
16 changes: 10 additions & 6 deletions clash_lib/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ pub enum Error {
Operation(String),
}

pub type Runner = futures::future::BoxFuture<'static, ()>;
pub type Runner = futures::future::BoxFuture<'static, Result<(), Error>>;

pub struct Options {
pub config: Config,
Expand All @@ -80,9 +80,9 @@ pub enum Config {

pub struct GlobalState {
log_level: LogLevel,
inbound_listener_handle: Option<JoinHandle<()>>,
inbound_listener_handle: Option<JoinHandle<Result<(), Error>>>,
#[allow(dead_code)]
dns_listener_handle: Option<JoinHandle<()>>,
dns_listener_handle: Option<JoinHandle<Result<(), Error>>>,
}

pub struct RuntimeController {
Expand Down Expand Up @@ -268,18 +268,22 @@ async fn start_async(opts: Options) -> Result<(), Error> {
runners.push(Box::pin(async move {
info!("receive shutdown signal");
shutdown_rx.recv().await;
Ok(())
}));

tasks.push(Box::pin(async move {
futures::future::join_all(runners).await;
futures::future::select_all(runners).await.0
}));

tasks.push(Box::pin(async move {
let _ = tokio::signal::ctrl_c().await;
Ok(())
}));

futures::future::select_all(tasks).await;
Ok(())
futures::future::select_all(tasks).await.0.map_err(|x| {
error!("runtime error: {}, shutting down", x);
x
})
}

#[cfg(test)]
Expand Down
14 changes: 11 additions & 3 deletions clash_lib/src/proxy/tun/inbound.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,8 @@ pub fn get_runner(
}
}
}

Err(Error::Operation("tun stopped unexpectedly 0".to_string()))
}));

// tun -> stack -> dispatcher
Expand All @@ -206,6 +208,8 @@ pub fn get_runner(
}
}
}

Err(Error::Operation("tun stopped unexpectedly 1".to_string()))
}));

let dsp = dispatcher.clone();
Expand All @@ -218,14 +222,18 @@ pub fn get_runner(
dsp.clone(),
));
}

Err(Error::Operation("tun stopped unexpectedly 2".to_string()))
}));

futs.push(Box::pin(async move {
handle_inbound_datagram(udp_socket, dispatcher, resolver).await;
Err(Error::Operation("tun stopped unexpectedly 3".to_string()))
}));

futures::future::join_all(futs).await;

warn!("tun at {} stopped", tun_name);
futures::future::select_all(futs).await.0.map_err(|x| {
error!("tun error: {}. stopped", x);
x
})
})))
}
Loading