From 001e068167591eeee556717307a21ce8e766f245 Mon Sep 17 00:00:00 2001 From: Yuwei Ba Date: Sat, 4 Nov 2023 19:09:09 +1100 Subject: [PATCH] let runner report errors (#150) --- clash_lib/src/app/api/mod.rs | 7 +++++-- clash_lib/src/app/dns/server/mod.rs | 10 ++++------ clash_lib/src/app/inbound/manager.rs | 2 +- clash_lib/src/app/inbound/network_listener.rs | 10 ++++++---- clash_lib/src/lib.rs | 16 ++++++++++------ clash_lib/src/proxy/tun/inbound.rs | 14 +++++++++++--- 6 files changed, 37 insertions(+), 22 deletions(-) diff --git a/clash_lib/src/app/api/mod.rs b/clash_lib/src/app/api/mod.rs index db396c852..a9e2d4e5c 100644 --- a/clash_lib/src/app/api/mod.rs +++ b/clash_lib/src/app/api/mod.rs @@ -8,7 +8,7 @@ use http::Method; use tokio::sync::{broadcast::Sender, Mutex}; use tower_http::cors::{Any, CorsLayer}; use tower_http::services::ServeDir; -use tracing::info; +use tracing::{error, info}; use crate::{config::internal::config::Controller, GlobalState, Runner}; @@ -100,7 +100,10 @@ pub fn get_api_runner( axum::Server::bind(&addr) .serve(app.into_make_service_with_connect_info::()) .await - .unwrap(); + .map_err(|x| { + error!("API server error: {}", x); + crate::Error::Operation(format!("API server error: {}", x)) + }) }; Some(Box::pin(runner)) } else { diff --git a/clash_lib/src/app/dns/server/mod.rs b/clash_lib/src/app/dns/server/mod.rs index cfaae0331..cae0af371 100644 --- a/clash_lib/src/app/dns/server/mod.rs +++ b/clash_lib/src/app/dns/server/mod.rs @@ -204,11 +204,9 @@ pub async fn get_dns_listener(cfg: Config, resolver: ThreadSafeDNSResolver) -> O let mut l = DnsListener { server: s }; Some(Box::pin(async move { - match l.server.block_until_done().await { - Ok(_) => {} - Err(e) => { - warn!("dns server error: {}", e); - } - } + l.server.block_until_done().await.map_err(|x| { + warn!("dns server error: {}", x); + crate::Error::DNSError(format!("dns server error: {}", x)) + }) })) } diff --git a/clash_lib/src/app/inbound/manager.rs b/clash_lib/src/app/inbound/manager.rs index 9e0084af3..531d3fbb9 100644 --- a/clash_lib/src/app/inbound/manager.rs +++ b/clash_lib/src/app/inbound/manager.rs @@ -65,7 +65,7 @@ impl InboundManager { } Ok(Box::pin(async move { - futures::future::join_all(runners).await; + futures::future::select_all(runners).await.0 })) } diff --git a/clash_lib/src/app/inbound/network_listener.rs b/clash_lib/src/app/inbound/network_listener.rs index d3e5b947b..043554ff3 100644 --- a/clash_lib/src/app/inbound/network_listener.rs +++ b/clash_lib/src/app/inbound/network_listener.rs @@ -120,9 +120,10 @@ impl NetworkInboundListener { let tcp_listener = listener.clone(); runners.push( async move { - if let Err(e) = tcp_listener.listen_tcp().await { + tcp_listener.listen_tcp().await.map_err(|e| { warn!("handler tcp listen failed: {}", e); - } + e.into() + }) } .boxed(), ); @@ -133,9 +134,10 @@ impl NetworkInboundListener { let udp_listener = listener.clone(); runners.push( async move { - if let Err(e) = udp_listener.listen_udp().await { + udp_listener.listen_udp().await.map_err(|e| { warn!("handler udp listen failed: {}", e); - } + e.into() + }) } .boxed(), ); diff --git a/clash_lib/src/lib.rs b/clash_lib/src/lib.rs index ea5703c36..237eaa2bb 100644 --- a/clash_lib/src/lib.rs +++ b/clash_lib/src/lib.rs @@ -57,7 +57,7 @@ pub enum Error { Operation(String), } -pub type Runner = futures::future::BoxFuture<'static, ()>; +pub type Runner = futures::future::BoxFuture<'static, Result<(), Error>>; pub struct Options { pub config: Config, @@ -80,9 +80,9 @@ pub enum Config { pub struct GlobalState { log_level: LogLevel, - inbound_listener_handle: Option>, + inbound_listener_handle: Option>>, #[allow(dead_code)] - dns_listener_handle: Option>, + dns_listener_handle: Option>>, } pub struct RuntimeController { @@ -268,18 +268,22 @@ async fn start_async(opts: Options) -> Result<(), Error> { runners.push(Box::pin(async move { info!("receive shutdown signal"); shutdown_rx.recv().await; + Ok(()) })); tasks.push(Box::pin(async move { - futures::future::join_all(runners).await; + futures::future::select_all(runners).await.0 })); tasks.push(Box::pin(async move { let _ = tokio::signal::ctrl_c().await; + Ok(()) })); - futures::future::select_all(tasks).await; - Ok(()) + futures::future::select_all(tasks).await.0.map_err(|x| { + error!("runtime error: {}, shutting down", x); + x + }) } #[cfg(test)] diff --git a/clash_lib/src/proxy/tun/inbound.rs b/clash_lib/src/proxy/tun/inbound.rs index d4b42d5b5..c1d22f730 100644 --- a/clash_lib/src/proxy/tun/inbound.rs +++ b/clash_lib/src/proxy/tun/inbound.rs @@ -188,6 +188,8 @@ pub fn get_runner( } } } + + Err(Error::Operation("tun stopped unexpectedly 0".to_string())) })); // tun -> stack -> dispatcher @@ -206,6 +208,8 @@ pub fn get_runner( } } } + + Err(Error::Operation("tun stopped unexpectedly 1".to_string())) })); let dsp = dispatcher.clone(); @@ -218,14 +222,18 @@ pub fn get_runner( dsp.clone(), )); } + + Err(Error::Operation("tun stopped unexpectedly 2".to_string())) })); futs.push(Box::pin(async move { handle_inbound_datagram(udp_socket, dispatcher, resolver).await; + Err(Error::Operation("tun stopped unexpectedly 3".to_string())) })); - futures::future::join_all(futs).await; - - warn!("tun at {} stopped", tun_name); + futures::future::select_all(futs).await.0.map_err(|x| { + error!("tun error: {}. stopped", x); + x + }) }))) }