Skip to content

Commit

Permalink
display udp proxy chain names (#134)
Browse files Browse the repository at this point in the history
  • Loading branch information
ibigbug authored Oct 21, 2023
1 parent e3824b7 commit 59d85fd
Show file tree
Hide file tree
Showing 16 changed files with 189 additions and 83 deletions.
23 changes: 14 additions & 9 deletions clash/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ extern crate clash_lib as clash;

use clap::Parser;
use clash::TokioRuntime;
use std::path::PathBuf;
use std::path::{Path, PathBuf};

#[derive(Parser)]
#[clap(author, version, about, long_about = None)]
Expand All @@ -22,15 +22,20 @@ struct Cli {

fn main() {
let cli = Cli::parse();
let file = cli
.directory
.as_ref()
.unwrap_or(&std::env::current_dir().unwrap())
.join(cli.config)
.to_string_lossy()
.to_string();

if !Path::new(&file).exists() {
panic!("config file not found: {}", file);
}

clash::start(clash::Options {
config: clash::Config::File(
cli.directory
.as_ref()
.unwrap_or(&std::env::current_dir().unwrap())
.join(cli.config)
.to_string_lossy()
.to_string(),
),
config: clash::Config::File(file),
cwd: cli.directory.map(|x| x.to_string_lossy().to_string()),
rt: Some(TokioRuntime::MultiThread),
log_file: None,
Expand Down
3 changes: 3 additions & 0 deletions clash_lib/src/app/dispatcher/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@ mod tracked;

pub use dispatcher::Dispatcher;
pub use statistics_manager::Manager as StatisticsManager;
pub use tracked::BoxedChainedDatagram;
pub use tracked::BoxedChainedStream;
pub use tracked::ChainedDatagram;
pub use tracked::ChainedDatagramWrapper;
pub use tracked::ChainedStream;
pub use tracked::ChainedStreamWrapper;
pub use tracked::TrackedDatagram;
Expand Down
92 changes: 89 additions & 3 deletions clash_lib/src/app/dispatcher/tracked.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use tracing::debug;

use crate::{
app::router::RuleMatcher,
proxy::{datagram::UdpPacket, AnyOutboundDatagram, ProxyStream},
proxy::{datagram::UdpPacket, OutboundDatagram, ProxyStream},
session::Session,
};

Expand Down Expand Up @@ -263,21 +263,106 @@ impl AsyncWrite for TrackedStream {
}
}

#[async_trait::async_trait]
pub trait ChainedDatagram:
OutboundDatagram<UdpPacket, Item = UdpPacket, Error = std::io::Error>
{
fn chain(&self) -> &ProxyChain;
async fn append_to_chain(&self, name: &str);
}

pub type BoxedChainedDatagram = Box<dyn ChainedDatagram + Send + Sync>;

#[async_trait::async_trait]
impl<T> ChainedDatagram for ChainedDatagramWrapper<T>
where
T: Sink<UdpPacket, Error = std::io::Error> + Unpin + Send + Sync + 'static,
T: Stream<Item = UdpPacket>,
{
fn chain(&self) -> &ProxyChain {
&self.chain
}

async fn append_to_chain(&self, name: &str) {
self.chain.push(name.to_owned()).await;
}
}

#[derive(Debug)]
pub struct ChainedDatagramWrapper<T> {
inner: T,
chain: ProxyChain,
}

impl<T> ChainedDatagramWrapper<T> {
pub fn new(inner: T) -> Self {
Self {
inner,
chain: ProxyChain::default(),
}
}
}

impl<T> Stream for ChainedDatagramWrapper<T>
where
T: Stream<Item = UdpPacket> + Unpin,
{
type Item = UdpPacket;

fn poll_next(
self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> Poll<Option<Self::Item>> {
Pin::new(&mut self.get_mut().inner).poll_next(cx)
}
}
impl<T> Sink<UdpPacket> for ChainedDatagramWrapper<T>
where
T: Sink<UdpPacket, Error = std::io::Error> + Unpin,
{
type Error = std::io::Error;
fn poll_ready(
self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> Poll<Result<(), Self::Error>> {
Pin::new(&mut self.get_mut().inner).poll_ready(cx)
}

fn start_send(self: Pin<&mut Self>, item: UdpPacket) -> Result<(), Self::Error> {
Pin::new(&mut self.get_mut().inner).start_send(item)
}

fn poll_flush(
self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> Poll<Result<(), Self::Error>> {
Pin::new(&mut self.get_mut().inner).poll_flush(cx)
}

fn poll_close(
self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> Poll<Result<(), Self::Error>> {
Pin::new(&mut self.get_mut().inner).poll_close(cx)
}
}

pub struct TrackedDatagram {
inner: AnyOutboundDatagram,
inner: BoxedChainedDatagram,
manager: Arc<Manager>,
tracker: Arc<TrackerInfo>,
close_notify: Receiver<()>,
}

impl TrackedDatagram {
pub async fn new(
inner: AnyOutboundDatagram,
inner: BoxedChainedDatagram,
manager: Arc<Manager>,
sess: Session,
rule: Option<&Box<dyn RuleMatcher>>,
) -> Self {
let uuid = uuid::Uuid::new_v4();
let chain = inner.chain().clone();
let (tx, rx) = tokio::sync::oneshot::channel();
let s = Self {
inner,
Expand All @@ -292,6 +377,7 @@ impl TrackedDatagram {
.map(|x| x.type_name().to_owned())
.unwrap_or_default(),
rule_payload: rule.map(|x| x.payload().to_owned()).unwrap_or_default(),
proxy_chain_holder: chain.clone(),
..Default::default()
}),
close_notify: rx,
Expand Down
17 changes: 12 additions & 5 deletions clash_lib/src/proxy/direct/mod.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
use crate::app::dispatcher::{BoxedChainedStream, ChainedStream, ChainedStreamWrapper};
use crate::app::dispatcher::{
BoxedChainedDatagram, BoxedChainedStream, ChainedDatagram, ChainedDatagramWrapper,
ChainedStream, ChainedStreamWrapper,
};
use crate::app::dns::ThreadSafeDNSResolver;
use crate::config::internal::proxy::PROXY_DIRECT;
use crate::proxy::datagram::OutboundDatagramImpl;
use crate::proxy::utils::{new_tcp_stream, new_udp_socket};
use crate::proxy::{AnyOutboundDatagram, AnyOutboundHandler, AnyStream, OutboundHandler};
use crate::proxy::{AnyOutboundHandler, AnyStream, OutboundHandler};
use crate::session::{Session, SocksAddr};

use async_trait::async_trait;
Expand Down Expand Up @@ -72,14 +75,18 @@ impl OutboundHandler for Handler {
&self,
sess: &Session,
resolver: ThreadSafeDNSResolver,
) -> std::io::Result<AnyOutboundDatagram> {
new_udp_socket(
) -> std::io::Result<BoxedChainedDatagram> {
let d = new_udp_socket(
None,
sess.iface.as_ref(),
#[cfg(any(target_os = "linux", target_os = "android"))]
None,
)
.await
.map(|x| OutboundDatagramImpl::new(x, resolver))
.map(|x| OutboundDatagramImpl::new(x, resolver))?;

let d = ChainedDatagramWrapper::new(d);
d.append_to_chain(self.name()).await;
Ok(Box::new(d))
}
}
8 changes: 4 additions & 4 deletions clash_lib/src/proxy/fallback/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use tracing::debug;

use crate::{
app::{
dispatcher::BoxedChainedStream,
dispatcher::{BoxedChainedDatagram, BoxedChainedStream},
dns::ThreadSafeDNSResolver,
remote_content_manager::{
providers::proxy_provider::proxy_provider::ThreadSafeProxyProvider, ProxyManager,
Expand All @@ -15,8 +15,8 @@ use crate::{
};

use super::{
utils::provider_helper::get_proxies_from_providers, AnyOutboundDatagram, AnyOutboundHandler,
AnyStream, CommonOption, OutboundHandler, OutboundType,
utils::provider_helper::get_proxies_from_providers, AnyOutboundHandler, AnyStream,
CommonOption, OutboundHandler, OutboundType,
};

#[derive(Default, Clone)]
Expand Down Expand Up @@ -117,7 +117,7 @@ impl OutboundHandler for Handler {
&self,
sess: &Session,
resolver: ThreadSafeDNSResolver,
) -> io::Result<AnyOutboundDatagram> {
) -> io::Result<BoxedChainedDatagram> {
let proxy = self.find_alive_proxy(true).await;
proxy.connect_datagram(sess, resolver).await
}
Expand Down
9 changes: 5 additions & 4 deletions clash_lib/src/proxy/loadbalance/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ use tracing::debug;

use crate::{
app::{
dispatcher::BoxedChainedStream, dns::ThreadSafeDNSResolver,
dispatcher::{BoxedChainedDatagram, BoxedChainedStream},
dns::ThreadSafeDNSResolver,
remote_content_manager::providers::proxy_provider::ThreadSafeProxyProvider,
},
config::internal::proxy::LoadBalanceStrategy,
Expand All @@ -18,8 +19,8 @@ use crate::{
use self::helpers::{strategy_consistent_hashring, strategy_rr, StrategyFn};

use super::{
utils::provider_helper::get_proxies_from_providers, AnyOutboundDatagram, AnyOutboundHandler,
AnyStream, CommonOption, OutboundHandler, OutboundType,
utils::provider_helper::get_proxies_from_providers, AnyOutboundHandler, AnyStream,
CommonOption, OutboundHandler, OutboundType,
};

#[derive(Default, Clone)]
Expand Down Expand Up @@ -121,7 +122,7 @@ impl OutboundHandler for Handler {
&self,
sess: &Session,
resolver: ThreadSafeDNSResolver,
) -> io::Result<AnyOutboundDatagram> {
) -> io::Result<BoxedChainedDatagram> {
let proxies = self.get_proxies(false).await;
let proxy = (self.inner.lock().await.strategy_fn)(proxies, &sess).await?;
debug!("{} use proxy {}", self.name(), proxy.name());
Expand Down
6 changes: 3 additions & 3 deletions clash_lib/src/proxy/mocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use mockall::mock;

use crate::{
app::{
dispatcher::BoxedChainedStream,
dispatcher::{BoxedChainedDatagram, BoxedChainedStream},
dns::ThreadSafeDNSResolver,
remote_content_manager::providers::{
proxy_provider::ProxyProvider, Provider, ProviderType, ProviderVehicleType,
Expand All @@ -14,7 +14,7 @@ use crate::{
session::{Session, SocksAddr},
};

use super::{AnyOutboundDatagram, AnyOutboundHandler, AnyStream, OutboundHandler, OutboundType};
use super::{AnyOutboundHandler, AnyStream, OutboundHandler, OutboundType};

mock! {
pub DummyProxyProvider {}
Expand Down Expand Up @@ -77,7 +77,7 @@ mock! {
&self,
sess: &Session,
resolver: ThreadSafeDNSResolver,
) -> io::Result<AnyOutboundDatagram>;
) -> io::Result<BoxedChainedDatagram>;

/// for API
async fn as_map(&self) -> HashMap<String, Box<dyn Serialize + Send>>;
Expand Down
19 changes: 2 additions & 17 deletions clash_lib/src/proxy/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::app::dispatcher::BoxedChainedStream;
use crate::app::dispatcher::{BoxedChainedDatagram, BoxedChainedStream};
use crate::app::dns::ThreadSafeDNSResolver;
use crate::proxy::datagram::UdpPacket;
use crate::proxy::utils::Interface;
Expand Down Expand Up @@ -91,21 +91,6 @@ impl<T, U> OutboundDatagram<U> for T where
pub type AnyOutboundDatagram =
Box<dyn OutboundDatagram<UdpPacket, Item = UdpPacket, Error = io::Error>>;

#[async_trait]
pub trait OutboundDatagramRecvHalf: Sync + Send + Unpin {
/// Receives a message on the socket. On success, returns the number of
/// bytes read and the origin of the message.
async fn recv_from(&mut self, buf: &mut [u8]) -> io::Result<(usize, SocksAddr)>;
}

/// The send half.
#[async_trait]
pub trait OutboundDatagramSendHalf: Sync + Send + Unpin {
/// Sends a message on the socket to `dst_addr`. On success, returns the
/// number of bytes sent.
async fn send_to(&mut self, buf: &[u8], dst_addr: &SocksAddr) -> io::Result<usize>;
}

#[derive(Default, Debug, Clone)]
pub struct CommonOption {
#[allow(dead_code)]
Expand Down Expand Up @@ -178,7 +163,7 @@ pub trait OutboundHandler: Sync + Send + Unpin {
&self,
sess: &Session,
resolver: ThreadSafeDNSResolver,
) -> io::Result<AnyOutboundDatagram>;
) -> io::Result<BoxedChainedDatagram>;

/// for API
/// the map only contains basic information
Expand Down
6 changes: 3 additions & 3 deletions clash_lib/src/proxy/reject/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::app::dispatcher::BoxedChainedStream;
use crate::app::dispatcher::{BoxedChainedDatagram, BoxedChainedStream};
use crate::app::dns::ThreadSafeDNSResolver;
use crate::config::internal::proxy::PROXY_REJECT;
use crate::proxy::{AnyOutboundDatagram, AnyOutboundHandler, AnyStream, OutboundHandler};
use crate::proxy::{AnyOutboundHandler, AnyStream, OutboundHandler};
use crate::session::{Session, SocksAddr};
use async_trait::async_trait;
use serde::Serialize;
Expand Down Expand Up @@ -58,7 +58,7 @@ impl OutboundHandler for Handler {
&self,
#[allow(unused_variables)] sess: &Session,
#[allow(unused_variables)] _resolver: ThreadSafeDNSResolver,
) -> io::Result<AnyOutboundDatagram> {
) -> io::Result<BoxedChainedDatagram> {
Err(io::Error::new(io::ErrorKind::Other, "REJECT"))
}
}
10 changes: 6 additions & 4 deletions clash_lib/src/proxy/relay/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ use futures::stream::{self, StreamExt};

use crate::{
app::{
dispatcher::{BoxedChainedStream, ChainedStream, ChainedStreamWrapper},
dispatcher::{
BoxedChainedDatagram, BoxedChainedStream, ChainedStream, ChainedStreamWrapper,
},
dns::ThreadSafeDNSResolver,
remote_content_manager::providers::proxy_provider::ThreadSafeProxyProvider,
},
Expand All @@ -16,8 +18,8 @@ use crate::{
};

use super::{
utils::provider_helper::get_proxies_from_providers, AnyOutboundDatagram, AnyOutboundHandler,
AnyStream, CommonOption, OutboundHandler, OutboundType,
utils::provider_helper::get_proxies_from_providers, AnyOutboundHandler, AnyStream,
CommonOption, OutboundHandler, OutboundType,
};

#[derive(Default)]
Expand Down Expand Up @@ -130,7 +132,7 @@ impl OutboundHandler for Handler {
&self,
_sess: &Session,
_resolver: ThreadSafeDNSResolver,
) -> io::Result<AnyOutboundDatagram> {
) -> io::Result<BoxedChainedDatagram> {
Err(new_io_error("not implemented for Relay"))
}

Expand Down
Loading

0 comments on commit 59d85fd

Please sign in to comment.