Skip to content

Commit

Permalink
Performance options, better logs
Browse files Browse the repository at this point in the history
  • Loading branch information
vi committed Aug 13, 2023
1 parent cb690f9 commit 6899e43
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 13 deletions.
10 changes: 10 additions & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,14 @@ pub struct Opts {
/// maximum transfer unit to use for TCP. Default is 1420.
#[argh(option, default="1420")]
pub mtu: usize,

/// in-application socket TCP buffer size. Note that operating system socket buffer also applies.
#[argh(option, default="65535")]
pub tcp_buffer_size: usize,

/// nubmer of outgoing (to wireguard) packets to hold in a queue
#[argh(option, default="256")]
pub transmit_queue_capacity: usize,
}


Expand Down Expand Up @@ -71,6 +79,7 @@ async fn main() -> anyhow::Result<()> {
peer_endpoint: opts.peer_endpoint,
keepalive_interval: opts.keepalive_interval,
bind_ip_port: opts.bind_ip_port,
transmit_queue_capacity: opts.transmit_queue_capacity,
};

let (wg_tx, wg_rx) = wgopts.start().await?;
Expand All @@ -79,6 +88,7 @@ async fn main() -> anyhow::Result<()> {
dns_addr: opts.dns,
pingable: opts.pingable,
mtu: opts.mtu,
tcp_buffer_size: opts.tcp_buffer_size,
};

router::run(wg_rx, wg_tx, r_opts).await?;
Expand Down
19 changes: 16 additions & 3 deletions src/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,21 @@ enum NatKey {
}
}

impl std::fmt::Display for NatKey {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
NatKey::Tcp { client_side, external_side } => write!(f, "TCP {client_side} -> {external_side}"),
NatKey::Udp { client_side } => write!(f, "UDP {client_side} -> *"),
NatKey::Pingable { client_side, external_side } => write!(f, "Pinger {client_side} -> {external_side}"),
}
}
}

pub struct Opts {
pub dns_addr: Option<SocketAddr>,
pub pingable: Option<IpAddr>,
pub mtu: usize,
pub tcp_buffer_size: usize,
}

mod serve_dns;
Expand All @@ -41,6 +52,7 @@ pub async fn run(
opts: Opts,
) -> anyhow::Result<()> {
let mtu = opts.mtu;
let tcp_buffer_size = opts.tcp_buffer_size;
let mut table = HashMap::<NatKey, Sender<BytesMut>>::new();

let (tx_closes, mut rx_closes): (Sender<NatKey>, Receiver<NatKey>) = channel(4);
Expand Down Expand Up @@ -167,7 +179,7 @@ pub async fn run(
let per_socket_sender: &mut Sender<BytesMut> = match table.entry(key) {
hashbrown::hash_map::Entry::Occupied(entry) => entry.into_mut(),
hashbrown::hash_map::Entry::Vacant(entry) => {
info!("New NAT entry for {:?}", key);
info!("Serving {}", key);
let tx_to_wg2 = tx_to_wg.clone();
let (tx_persocket_fromwg, rx_persocket_fromwg) = channel(4);
let k = entry.key().clone();
Expand All @@ -184,6 +196,7 @@ pub async fn run(
external_side,
client_side,
mtu,
tcp_buffer_size,
)
.await
}
Expand All @@ -206,9 +219,9 @@ pub async fn run(
}
};
if let Err(e) = ret {
error!("Finished serving {k:?}: {e}");
error!(" finished serving {k}: {e}");
} else {
debug!("Finished serving {k:?}");
info!(" Finished serving {k}");
}
let _ = tx_closes.send(k).await;
});
Expand Down
4 changes: 2 additions & 2 deletions src/router/serve_dns.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use smoltcp::{
},
};

use tracing::{warn, debug};
use tracing::{warn, info};


pub async fn dns(
Expand Down Expand Up @@ -66,7 +66,7 @@ pub async fn dns(
let mut reply = dns.clone().into_reply();

let nam = format!("{}:0", q.qname);
debug!("DNS query {nam}");
info!("DNS query {nam} from {src_addr} {srcport}");

if let Ok(ret) = tokio::net::lookup_host(nam).await {
for x in ret {
Expand Down
15 changes: 8 additions & 7 deletions src/router/serve_tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ pub async fn tcp_outgoing_connection(
external_addr: IpEndpoint,
_client_addr: IpEndpoint,
mtu: usize,
tcp_buffer_size: usize,
) -> anyhow::Result<()> {
let target_addr = match external_addr.addr {
IpAddress::Ipv4(x) => SocketAddr::new(std::net::IpAddr::V4(x.into()), external_addr.port),
Expand Down Expand Up @@ -112,11 +113,11 @@ pub async fn tcp_outgoing_connection(
let (mut tcp_r, mut tcp_w) = tcp.split();
debug!("Connected to upstream TCP");

let tcp_rx_buffer = tcp::SocketBuffer::new(vec![0; 65535]);
let tcp_tx_buffer = tcp::SocketBuffer::new(vec![0; 65535]);
let tcp_rx_buffer = tcp::SocketBuffer::new(vec![0; tcp_buffer_size]);
let tcp_tx_buffer = tcp::SocketBuffer::new(vec![0; tcp_buffer_size]);
let tcp_socket = tcp::Socket::new(tcp_rx_buffer, tcp_tx_buffer);

let mut external_tcp_buffer = [0; 32768];
let mut external_tcp_buffer = vec![0; tcp_buffer_size];

let mut sockets = SocketSet::new([SocketStorage::EMPTY]);
let h = sockets.add(tcp_socket);
Expand Down Expand Up @@ -220,15 +221,15 @@ pub async fn tcp_outgoing_connection(
biased;
x = rx_from_wg.recv() => SelectOutcome::PacketFromWg(x),
x = tcp_w.write(dtstes.unwrap_or(b"")), if dtstes.is_some() => SelectOutcome::WrittenToRealTcpSocket(x),
x = tcp_r.read(&mut external_tcp_buffer[..]), if nbsend > 0 => SelectOutcome::ReadFromRealTcpSocket(x),
x = tcp_r.read(&mut external_tcp_buffer[..nbsend]), if nbsend > 0 => SelectOutcome::ReadFromRealTcpSocket(x),
_ = tmo => SelectOutcome::TimePassed,
}
} else {
tokio::select! {
biased;
x = rx_from_wg.recv() => SelectOutcome::PacketFromWg(x),
x = tcp_w.shutdown() => { SelectOutcome::WrittenToRealTcpSocket(x.map(|()|0)) }
x = tcp_r.read(&mut external_tcp_buffer[..]), if nbsend > 0 => SelectOutcome::ReadFromRealTcpSocket(x),
x = tcp_r.read(&mut external_tcp_buffer[..nbsend]), if nbsend > 0 => SelectOutcome::ReadFromRealTcpSocket(x),
_ = tmo => SelectOutcome::TimePassed,
}
}
Expand All @@ -238,15 +239,15 @@ pub async fn tcp_outgoing_connection(
biased;
x = rx_from_wg.recv() => SelectOutcome::PacketFromWg(x),
x = tcp_w.write(dtstes.unwrap_or(b"")), if dtstes.is_some() => SelectOutcome::WrittenToRealTcpSocket(x),
x = tcp_r.read(&mut external_tcp_buffer[..]), if nbsend > 0 => SelectOutcome::ReadFromRealTcpSocket(x),
x = tcp_r.read(&mut external_tcp_buffer[..nbsend]), if nbsend > 0 => SelectOutcome::ReadFromRealTcpSocket(x),
_ = std::future::ready(()) => SelectOutcome::Noop,
}
} else {
tokio::select! {
biased;
x = rx_from_wg.recv() => SelectOutcome::PacketFromWg(x),
x = tcp_w.shutdown() => { SelectOutcome::WrittenToRealTcpSocket(x.map(|()|0)) }
x = tcp_r.read(&mut external_tcp_buffer[..]), if nbsend > 0 => SelectOutcome::ReadFromRealTcpSocket(x),
x = tcp_r.read(&mut external_tcp_buffer[..nbsend]), if nbsend > 0 => SelectOutcome::ReadFromRealTcpSocket(x),
_ = std::future::ready(()) => SelectOutcome::Noop,
}
}
Expand Down
3 changes: 2 additions & 1 deletion src/wg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ pub struct Opts {
pub peer_endpoint: Option<SocketAddr>,
pub keepalive_interval: Option<u16>,
pub bind_ip_port: SocketAddr,
pub transmit_queue_capacity: usize,
}

impl Opts {
Expand All @@ -28,7 +29,7 @@ impl Opts {
None,
).map_err(|e|anyhow::anyhow!(e))?;

let (tx_towg, mut rx_towg) = channel(64);
let (tx_towg, mut rx_towg) = channel(self.transmit_queue_capacity);
let (tx_fromwg, rx_fromwg) = channel(4);

let udp = tokio::net::UdpSocket::bind(self.bind_ip_port).await?;
Expand Down

0 comments on commit 6899e43

Please sign in to comment.