From 6899e439a4ed788a9876cabc717482fe7f5ac60f Mon Sep 17 00:00:00 2001 From: Vitaly _Vi Shukela Date: Sun, 13 Aug 2023 04:21:16 +0200 Subject: [PATCH] Performance options, better logs --- src/main.rs | 10 ++++++++++ src/router.rs | 19 ++++++++++++++++--- src/router/serve_dns.rs | 4 ++-- src/router/serve_tcp.rs | 15 ++++++++------- src/wg.rs | 3 ++- 5 files changed, 38 insertions(+), 13 deletions(-) diff --git a/src/main.rs b/src/main.rs index 0987069..61cba73 100644 --- a/src/main.rs +++ b/src/main.rs @@ -41,6 +41,14 @@ pub struct Opts { /// maximum transfer unit to use for TCP. Default is 1420. #[argh(option, default="1420")] pub mtu: usize, + + /// in-application socket TCP buffer size. Note that operating system socket buffer also applies. + #[argh(option, default="65535")] + pub tcp_buffer_size: usize, + + /// nubmer of outgoing (to wireguard) packets to hold in a queue + #[argh(option, default="256")] + pub transmit_queue_capacity: usize, } @@ -71,6 +79,7 @@ async fn main() -> anyhow::Result<()> { peer_endpoint: opts.peer_endpoint, keepalive_interval: opts.keepalive_interval, bind_ip_port: opts.bind_ip_port, + transmit_queue_capacity: opts.transmit_queue_capacity, }; let (wg_tx, wg_rx) = wgopts.start().await?; @@ -79,6 +88,7 @@ async fn main() -> anyhow::Result<()> { dns_addr: opts.dns, pingable: opts.pingable, mtu: opts.mtu, + tcp_buffer_size: opts.tcp_buffer_size, }; router::run(wg_rx, wg_tx, r_opts).await?; diff --git a/src/router.rs b/src/router.rs index 3c0dee0..0fae0f1 100644 --- a/src/router.rs +++ b/src/router.rs @@ -24,10 +24,21 @@ enum NatKey { } } +impl std::fmt::Display for NatKey { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + NatKey::Tcp { client_side, external_side } => write!(f, "TCP {client_side} -> {external_side}"), + NatKey::Udp { client_side } => write!(f, "UDP {client_side} -> *"), + NatKey::Pingable { client_side, external_side } => write!(f, "Pinger {client_side} -> {external_side}"), + } + } +} + pub struct Opts { pub dns_addr: Option, pub pingable: Option, pub mtu: usize, + pub tcp_buffer_size: usize, } mod serve_dns; @@ -41,6 +52,7 @@ pub async fn run( opts: Opts, ) -> anyhow::Result<()> { let mtu = opts.mtu; + let tcp_buffer_size = opts.tcp_buffer_size; let mut table = HashMap::>::new(); let (tx_closes, mut rx_closes): (Sender, Receiver) = channel(4); @@ -167,7 +179,7 @@ pub async fn run( let per_socket_sender: &mut Sender = match table.entry(key) { hashbrown::hash_map::Entry::Occupied(entry) => entry.into_mut(), hashbrown::hash_map::Entry::Vacant(entry) => { - info!("New NAT entry for {:?}", key); + info!("Serving {}", key); let tx_to_wg2 = tx_to_wg.clone(); let (tx_persocket_fromwg, rx_persocket_fromwg) = channel(4); let k = entry.key().clone(); @@ -184,6 +196,7 @@ pub async fn run( external_side, client_side, mtu, + tcp_buffer_size, ) .await } @@ -206,9 +219,9 @@ pub async fn run( } }; if let Err(e) = ret { - error!("Finished serving {k:?}: {e}"); + error!(" finished serving {k}: {e}"); } else { - debug!("Finished serving {k:?}"); + info!(" Finished serving {k}"); } let _ = tx_closes.send(k).await; }); diff --git a/src/router/serve_dns.rs b/src/router/serve_dns.rs index 65c2692..da09d4a 100644 --- a/src/router/serve_dns.rs +++ b/src/router/serve_dns.rs @@ -9,7 +9,7 @@ use smoltcp::{ }, }; -use tracing::{warn, debug}; +use tracing::{warn, info}; pub async fn dns( @@ -66,7 +66,7 @@ pub async fn dns( let mut reply = dns.clone().into_reply(); let nam = format!("{}:0", q.qname); - debug!("DNS query {nam}"); + info!("DNS query {nam} from {src_addr} {srcport}"); if let Ok(ret) = tokio::net::lookup_host(nam).await { for x in ret { diff --git a/src/router/serve_tcp.rs b/src/router/serve_tcp.rs index b280a47..b5905ee 100644 --- a/src/router/serve_tcp.rs +++ b/src/router/serve_tcp.rs @@ -24,6 +24,7 @@ pub async fn tcp_outgoing_connection( external_addr: IpEndpoint, _client_addr: IpEndpoint, mtu: usize, + tcp_buffer_size: usize, ) -> anyhow::Result<()> { let target_addr = match external_addr.addr { IpAddress::Ipv4(x) => SocketAddr::new(std::net::IpAddr::V4(x.into()), external_addr.port), @@ -112,11 +113,11 @@ pub async fn tcp_outgoing_connection( let (mut tcp_r, mut tcp_w) = tcp.split(); debug!("Connected to upstream TCP"); - let tcp_rx_buffer = tcp::SocketBuffer::new(vec![0; 65535]); - let tcp_tx_buffer = tcp::SocketBuffer::new(vec![0; 65535]); + let tcp_rx_buffer = tcp::SocketBuffer::new(vec![0; tcp_buffer_size]); + let tcp_tx_buffer = tcp::SocketBuffer::new(vec![0; tcp_buffer_size]); let tcp_socket = tcp::Socket::new(tcp_rx_buffer, tcp_tx_buffer); - let mut external_tcp_buffer = [0; 32768]; + let mut external_tcp_buffer = vec![0; tcp_buffer_size]; let mut sockets = SocketSet::new([SocketStorage::EMPTY]); let h = sockets.add(tcp_socket); @@ -220,7 +221,7 @@ pub async fn tcp_outgoing_connection( biased; x = rx_from_wg.recv() => SelectOutcome::PacketFromWg(x), x = tcp_w.write(dtstes.unwrap_or(b"")), if dtstes.is_some() => SelectOutcome::WrittenToRealTcpSocket(x), - x = tcp_r.read(&mut external_tcp_buffer[..]), if nbsend > 0 => SelectOutcome::ReadFromRealTcpSocket(x), + x = tcp_r.read(&mut external_tcp_buffer[..nbsend]), if nbsend > 0 => SelectOutcome::ReadFromRealTcpSocket(x), _ = tmo => SelectOutcome::TimePassed, } } else { @@ -228,7 +229,7 @@ pub async fn tcp_outgoing_connection( biased; x = rx_from_wg.recv() => SelectOutcome::PacketFromWg(x), x = tcp_w.shutdown() => { SelectOutcome::WrittenToRealTcpSocket(x.map(|()|0)) } - x = tcp_r.read(&mut external_tcp_buffer[..]), if nbsend > 0 => SelectOutcome::ReadFromRealTcpSocket(x), + x = tcp_r.read(&mut external_tcp_buffer[..nbsend]), if nbsend > 0 => SelectOutcome::ReadFromRealTcpSocket(x), _ = tmo => SelectOutcome::TimePassed, } } @@ -238,7 +239,7 @@ pub async fn tcp_outgoing_connection( biased; x = rx_from_wg.recv() => SelectOutcome::PacketFromWg(x), x = tcp_w.write(dtstes.unwrap_or(b"")), if dtstes.is_some() => SelectOutcome::WrittenToRealTcpSocket(x), - x = tcp_r.read(&mut external_tcp_buffer[..]), if nbsend > 0 => SelectOutcome::ReadFromRealTcpSocket(x), + x = tcp_r.read(&mut external_tcp_buffer[..nbsend]), if nbsend > 0 => SelectOutcome::ReadFromRealTcpSocket(x), _ = std::future::ready(()) => SelectOutcome::Noop, } } else { @@ -246,7 +247,7 @@ pub async fn tcp_outgoing_connection( biased; x = rx_from_wg.recv() => SelectOutcome::PacketFromWg(x), x = tcp_w.shutdown() => { SelectOutcome::WrittenToRealTcpSocket(x.map(|()|0)) } - x = tcp_r.read(&mut external_tcp_buffer[..]), if nbsend > 0 => SelectOutcome::ReadFromRealTcpSocket(x), + x = tcp_r.read(&mut external_tcp_buffer[..nbsend]), if nbsend > 0 => SelectOutcome::ReadFromRealTcpSocket(x), _ = std::future::ready(()) => SelectOutcome::Noop, } } diff --git a/src/wg.rs b/src/wg.rs index dee2aa5..6e19d20 100644 --- a/src/wg.rs +++ b/src/wg.rs @@ -15,6 +15,7 @@ pub struct Opts { pub peer_endpoint: Option, pub keepalive_interval: Option, pub bind_ip_port: SocketAddr, + pub transmit_queue_capacity: usize, } impl Opts { @@ -28,7 +29,7 @@ impl Opts { None, ).map_err(|e|anyhow::anyhow!(e))?; - let (tx_towg, mut rx_towg) = channel(64); + let (tx_towg, mut rx_towg) = channel(self.transmit_queue_capacity); let (tx_fromwg, rx_fromwg) = channel(4); let udp = tokio::net::UdpSocket::bind(self.bind_ip_port).await?;